mirror of
https://github.com/Jetsparrow/jetherald.git
synced 2026-01-21 07:56:09 +03:00
Add heartbeats
This commit is contained in:
parent
73e1802cb4
commit
48f9a39498
46
JetHerald/Controllers/HeartbeatController.cs
Normal file
46
JetHerald/Controllers/HeartbeatController.cs
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
using System;
|
||||||
|
using System.Runtime.Serialization;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
|
||||||
|
namespace JetHerald.Controllers
|
||||||
|
{
|
||||||
|
[ApiController]
|
||||||
|
public class HeartbeatController : ControllerBase
|
||||||
|
{
|
||||||
|
Db Db { get; }
|
||||||
|
JetHeraldBot Herald { get; }
|
||||||
|
|
||||||
|
public HeartbeatController(Db db, JetHeraldBot herald)
|
||||||
|
{
|
||||||
|
Herald = herald;
|
||||||
|
Db = db;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Route("api/heartbeat")]
|
||||||
|
[HttpPost]
|
||||||
|
public async Task<IActionResult> Heartbeat([FromBody] HeartbeatArgs args)
|
||||||
|
{
|
||||||
|
var t = await Db.GetTopic(args.Topic);
|
||||||
|
if (t == null)
|
||||||
|
return new NotFoundResult();
|
||||||
|
else if (!t.WriteToken.Equals(args.WriteToken, StringComparison.OrdinalIgnoreCase))
|
||||||
|
return StatusCode(403);
|
||||||
|
|
||||||
|
if (t.ExpiryMessageSent)
|
||||||
|
await Herald.HeartbeatSent(t);
|
||||||
|
|
||||||
|
await Db.AddExpiry(t.Name, args.ExpiryTimeout);
|
||||||
|
|
||||||
|
return new OkResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
[DataContract]
|
||||||
|
public class HeartbeatArgs
|
||||||
|
{
|
||||||
|
[DataMember] public string Topic { get; set; }
|
||||||
|
[DataMember] public int ExpiryTimeout { get; set; }
|
||||||
|
[DataMember] public string WriteToken { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,8 +1,10 @@
|
|||||||
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using MySql.Data.MySqlClient;
|
using MySql.Data.MySqlClient;
|
||||||
using Dapper;
|
using Dapper;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
namespace JetHerald
|
namespace JetHerald
|
||||||
{
|
{
|
||||||
@ -17,10 +19,19 @@ namespace JetHerald
|
|||||||
public string ReadToken { get; set; }
|
public string ReadToken { get; set; }
|
||||||
public string WriteToken { get; set; }
|
public string WriteToken { get; set; }
|
||||||
public string AdminToken { get; set; }
|
public string AdminToken { get; set; }
|
||||||
|
public DateTime? ExpiryTime { get; set; }
|
||||||
|
public bool ExpiryMessageSent { get; set; }
|
||||||
|
|
||||||
public long? ChatId { get; set; }
|
public long? ChatId { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class ExpiredTopicChat
|
||||||
|
{
|
||||||
|
public long ChatId;
|
||||||
|
public string Description;
|
||||||
|
public DateTime ExpiryTime { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
public async Task<int> DeleteTopic(string name, string adminToken)
|
public async Task<int> DeleteTopic(string name, string adminToken)
|
||||||
{
|
{
|
||||||
using (var c = GetConnection())
|
using (var c = GetConnection())
|
||||||
@ -119,6 +130,49 @@ namespace JetHerald
|
|||||||
new { topicName, chatId });
|
new { topicName, chatId });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task AddExpiry(string topicName, int addedTime)
|
||||||
|
{
|
||||||
|
using var c = GetConnection();
|
||||||
|
return c.ExecuteAsync(
|
||||||
|
" UPDATE topic" +
|
||||||
|
" SET ExpiryTime = CURRENT_TIMESTAMP() + INTERVAL @addedTime SECOND," +
|
||||||
|
" ExpiryMessageSent = 0" +
|
||||||
|
" WHERE Name = @topicName",
|
||||||
|
new { topicName, addedTime });
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task DisableExpiry(string name, string adminToken)
|
||||||
|
{
|
||||||
|
using var c = GetConnection();
|
||||||
|
return c.ExecuteAsync(
|
||||||
|
" UPDATE topic" +
|
||||||
|
" SET ExpiryTime = NULL," +
|
||||||
|
" ExpiryMessageSent = 0" +
|
||||||
|
" WHERE Name = @name AND AdminToken = @adminToken",
|
||||||
|
new { name, adminToken });
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<IEnumerable<ExpiredTopicChat>> GetExpiredTopics(CancellationToken token = default)
|
||||||
|
{
|
||||||
|
using var c = GetConnection();
|
||||||
|
return c.QueryAsync<ExpiredTopicChat>(
|
||||||
|
" SELECT tc.ChatId, t.Description, t.ExpiryTime" +
|
||||||
|
" FROM topic_chat tc" +
|
||||||
|
" INNER JOIN topic t ON t.TopicId = tc.TopicId" +
|
||||||
|
" WHERE t.ExpiryTime < CURRENT_TIMESTAMP() AND NOT t.ExpiryMessageSent",
|
||||||
|
token);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task MarkExpiredTopics(CancellationToken token = default)
|
||||||
|
{
|
||||||
|
using var c = GetConnection();
|
||||||
|
return c.ExecuteAsync(
|
||||||
|
" UPDATE topic t" +
|
||||||
|
" SET t.ExpiryMessageSent = 1" +
|
||||||
|
" WHERE t.ExpiryTime < CURRENT_TIMESTAMP()",
|
||||||
|
token);
|
||||||
|
}
|
||||||
|
|
||||||
public Db(IOptions<Options.ConnectionStrings> cfg)
|
public Db(IOptions<Options.ConnectionStrings> cfg)
|
||||||
{
|
{
|
||||||
Config = cfg.Value;
|
Config = cfg.Value;
|
||||||
|
|||||||
@ -8,6 +8,7 @@ using Telegram.Bot.Args;
|
|||||||
using Telegram.Bot.Types.Enums;
|
using Telegram.Bot.Types.Enums;
|
||||||
|
|
||||||
using JetHerald.Commands;
|
using JetHerald.Commands;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
namespace JetHerald
|
namespace JetHerald
|
||||||
{
|
{
|
||||||
@ -26,6 +27,8 @@ namespace JetHerald
|
|||||||
|
|
||||||
TelegramBotClient Client { get; set; }
|
TelegramBotClient Client { get; set; }
|
||||||
ChatCommandRouter Commands;
|
ChatCommandRouter Commands;
|
||||||
|
CancellationTokenSource HeartbeatCancellation;
|
||||||
|
Task HeartbeatTask;
|
||||||
Telegram.Bot.Types.User Me { get; set; }
|
Telegram.Bot.Types.User Me { get; set; }
|
||||||
|
|
||||||
public async Task Init()
|
public async Task Init()
|
||||||
@ -49,10 +52,51 @@ namespace JetHerald
|
|||||||
Commands.Add(new UnsubscribeCommand(Db), "unsubscribe", "unsub");
|
Commands.Add(new UnsubscribeCommand(Db), "unsubscribe", "unsub");
|
||||||
Commands.Add(new ListCommand(Db), "list");
|
Commands.Add(new ListCommand(Db), "list");
|
||||||
|
|
||||||
|
HeartbeatCancellation = new();
|
||||||
|
HeartbeatTask = CheckHeartbeats(HeartbeatCancellation.Token);
|
||||||
|
|
||||||
Client.OnMessage += BotOnMessageReceived;
|
Client.OnMessage += BotOnMessageReceived;
|
||||||
Client.StartReceiving();
|
Client.StartReceiving();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task Stop()
|
||||||
|
{
|
||||||
|
Client.StopReceiving();
|
||||||
|
HeartbeatCancellation.Cancel();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await HeartbeatTask;
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task CheckHeartbeats(CancellationToken token)
|
||||||
|
{
|
||||||
|
while (!token.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
await Task.Delay(1000 * 10, token);
|
||||||
|
|
||||||
|
foreach (var chatSent in await Db.GetExpiredTopics(token))
|
||||||
|
{
|
||||||
|
var formatted = $"!{chatSent.Description}!:\nTimeout expired at {chatSent.ExpiryTime}";
|
||||||
|
await Client.SendTextMessageAsync(chatSent.ChatId, formatted, cancellationToken: token);
|
||||||
|
}
|
||||||
|
|
||||||
|
await Db.MarkExpiredTopics(token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task HeartbeatSent(Db.Topic topic)
|
||||||
|
{
|
||||||
|
var chatIds = await Db.GetChatIdsForTopic(topic.TopicId);
|
||||||
|
var formatted = $"!{topic.Description}!:\nA heartbeat has been sent.";
|
||||||
|
foreach (var c in chatIds)
|
||||||
|
await Client.SendTextMessageAsync(c, formatted);
|
||||||
|
}
|
||||||
|
|
||||||
public async Task PublishMessage(Db.Topic topic, string message)
|
public async Task PublishMessage(Db.Topic topic, string message)
|
||||||
{
|
{
|
||||||
var chatIds = await Db.GetChatIdsForTopic(topic.TopicId);
|
var chatIds = await Db.GetChatIdsForTopic(topic.TopicId);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user