using MySql.Data.MySqlClient; using Dapper; using JetHerald.Options; using JetHerald.Contracts; namespace JetHerald.Services; public class Db { public async Task DeleteTopic(string name, string adminToken) { using var c = GetConnection(); return await c.ExecuteAsync( " DELETE" + " FROM topic" + " WHERE Name = @name AND AdminToken = @adminToken", new { name, adminToken }); } public async Task GetTopic(string name) { using var c = GetConnection(); return await c.QuerySingleOrDefaultAsync( " SELECT *" + " FROM topic" + " WHERE Name = @name", new { name }); } public async Task GetTopicForSub(string token, NamespacedId sub) { using var c = GetConnection(); return await c.QuerySingleOrDefaultAsync( " SELECT t.*, ts.Sub " + " FROM topic t " + " LEFT JOIN topic_sub ts ON t.TopicId = ts.TopicId AND ts.Sub = @sub " + " WHERE ReadToken = @token", new { token, sub }); } public async Task CreateTopic(NamespacedId user, string name, string descr) { using var c = GetConnection(); return await c.QuerySingleOrDefaultAsync( " INSERT INTO topic " + " ( Creator, Name, Description, ReadToken, WriteToken, AdminToken) " + " VALUES " + " (@Creator, @Name, @Description, @ReadToken, @WriteToken, @AdminToken); " + " SELECT * FROM topic WHERE TopicId = LAST_INSERT_ID(); ", new Topic { Creator = user, Name = name, Description = descr, ReadToken = TokenHelper.GetToken(), WriteToken = TokenHelper.GetToken(), AdminToken = TokenHelper.GetToken() }); } public async Task> GetSubsForTopic(uint topicId) { using var c = GetConnection(); return await c.QueryAsync( " SELECT Sub " + " FROM topic_sub " + " WHERE TopicId = @topicid", new { topicId }); } public async Task> GetTopicsForSub(NamespacedId sub) { using var c = GetConnection(); return await c.QueryAsync( " SELECT t.*" + " FROM topic_sub ts" + " JOIN topic t USING (TopicId)" + " WHERE ts.Sub = @sub", new { sub }); } public async Task CreateSubscription(uint topicId, NamespacedId sub) { using var c = GetConnection(); await c.ExecuteAsync( " INSERT INTO topic_sub" + " (TopicId, Sub)" + " VALUES" + " (@topicId, @sub)", new { topicId, sub }); } public async Task RemoveSubscription(string topicName, NamespacedId sub) { using var c = GetConnection(); return await c.ExecuteAsync( " DELETE ts " + " FROM topic_sub ts" + " JOIN topic t USING (TopicId) " + " WHERE t.Name = @topicName AND ts.Sub = @sub;", new { topicName, sub }); } public async Task ReportHeartbeat(uint topicId, string heart, int timeoutSeconds) { using var c = GetConnection(); return await c.ExecuteAsync( @" INSERT INTO heart (TopicId, Heart, Status, ExpiryTs) VALUES (@topicId, @heart, 'beating', CURRENT_TIMESTAMP() + INTERVAL @timeoutSeconds SECOND) ON DUPLICATE KEY UPDATE Status = 'beating', ExpiryTs = CURRENT_TIMESTAMP() + INTERVAL @timeoutSeconds SECOND; ", new { topicId, heart, timeoutSeconds }); } public async Task> ProcessHearts() { using var c = GetConnection(); return await c.QueryAsync("CALL process_hearts();"); } public async Task MarkHeartAttackReported(ulong id) { using var c = GetConnection(); await c.ExecuteAsync("UPDATE heartevent SET Status = 'reported' WHERE HeartEventId = @id", new { id }); } public Db(IOptionsMonitor cfg) { Config = cfg; } IOptionsMonitor Config { get; } MySqlConnection GetConnection() => new(Config.CurrentValue.DefaultConnection); }