Add heartbeats

This commit is contained in:
Basique Evangelist 2021-03-03 16:41:08 +03:00
parent ca5bd54495
commit f3c8202b10
3 changed files with 144 additions and 0 deletions

View File

@ -0,0 +1,46 @@
using System;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
namespace JetHerald.Controllers
{
[ApiController]
public class HeartbeatController : ControllerBase
{
Db Db { get; }
JetHeraldBot Herald { get; }
public HeartbeatController(Db db, JetHeraldBot herald)
{
Herald = herald;
Db = db;
}
[Route("api/heartbeat")]
[HttpPost]
public async Task<IActionResult> Heartbeat([FromBody] HeartbeatArgs args)
{
var t = await Db.GetTopic(args.Topic);
if (t == null)
return new NotFoundResult();
else if (!t.WriteToken.Equals(args.WriteToken, StringComparison.OrdinalIgnoreCase))
return StatusCode(403);
if (t.ExpiryMessageSent)
await Herald.HeartbeatSent(t);
await Db.AddExpiry(t.Name, args.ExpiryTimeout);
return new OkResult();
}
[DataContract]
public class HeartbeatArgs
{
[DataMember] public string Topic { get; set; }
[DataMember] public int ExpiryTimeout { get; set; }
[DataMember] public string WriteToken { get; set; }
}
}
}

View File

@ -1,8 +1,10 @@
using System;
using System.Collections.Generic; using System.Collections.Generic;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient; using MySql.Data.MySqlClient;
using Dapper; using Dapper;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Threading;
namespace JetHerald namespace JetHerald
{ {
@ -17,10 +19,19 @@ namespace JetHerald
public string ReadToken { get; set; } public string ReadToken { get; set; }
public string WriteToken { get; set; } public string WriteToken { get; set; }
public string AdminToken { get; set; } public string AdminToken { get; set; }
public DateTime? ExpiryTime { get; set; }
public bool ExpiryMessageSent { get; set; }
public long? ChatId { get; set; } public long? ChatId { get; set; }
} }
public class ExpiredTopicChat
{
public long ChatId;
public string Description;
public DateTime ExpiryTime { get; set; }
}
public async Task<int> DeleteTopic(string name, string adminToken) public async Task<int> DeleteTopic(string name, string adminToken)
{ {
using (var c = GetConnection()) using (var c = GetConnection())
@ -119,6 +130,49 @@ namespace JetHerald
new { topicName, chatId }); new { topicName, chatId });
} }
public Task AddExpiry(string topicName, int addedTime)
{
using var c = GetConnection();
return c.ExecuteAsync(
" UPDATE topic" +
" SET ExpiryTime = CURRENT_TIMESTAMP() + INTERVAL @addedTime SECOND," +
" ExpiryMessageSent = 0" +
" WHERE Name = @topicName",
new { topicName, addedTime });
}
public Task DisableExpiry(string name, string adminToken)
{
using var c = GetConnection();
return c.ExecuteAsync(
" UPDATE topic" +
" SET ExpiryTime = NULL," +
" ExpiryMessageSent = 0" +
" WHERE Name = @name AND AdminToken = @adminToken",
new { name, adminToken });
}
public Task<IEnumerable<ExpiredTopicChat>> GetExpiredTopics(CancellationToken token = default)
{
using var c = GetConnection();
return c.QueryAsync<ExpiredTopicChat>(
" SELECT tc.ChatId, t.Description, t.ExpiryTime" +
" FROM topic_chat tc" +
" INNER JOIN topic t ON t.TopicId = tc.TopicId" +
" WHERE t.ExpiryTime < CURRENT_TIMESTAMP() AND NOT t.ExpiryMessageSent",
token);
}
public Task MarkExpiredTopics(CancellationToken token = default)
{
using var c = GetConnection();
return c.ExecuteAsync(
" UPDATE topic t" +
" SET t.ExpiryMessageSent = 1" +
" WHERE t.ExpiryTime < CURRENT_TIMESTAMP()",
token);
}
public Db(IOptions<Options.ConnectionStrings> cfg) public Db(IOptions<Options.ConnectionStrings> cfg)
{ {
Config = cfg.Value; Config = cfg.Value;

View File

@ -8,6 +8,7 @@ using Telegram.Bot.Args;
using Telegram.Bot.Types.Enums; using Telegram.Bot.Types.Enums;
using JetHerald.Commands; using JetHerald.Commands;
using System.Threading;
namespace JetHerald namespace JetHerald
{ {
@ -26,6 +27,8 @@ namespace JetHerald
TelegramBotClient Client { get; set; } TelegramBotClient Client { get; set; }
ChatCommandRouter Commands; ChatCommandRouter Commands;
CancellationTokenSource HeartbeatCancellation;
Task HeartbeatTask;
Telegram.Bot.Types.User Me { get; set; } Telegram.Bot.Types.User Me { get; set; }
public async Task Init() public async Task Init()
@ -49,10 +52,51 @@ namespace JetHerald
Commands.Add(new UnsubscribeCommand(Db), "unsubscribe", "unsub"); Commands.Add(new UnsubscribeCommand(Db), "unsubscribe", "unsub");
Commands.Add(new ListCommand(Db), "list"); Commands.Add(new ListCommand(Db), "list");
HeartbeatCancellation = new();
HeartbeatTask = CheckHeartbeats(HeartbeatCancellation.Token);
Client.OnMessage += BotOnMessageReceived; Client.OnMessage += BotOnMessageReceived;
Client.StartReceiving(); Client.StartReceiving();
} }
public async Task Stop()
{
Client.StopReceiving();
HeartbeatCancellation.Cancel();
try
{
await HeartbeatTask;
}
catch (TaskCanceledException)
{
}
}
public async Task CheckHeartbeats(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
await Task.Delay(1000 * 10, token);
foreach (var chatSent in await Db.GetExpiredTopics(token))
{
var formatted = $"!{chatSent.Description}!:\nTimeout expired at {chatSent.ExpiryTime}";
await Client.SendTextMessageAsync(chatSent.ChatId, formatted, cancellationToken: token);
}
await Db.MarkExpiredTopics(token);
}
}
public async Task HeartbeatSent(Db.Topic topic)
{
var chatIds = await Db.GetChatIdsForTopic(topic.TopicId);
var formatted = $"!{topic.Description}!:\nA heartbeat has been sent.";
foreach (var c in chatIds)
await Client.SendTextMessageAsync(c, formatted);
}
public async Task PublishMessage(Db.Topic topic, string message) public async Task PublishMessage(Db.Topic topic, string message)
{ {
var chatIds = await Db.GetChatIdsForTopic(topic.TopicId); var chatIds = await Db.GetChatIdsForTopic(topic.TopicId);