场景
如服务器监控系统,主表记录每一台服务器的信息(主要是 IP 地址)
明细表记录每台服务器上报的记录(服务器信息),假如每 2 分钟上报一次,一个小时就是 30 条,一天就会产生 720 条数据
如果想删除某台服务器及记录,则需要考虑明细表数据量过大执行超时的问题。
处理
边读边删,同时加入执行队列
创建两个数据库连接对象,一个按行流式读取、一个按批删除,在一个连接对象边读边删会出错。
在请求里面执行主表删除,另一个线程后台慢慢删除明细表,直接返回主表删除的结果;
考虑到在后台慢慢删除明细的过程中,又提交了一个或多个删除,会出现多个后台删除明细的线程,会严重拖慢数据库,所以还需要按队列执行,等前面的删除任务结束才能开始
这里没考虑后台线程执行过程中,应用程序挂了或数据库断开了等情况,如果需要考虑,那就得先把删除主表的主键及删除状态记录到数据库,服务恢复后读取记录继续执行,执行完同时更新删除状态
代码
项目环境是 EFCore
Program.cs 添加数据库连接池
builder.Services.AddDbContextPool<ContextBase>(options =>
{
options.UseSqlServer("conn");
}, 7);
XOpsController 控制器
public class XOpsController(ContextBase cb, IServiceScopeFactory serviceScopeFactory)
{
public ContextBase db = cb;
public IServiceScopeFactory ssf = serviceScopeFactory;
/// <summary>
/// 删除记录每批次
/// </summary>
public int DeleteRecordMax { get; set; } = 100;
/// <summary>
/// 删除 Host(同时删除记录)
/// </summary>
/// <param name="id">唯一标识</param>
/// <returns></returns>
[HttpGet]
public async Task<ResultVM> HostDelete(long id)
{
var vm = new ResultVM();
if (id > 0)
{
// 删除主表记录
var num = await db.XopsHost.Where(x => x.HostId == id).ExecuteDeleteAsync();
vm.Set(num > 0);
//删除明细记录
if (num > 0)
{
var tag = $"{nameof(HostDelete)} {id}";
// 加入队列
QueueTo.AddWork(async () =>
{
var st = Stopwatch.StartNew();
// 两个数据库连接对象
using var scopeSelect = ssf.CreateScope();
using var scopeDelete = ssf.CreateScope();
using var dbSelect = scopeSelect.ServiceProvider.GetRequiredService<ContextBase>();
using var dbDelete = scopeDelete.ServiceProvider.GetRequiredService<ContextBase>();
var batchCount = 0;
var batchKeys = new List<long>(DeleteRecordMax);
// 流式按行读取明细表主键
var asyncKeys = dbSelect.RecordHost.Where(x => x.HostId == id).Select(x => x.RecordId).AsAsyncEnumerable();
await foreach (var keyItem in asyncKeys)
{
batchKeys.Add(keyItem);
if (batchKeys.Count == DeleteRecordMax)
{
batchCount += await dbDelete.RecordHost.Where(x => batchKeys.Contains(x.RecordId)).ExecuteDeleteAsync();
batchKeys.Clear();
Console.WriteLine($"{tag} 累计删除 {batchCount} 累计耗时 {st.Elapsed}");
await Task.Delay(100);
}
}
// 最后一批
if (batchKeys.Count > 0)
{
batchCount += await dbDelete.RecordHost.Where(x => batchKeys.Contains(x.RecordId)).ExecuteDeleteAsync();
batchKeys.Clear();
Console.WriteLine($"{tag} 累计删除 {batchCount} 累计耗时 {st.Elapsed}");
}
Console.WriteLine($"{tag} done!");
}, tag);
}
}
else
{
vm.Set(RCodeTypes.failure);
vm.Msg = "ID 或 地址(唯一标识)不能为空";
}
return vm;
}
}
QueueTo.cs 队列执行
/// <summary>
/// 队列
/// </summary>
public static class QueueTo
{
/// <summary>
/// 队列
/// </summary>
public static ConcurrentQueue<Func<Task>> QueueAction { get; set; } = [];
/// <summary>
/// 标记
/// </summary>
static int isRunning = 0;
/// <summary>
/// 消费
/// </summary>
public static void QueueRun()
{
if (Interlocked.CompareExchange(ref isRunning, 1, 0) == 0)
{
Task.Run(async () =>
{
while (QueueAction.TryDequeue(out var action))
{
try
{
await action.Invoke();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
Console.WriteLine($"{nameof(QueueTo)} count {QueueAction.Count}");
await Task.Delay(0);
}
Interlocked.Exchange(ref isRunning, 0);
});
}
}
/// <summary>
/// 添加作业,已处理异常
/// </summary>
/// <param name="workItem"></param>
/// <param name="workRemark">作业备注,添加日志输出</param>
public static void AddWork(Func<Task> workItem, string workRemark = null)
{
QueueAction.Enqueue(workItem);
if (!string.IsNullOrEmpty(workRemark))
{
Console.WriteLine($"{nameof(QueueTo)}-{nameof(AddWork)}");
Console.WriteLine(workRemark);
}
QueueRun();
}
}