netnr 2024-12-31

场景

如服务器监控系统,主表记录每一台服务器的信息(主要是 IP 地址)

明细表记录每台服务器上报的记录(服务器信息),假如每 2 分钟上报一次,一个小时就是 30 条,一天就会产生 720 条数据

如果想删除某台服务器及记录,则需要考虑明细表数据量过大执行超时的问题。

处理

边读边删,同时加入执行队列

创建两个数据库连接对象,一个按行流式读取、一个按批删除,在一个连接对象边读边删会出错。

在请求里面执行主表删除,另一个线程后台慢慢删除明细表,直接返回主表删除的结果;
考虑到在后台慢慢删除明细的过程中,又提交了一个或多个删除,会出现多个后台删除明细的线程,会严重拖慢数据库,所以还需要按队列执行,等前面的删除任务结束才能开始

这里没考虑后台线程执行过程中,应用程序挂了或数据库断开了等情况,如果需要考虑,那就得先把删除主表的主键及删除状态记录到数据库,服务恢复后读取记录继续执行,执行完同时更新删除状态

代码

项目环境是 EFCore

Program.cs 添加数据库连接池

builder.Services.AddDbContextPool<ContextBase>(options =>
{
    options.UseSqlServer("conn");
}, 7);

XOpsController 控制器

public class XOpsController(ContextBase cb, IServiceScopeFactory serviceScopeFactory)
{
    public ContextBase db = cb;
    public IServiceScopeFactory ssf = serviceScopeFactory;

    /// <summary>
    /// 删除记录每批次
    /// </summary>
    public int DeleteRecordMax { get; set; } = 100;

    /// <summary>
    /// 删除 Host(同时删除记录)
    /// </summary>
    /// <param name="id">唯一标识</param>
    /// <returns></returns>
    [HttpGet]
    public async Task<ResultVM> HostDelete(long id)
    {
        var vm = new ResultVM();

        if (id > 0)
        {
            // 删除主表记录
            var num = await db.XopsHost.Where(x => x.HostId == id).ExecuteDeleteAsync();
            vm.Set(num > 0);

            //删除明细记录
            if (num > 0)
            {
                var tag = $"{nameof(HostDelete)} {id}";

                // 加入队列
                QueueTo.AddWork(async () =>
                {
                    var st = Stopwatch.StartNew();

                    // 两个数据库连接对象
                    using var scopeSelect = ssf.CreateScope();
                    using var scopeDelete = ssf.CreateScope();
                    using var dbSelect = scopeSelect.ServiceProvider.GetRequiredService<ContextBase>();
                    using var dbDelete = scopeDelete.ServiceProvider.GetRequiredService<ContextBase>();

                    var batchCount = 0;
                    var batchKeys = new List<long>(DeleteRecordMax);

                    // 流式按行读取明细表主键
                    var asyncKeys = dbSelect.RecordHost.Where(x => x.HostId == id).Select(x => x.RecordId).AsAsyncEnumerable();
                    await foreach (var keyItem in asyncKeys)
                    {
                        batchKeys.Add(keyItem);
                        if (batchKeys.Count == DeleteRecordMax)
                        {
                            batchCount += await dbDelete.RecordHost.Where(x => batchKeys.Contains(x.RecordId)).ExecuteDeleteAsync();
                            batchKeys.Clear();

                            Console.WriteLine($"{tag} 累计删除 {batchCount} 累计耗时 {st.Elapsed}");
                            await Task.Delay(100);
                        }
                    }
                    // 最后一批
                    if (batchKeys.Count > 0)
                    {
                        batchCount += await dbDelete.RecordHost.Where(x => batchKeys.Contains(x.RecordId)).ExecuteDeleteAsync();
                        batchKeys.Clear();

                        Console.WriteLine($"{tag} 累计删除 {batchCount} 累计耗时 {st.Elapsed}");
                    }

                    Console.WriteLine($"{tag} done!");
                }, tag);
            }
        }
        else
        {
            vm.Set(RCodeTypes.failure);
            vm.Msg = "ID 或 地址(唯一标识)不能为空";
        }

        return vm;
    }
}

QueueTo.cs 队列执行

/// <summary>
/// 队列
/// </summary>
public static class QueueTo
{
    /// <summary>
    /// 队列
    /// </summary>
    public static ConcurrentQueue<Func<Task>> QueueAction { get; set; } = [];

    /// <summary>
    /// 标记
    /// </summary>
    static int isRunning = 0;

    /// <summary>
    /// 消费
    /// </summary>
    public static void QueueRun()
    {
        if (Interlocked.CompareExchange(ref isRunning, 1, 0) == 0)
        {
            Task.Run(async () =>
            {
                while (QueueAction.TryDequeue(out var action))
                {
                    try
                    {
                        await action.Invoke();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex);
                    }

                    Console.WriteLine($"{nameof(QueueTo)} count {QueueAction.Count}");
                    await Task.Delay(0);
                }

                Interlocked.Exchange(ref isRunning, 0);
            });
        }
    }

    /// <summary>
    /// 添加作业,已处理异常
    /// </summary>
    /// <param name="workItem"></param>
    /// <param name="workRemark">作业备注,添加日志输出</param>
    public static void AddWork(Func<Task> workItem, string workRemark = null)
    {
        QueueAction.Enqueue(workItem);
        if (!string.IsNullOrEmpty(workRemark))
        {
            Console.WriteLine($"{nameof(QueueTo)}-{nameof(AddWork)}");
            Console.WriteLine(workRemark);
        }

        QueueRun();
    }
}
登录写评论