From 48f9a39498baa9d484b8b94d05549aa2358e50e3 Mon Sep 17 00:00:00 2001 From: Basique Evangelist Date: Wed, 3 Mar 2021 16:41:08 +0300 Subject: [PATCH] Add heartbeats --- JetHerald/Controllers/HeartbeatController.cs | 46 +++++++++++++++++ JetHerald/Db.cs | 54 ++++++++++++++++++++ JetHerald/JetHeraldBot.cs | 44 ++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 JetHerald/Controllers/HeartbeatController.cs diff --git a/JetHerald/Controllers/HeartbeatController.cs b/JetHerald/Controllers/HeartbeatController.cs new file mode 100644 index 0000000..8ee707f --- /dev/null +++ b/JetHerald/Controllers/HeartbeatController.cs @@ -0,0 +1,46 @@ +using System; +using System.Runtime.Serialization; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Mvc; + +namespace JetHerald.Controllers +{ + [ApiController] + public class HeartbeatController : ControllerBase + { + Db Db { get; } + JetHeraldBot Herald { get; } + + public HeartbeatController(Db db, JetHeraldBot herald) + { + Herald = herald; + Db = db; + } + + [Route("api/heartbeat")] + [HttpPost] + public async Task Heartbeat([FromBody] HeartbeatArgs args) + { + var t = await Db.GetTopic(args.Topic); + if (t == null) + return new NotFoundResult(); + else if (!t.WriteToken.Equals(args.WriteToken, StringComparison.OrdinalIgnoreCase)) + return StatusCode(403); + + if (t.ExpiryMessageSent) + await Herald.HeartbeatSent(t); + + await Db.AddExpiry(t.Name, args.ExpiryTimeout); + + return new OkResult(); + } + + [DataContract] + public class HeartbeatArgs + { + [DataMember] public string Topic { get; set; } + [DataMember] public int ExpiryTimeout { get; set; } + [DataMember] public string WriteToken { get; set; } + } + } +} \ No newline at end of file diff --git a/JetHerald/Db.cs b/JetHerald/Db.cs index 7fd48fe..57693e0 100644 --- a/JetHerald/Db.cs +++ b/JetHerald/Db.cs @@ -1,8 +1,10 @@ +using System; using System.Collections.Generic; using Microsoft.Extensions.Options; using MySql.Data.MySqlClient; using Dapper; using System.Threading.Tasks; +using System.Threading; namespace JetHerald { @@ -17,10 +19,19 @@ namespace JetHerald public string ReadToken { get; set; } public string WriteToken { get; set; } public string AdminToken { get; set; } + public DateTime? ExpiryTime { get; set; } + public bool ExpiryMessageSent { get; set; } public long? ChatId { get; set; } } + public class ExpiredTopicChat + { + public long ChatId; + public string Description; + public DateTime ExpiryTime { get; set; } + } + public async Task DeleteTopic(string name, string adminToken) { using (var c = GetConnection()) @@ -119,6 +130,49 @@ namespace JetHerald new { topicName, chatId }); } + public Task AddExpiry(string topicName, int addedTime) + { + using var c = GetConnection(); + return c.ExecuteAsync( + " UPDATE topic" + + " SET ExpiryTime = CURRENT_TIMESTAMP() + INTERVAL @addedTime SECOND," + + " ExpiryMessageSent = 0" + + " WHERE Name = @topicName", + new { topicName, addedTime }); + } + + public Task DisableExpiry(string name, string adminToken) + { + using var c = GetConnection(); + return c.ExecuteAsync( + " UPDATE topic" + + " SET ExpiryTime = NULL," + + " ExpiryMessageSent = 0" + + " WHERE Name = @name AND AdminToken = @adminToken", + new { name, adminToken }); + } + + public Task> GetExpiredTopics(CancellationToken token = default) + { + using var c = GetConnection(); + return c.QueryAsync( + " SELECT tc.ChatId, t.Description, t.ExpiryTime" + + " FROM topic_chat tc" + + " INNER JOIN topic t ON t.TopicId = tc.TopicId" + + " WHERE t.ExpiryTime < CURRENT_TIMESTAMP() AND NOT t.ExpiryMessageSent", + token); + } + + public Task MarkExpiredTopics(CancellationToken token = default) + { + using var c = GetConnection(); + return c.ExecuteAsync( + " UPDATE topic t" + + " SET t.ExpiryMessageSent = 1" + + " WHERE t.ExpiryTime < CURRENT_TIMESTAMP()", + token); + } + public Db(IOptions cfg) { Config = cfg.Value; diff --git a/JetHerald/JetHeraldBot.cs b/JetHerald/JetHeraldBot.cs index 7d9641d..dc0eea0 100644 --- a/JetHerald/JetHeraldBot.cs +++ b/JetHerald/JetHeraldBot.cs @@ -8,6 +8,7 @@ using Telegram.Bot.Args; using Telegram.Bot.Types.Enums; using JetHerald.Commands; +using System.Threading; namespace JetHerald { @@ -26,6 +27,8 @@ namespace JetHerald TelegramBotClient Client { get; set; } ChatCommandRouter Commands; + CancellationTokenSource HeartbeatCancellation; + Task HeartbeatTask; Telegram.Bot.Types.User Me { get; set; } public async Task Init() @@ -49,10 +52,51 @@ namespace JetHerald Commands.Add(new UnsubscribeCommand(Db), "unsubscribe", "unsub"); Commands.Add(new ListCommand(Db), "list"); + HeartbeatCancellation = new(); + HeartbeatTask = CheckHeartbeats(HeartbeatCancellation.Token); + Client.OnMessage += BotOnMessageReceived; Client.StartReceiving(); } + public async Task Stop() + { + Client.StopReceiving(); + HeartbeatCancellation.Cancel(); + try + { + await HeartbeatTask; + } + catch (TaskCanceledException) + { + + } + } + + public async Task CheckHeartbeats(CancellationToken token) + { + while (!token.IsCancellationRequested) + { + await Task.Delay(1000 * 10, token); + + foreach (var chatSent in await Db.GetExpiredTopics(token)) + { + var formatted = $"!{chatSent.Description}!:\nTimeout expired at {chatSent.ExpiryTime}"; + await Client.SendTextMessageAsync(chatSent.ChatId, formatted, cancellationToken: token); + } + + await Db.MarkExpiredTopics(token); + } + } + + public async Task HeartbeatSent(Db.Topic topic) + { + var chatIds = await Db.GetChatIdsForTopic(topic.TopicId); + var formatted = $"!{topic.Description}!:\nA heartbeat has been sent."; + foreach (var c in chatIds) + await Client.SendTextMessageAsync(c, formatted); + } + public async Task PublishMessage(Db.Topic topic, string message) { var chatIds = await Db.GetChatIdsForTopic(topic.TopicId);