taskcompletionsource (tcs)
一个可以手动控制状态的 task。它允许你创建一个任务,并在稍后的某个时间点手动将其标记为“已完成”。
task.whenany
超时处理机制。通过将“结果任务”和“延迟任务(task.delay)”放在一起竞争,确保程序不会因为消息丢失而永久死锁。
private taskcompletionsource<string> _transtasksource;
private taskcompletionsource<bool> _transaviresultsource;
public bool transresult{get;set;}
public string imagefilename{get;set;}
private void onappmessagereceived(appmessageeventargs obj)
{
if (obj.messageid == "xxx")
{
dictionary<string, object> dic = jsonhelper.deserialize<dictionary<string, object>>(obj.value);
if (dic["filename"].tostring().contains("avi"))
{
bool result = dic["result"].tostring() == "1";
_transaviresultsource?.trysetresult(result);
}
else
{
string filename = dic["filename"].tostring();
_transtasksource?.trysetresult(filename);
}
}
}
protected virtual async task<string> ontransstandard(string fileformat)
{
switch (fileformat)
{
case "avi":
_transaviresultsource = new taskcompletionsource<bool>();
var result = _transservice.trans2avi();
var timeouttask = task.delay(timespan.fromseconds(15));
var completedtask = await task.whenany(_transaviresultsource.task, timeouttask);
if (completedtask == _transaviresultsource.task)
{
transresult = result;
}
break;
default:
_trasntasksource = new taskcompletionsource<string>();
_transservice.trans();
var timeouttask = task.delay(timespan.fromseconds(30), cancellationtoken);
var completedtask = await task.whenany(_trasntasksource.task, timeouttask);
if (completedtask == _trasntasksource.task)
{
imagefilename = await _trasntasksource.task;
}
else
{
// todo
}
break;
}
}优化版本
using system.collections.concurrent;
// 使用字典支持并发,key 为唯一标识(如文件名或请求id)
private readonly concurrentdictionary<string, taskcompletionsource<string>> _transtasks = new();
private readonly concurrentdictionary<string, taskcompletionsource<bool>> _avitasks = new();
private void onappmessagereceived(appmessageeventargs obj)
{
if (obj.messageid != "xxx") return;
var dic = jsonhelper.deserialize<dictionary<string, object>>(obj.value);
string filename = dic["filename"]?.tostring() ?? string.empty;
if (filename.contains("avi"))
{
bool result = dic["result"]?.tostring() == "1";
// 尝试从字典中移除并设置结果,确保只处理一次
if (_avitasks.tryremove(filename, out var tcs))
{
tcs.trysetresult(result);
}
}
else
{
if (_transtasks.tryremove(filename, out var tcs))
{
tcs.trysetresult(filename);
}
}
}
protected virtual async task<string> ontransstandard(string fileformat, string filename)
{
// 1. 设置超时取消令牌
using var cts = new cancellationtokensource(timespan.fromseconds(fileformat == "avi" ? 15 : 30));
try
{
if (fileformat == "avi")
{
var tcs = new taskcompletionsource<bool>(taskcreationoptions.runcontinuationsasynchronously);
_avitasks[filename] = tcs;
_transservice.trans2avi();
// 使用 register 绑定取消令牌到 tcs
using (cts.token.register(() => tcs.trysetcanceled()))
{
transresult = await tcs.task;
return filename;
}
}
else
{
var tcs = new taskcompletionsource<string>(taskcreationoptions.runcontinuationsasynchronously);
_transtasks[filename] = tcs;
_transservice.trans();
using (cts.token.register(() => tcs.trysetcanceled()))
{
imagefilename = await tcs.task;
return imagefilename;
}
}
}
catch (operationcanceledexception)
{
// 统一处理超时逻辑
handletimeout(filename, fileformat);
return string.empty;
}
finally
{
// 确保清理字典,防止内存泄漏
_avitasks.tryremove(filename, out _);
_transtasks.tryremove(filename, out _);
}
}同步阻塞式的等待机制(同步原语)
传统的线程同步对象 manualresetevent 来强制当前线程“停下”执行,直到收到特定的信号。
调用 ondatatransformation 的线程会被物理挂起,不消耗 cpu 周期,但会占用一个线程资源,直到 _event.set() 被调用。
| 特性 | manualresetevent (手动) | autoresetevent (自动) |
|---|---|---|
| 放行数量 | 调 set() 后,所有正在等待的线程都会被放行。 | 调用 set() 后,仅有一个线程会被放行。 |
| 复位方式 | 必须手动调用 reset() 才会再次阻塞线程。 | 只要有一个线程通过,它会自动回到阻塞状态。 |
| 典型场景 | 广播/通知:一个信号通知多个任务同时开始。 | 同步/排队:确保对某个资源的访问是串行的。 |
public bool transresult { get; set; }
public string filenames { get; set; }
private static readonly manualresetevent _event = new manualresetevent(false);
private void onappmessagereceived(appmessageeventargs obj)
{
if (obj.messageid == "xxx")
{
dictionary<string, object> dic = jsonhelper.deserialize<dictionary<string, object>>(obj.value);
if (dic["filename"].tostring().contains("avi"))
{
transresult = dic["result"].tostring() == "1";
_event.set();
}
else
{
filenames = dic["filename"].tostring();
_event.set();
}
}
}
public virtual void ondatatransformation()
{
cancellationtoken.throwifcancellationrequested();
_transervice.transimage();
_event.reset();
_event.waitone();
cancellationtoken.throwifcancellationrequested();
}选择 manualresetevent 还是 autoresetevent 取决于你的控制目标:是想给“一群人”发信号,还是想让“一个人”过闸机。
1. manualresetevent(手动重置)
适用场景:状态通知 / 广播 (broadcasting)
它通常用于表示一个“开关”或“阶段完成”的状态。一旦这个状态达成了,所有依赖它的线程都可以继续。
典型例子:系统初始化
- 你的主程序启动时有多个后台服务(数据库连接、缓存加载、配置文件读取)。
- 主线程调用
_initevent.waitone()。 - 只有当所有初始化工作全部完成后,才调用一次
set()。 - 此时,所有卡在
waitone处的逻辑都会同时被释放。
- 主线程调用
典型例子:暂停/恢复功能
- 在下载器或播放器中,按下“暂停”即
reset()(关闸),所有下载线程waitone;按下“开始”即set()(开闸),所有线程同时恢复工作。
2. autoresetevent(自动重置)
适用场景:独占资源 / 生产者-消费者 (queueing)
它通常用于确保任务的串行化执行,或者作为简单的线程间信号传递。
典型例子:任务队列(单线程处理)
- 你有一个后台线程专门处理发邮件的操作。
- 当有新邮件进入队列时,调用一次
set()。 - 后台处理线程
waitone()收到信号,起来发一封邮件。 - 发完后,由于是
autoreset,它会自动回到阻塞状态,等待下一次set()。
- 当有新邮件进入队列时,调用一次
典型例子:由于硬件限制的互斥访问
- 像你代码中这种“发送指令 -> 等待硬件回传”的模式。如果你想确保发一个收一个,且不希望第二个指令在第一个指令没返回前就跑掉,
autoresetevent更安全,因为它处理完一次会自动“关门”。
3. 对比
| 维度 | manualresetevent | autoresetevent |
|---|---|---|
| 形象比喻 | 大门的闸刀开关:拉上去,所有人都能进;拉下来,所有人都得停。 | 地铁的旋转闸机:刷一次卡(set),只能进去一个人,进去后闸机立刻锁死。 |
| 核心逻辑 | 状态驱动:侧重于“某个条件是否达成”。 | 事件驱动:侧重于“某个动作是否发生”。 |
| 重置时机 | 你认为这个状态不再有效时(手动)。 | 线程穿过 waitone 的那一刻(自动)。 |
4. 为什么用得少了?
在高性能开发中,这两个类正逐渐被以下方案取代:
- taskcompletionsource (tcs):
- 理由:它是异步非阻塞的(async/non-blocking)。前面的
manual/auto都会死死占住一个操作系统线程,非常浪费资源。
- 理由:它是异步非阻塞的(async/non-blocking)。前面的
- semaphoreslim (信号量):
- 理由:它比
autoresetevent更强大。它支持waitasync(异步等待),而且可以控制允许 n 个线程同时通过,而不仅仅是一个。
- 理由:它比
- manualreseteventslim:
- 理由:如果你非要用同步等待,请优先使用带
slim后缀的版本。它在等待时间很短时会先进行“自旋(spin)”,不直接切换到昂贵的内核模式,性能更好。
- 理由:如果你非要用同步等待,请优先使用带
异步等待 (await) vs 同步挂起 (wait)
| 维度 | taskcompletionsource | manual/autoresetevent |
|---|---|---|
| 编程模型 | 异步 (asynchronous)。基于 task,符合现代 .net 开发习惯。 | 同步 (synchronous)。基于操作系统的内核对象。 |
| 线程利用率 | 高。await 时线程会释放回线程池,去处理其他任务。 | 低。当前线程被彻底卡死(block),什么都干不了。 |
| 超时处理 | 灵活。通过 task.delay 或 cancellationtoken 轻松实现。 | 较硬。需给 waitone(timeout) 传参,且写法略显臃肿。 |
| ui 响应性 | 友好。如果在 ui 线程调用,界面不会卡死。 | 危险。如果在 ui 线程调用,界面会直接崩溃/无响应(deadlock)。 |
| 并发支持 | 较好。通过 tcs 实例可以区分不同的请求。 | 差。static 的 _event 意味着全局只能同时处理一个任务。 |
第一种 (taskcompletionsource) —— 手机短信提醒: 你该干嘛干嘛(去洗澡、打游戏),线程被释放回去了。快递到了,手机响了(task 完成),你再回到门口处理快递。这种是非阻塞的。
第二种 (manualresetevent) —— 站在门口死等: 你推掉了一切活动,就站在门口盯着路口(线程被挂起)。快递员没来,你哪也不去,也不说话。快递员一招手(set),你立刻动起来。这种是同步阻塞的。
同步方式 (eventwaithandle)
// 这是一条死胡同,除非有人开门,否则车(线程)就停死在这里 _event.waitone(); // 只有开门后,才能跑这一行 donextstep();
- 后果:如果你在 ui 线程(如点击按钮)里这么写,你的软件界面会直接“未响应”,因为 ui 线程被阻塞了,没法处理鼠标点击和界面刷新。
异步方式 (taskcompletionsource)
// 这是一条路口,车(线程)发现红灯,就先掉头去干别的活了 await _tcs.task; // 绿灯亮了(setresult),会有另一辆车(或原车)回来继续跑 donextstep();
- 后果:ui 依然丝滑。
await释放了当前线程,让它回消息循环里去处理界面绘制,等结果到了再回来。
.net中的位置
在 .net 专门的同步分类中,它们属于内核模式同步对象(kernel-mode objects)。
| 类别 | 代表组件 | 特点 |
|---|---|---|
| 内核模式 (同步) | manualresetevent, autoresetevent, mutex | 重型。涉及操作系统内核切换,跨进程可用,但性能开销大。 |
| 混合模式 (同步) | manualreseteventslim, semaphoreslim | 轻量。先自旋再挂起,性能极高,是现代同步的首选。 |
| 异步模式 | taskcompletionsource, task.whenany | 现代。完全不阻塞线程,支撑高并发的核心。 |
到此这篇关于c# 异步回调与等待机制全解的文章就介绍到这了,更多相关c# 异步回调与等待机制内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论