概述
从.net framework 4开始,.net使用统一的模型来协作取消异步或长时间运行的同步线程。该模型基于一个称为cancellationtoken的轻量级对象。这个对象在调用一个或多个取消线程时(例如通过创建新线程或任务),是通过将token传递给每个线程来完成的(通过链式的方式依次传递)。单个线程能够依次地将token的副本传递给其他线程。
之后,在适当的某个时机,创建token的对象就可以使用token来请求线程停止。只有请求对象可以发出取消请求,每个监听器负责监听到请求并以适当和及时的方式响应取消请求。
实现协作取消模型的一般模式是:
- 1、实例化一个cancellationtokensource对象,该对象管理cancellation并将cancellation通知发送给单独的cancellation token。
- 2、cancellationtokensource对象的token属性,可以返回一个token对象,我们可以将该token对象发送给每个监听该cancellation的进程或task。
- 3、为每个任务或线程提供响应取消的机制。
- 4、调用 cancellationtokensource.cancel() 方法,来取消线程或者task。
【tips】我们在使用cancellation的token取消线程后,应该确保调用cancellationtokensource.dispose()方法,以便于释放它持有的任何非托管资源。。
下图展示出了cancellationtokensource对象里的token属性对象,是如何传递到其他的线程里的。

合作取消模型使创建取消感知的应用程序和库变得更容易,它支持以下功能:
- 1、取消是合式的,不会强加给监听器。监听器确定如何优雅地终止以响应取消请求。
- 2、请求不同于监听。调用可取消的线程的对象,可以控制何时(如果有的话)取消被请求。
- 3、请求的对象,可以通过仅使用一个方法,即可发送取消请求到所有的token副本中。
- 4、监听器可以通过将多个token连接成一个linked token,来同时监听多个token。
- 5、用户代码可以注意到并响应library code的取消请求,而library code可以注意到并响应用户代码的取消请求。
- 6、可以通过轮询、回调注册或等待等待句柄的方式,来通知监听器执行取消请求。
与取消线程相关的类型
取消框架是作为一组相关类型实现的,这些类型在下表中列出。
| cancellationtokensource | 该对象创建cancellation token,并向 cancellation token的所有副本分发取消请求。 |
| cancellationtoken | 传递给一个或多个监听器的轻量级的值类型,通常作为方法参数。侦听器通过轮询、回调或等待句柄监视token的iscancellationrequested属性的值。 |
| operationcanceledexception | 此异常构造函数的重载,接受cancellationtoken作为参数。侦听器可以选择性地抛出此异常以验证取消的来源,并通知其他已响应取消请求监听器。 |
取消模型以几种类型集成到.net中。
最重要的是system.threading.tasks.parallel,system.threading.tasks.task、system.threading.tasks.task<tresult> 和 system.linq.parallelenumerable。
建议使用所有新的库和应用代码来实现合作市取消模式。
代码举例
在下面的示例中,请求对象创建一个cancellationtokensource对象,然后将该对象的token属性传递给可取消的进程。
接收请求的线程通过轮询来监视token的iscancellationrequested属性的值。
当该值变为true时,侦听器可以以任何合适的方式终止。在本例中,方法只是退出,这是许多情况下所需要的全部内容。
using system;
using system.threading;
public class example
{
public static void main()
{
// create the token source.
cancellationtokensource cts = new cancellationtokensource();
// pass the token to the cancelable operation.
threadpool.queueuserworkitem(new waitcallback(dosomework), cts.token);
thread.sleep(2500);
// request cancellation.
cts.cancel();
console.writeline("cancellation set in token source...");
thread.sleep(2500);
// cancellation should have happened, so call dispose.
cts.dispose();
}
// thread 2: the listener
static void dosomework(object? obj)
{
if (obj is null)
return;
cancellationtoken token = (cancellationtoken)obj;
for (int i = 0; i < 100000; i++)
{
if (token.iscancellationrequested)
{
console.writeline("in iteration {0}, cancellation has been requested...",
i + 1);
// perform cleanup if necessary.
//...
// terminate the operation.
break;
}
// simulate some work.
thread.spinwait(500000);
}
}
}
// the example displays output like the following:
// cancellation set in token source...
// in iteration 1430, cancellation has been requested...操作取消vs对象取消
在协作取消框架中,取消指的是操作(线程中执行的操作),而不是对象。取消请求意味着在执行任何所需的清理后,操作应尽快停止。一个cancellation token应该指向一个“可取消的操作”,无论该操作如何在您的程序中实现。
在token的iscancellationrequested属性被设置为true之后,它不能被重置为false。因此,取消令牌在被取消后不能被重用。
如果您需要对象取消机制,您可以通过调用cancellationtoken来基于操作取消机制。注册方法,如下例所示。
using system;
using system.threading;
class cancelableobject
{
public string id;
public cancelableobject(string id)
{
this.id = id;
}
public void cancel()
{
console.writeline("object {0} cancel callback", id);
// perform object cancellation here.
}
}
public class example1
{
public static void main()
{
cancellationtokensource cts = new cancellationtokensource();
cancellationtoken token = cts.token;
// user defined class with its own method for cancellation
var obj1 = new cancelableobject("1");
var obj2 = new cancelableobject("2");
var obj3 = new cancelableobject("3");
// register the object's cancel method with the token's
// cancellation request.
token.register(() => obj1.cancel());
token.register(() => obj2.cancel());
token.register(() => obj3.cancel());
// request cancellation on the token.
cts.cancel();
// call dispose when we're done with the cancellationtokensource.
cts.dispose();
}
}
// the example displays the following output:
// object 3 cancel callback
// object 2 cancel callback
// object 1 cancel callback如果一个对象支持多个并发的可取消操作,则可以给每个不同的可取消操作各自传入一个不同的token。这样,一个操作可以被取消而不会影响到其他操作。
监听并响应取消请求
在用户委托中,可取消操作的实现者决定如何终止该操作以响应取消请求。在许多情况下,用户委托可以只执行任何所需的清理,然后立即返回。
但是,在更复杂的情况下,可能需要用户委托通知库代码已发生cancellation。在这种情况下,终止操作的正确方法是委托调用throwifcancellationrequested方法,这将导致抛出operationcanceledexception异常。库代码可以在用户委托线程上捕获此异常,并检查异常的token,以确定该异常是否表示协作取消或其他异常情况。
在这种情况下,终止操作的正确方法是委托调用throwifcancellationrequested方法,这将导致抛出operationcanceledexception。库代码可以在用户委托线程上捕获此异常,并检查异常的token,以确定该异常是否表示协作取消或其他异常情况。
轮询监听
对于循环或递归的长时间运行的计算,可以通过定期轮询cancellationtoken.iscancellationrequested的值来监听取消请求。如果它的值为true,则该方法应该尽快清理并终止。轮询的最佳频率取决于应用程序的类型。开发人员可以为任何给定的程序确定最佳轮询频率。轮询本身不会显著影响性能。
下面的程序案例展示了一种可能的轮询方式。
static void nestedloops(rectangle rect, cancellationtoken token)
{
for (int col = 0; col < rect.columns && !token.iscancellationrequested; col++) {
// assume that we know that the inner loop is very fast.
// therefore, polling once per column in the outer loop condition
// is sufficient.
for (int row = 0; row < rect.rows; row++) {
// simulating work.
thread.spinwait(5_000);
console.write("{0},{1} ", col, row);
}
}
if (token.iscancellationrequested) {
// cleanup or undo here if necessary...
console.writeline("\r\noperation canceled");
console.writeline("press any key to exit.");
// if using task:
// token.throwifcancellationrequested();
}
}下面的程序代码是一个详细的实现:
using system;
using system.threading;
public class serverclass
{
public static void staticmethod(object obj)
{
cancellationtoken ct = (cancellationtoken)obj;
console.writeline("serverclass.staticmethod is running on another thread.");
// simulate work that can be canceled.
while (!ct.iscancellationrequested) {
thread.spinwait(50000);
}
console.writeline("the worker thread has been canceled. press any key to exit.");
console.readkey(true);
}
}
public class simple
{
public static void main()
{
// the simple class controls access to the token source.
cancellationtokensource cts = new cancellationtokensource();
console.writeline("press 'c' to terminate the application...\n");
// allow the ui thread to capture the token source, so that it
// can issue the cancel command.
thread t1 = new thread(() => { if (console.readkey(true).keychar.tostring().toupperinvariant() == "c")
cts.cancel(); } );
// serverclass sees only the token, not the token source.
thread t2 = new thread(new parameterizedthreadstart(serverclass.staticmethod));
// start the ui thread.
t1.start();
// start the worker thread and pass it the token.
t2.start(cts.token);
t2.join();
cts.dispose();
}
}
// the example displays the following output:
// press 'c' to terminate the application...
//
// serverclass.staticmethod is running on another thread.
// the worker thread has been canceled. press any key to exit.通过回调注册进行监听
以这种方式进行的某些操作可能会阻塞,从而无法及时检查cancellation token的值。对于这些情况,您可以注册一个回调方法,以便在收到取消请求时解除对该方法的阻塞。
register方法返回一个专门用于此目的的cancellationtokenregistration对象。下面的示例展示了如何使用register方法来取消异步web请求。
using system;
using system.net;
using system.threading;
class example4
{
static void main()
{
cancellationtokensource cts = new cancellationtokensource();
startwebrequest(cts.token);
// cancellation will cause the web
// request to be cancelled
cts.cancel();
}
static void startwebrequest(cancellationtoken token)
{
webclient wc = new webclient();
wc.downloadstringcompleted += (s, e) => console.writeline("request completed.");
// cancellation on the token will
// call cancelasync on the webclient.
token.register(() =>
{
wc.cancelasync();
console.writeline("request cancelled!");
});
console.writeline("starting request.");
wc.downloadstringasync(new uri("http://www.contoso.com"));
}
}cancellationtokenregistration对象管理线程同步,并确保回调将在精确的时间点停止执行。
为了确保系统响应性并避免死锁,在注册回调时必须遵循以下准则:
1、回调方法应该是快速的,因为它是同步调用的,因此对cancel的调用在回调返回之前不会返回。
2、如果在回调运行时调用dispose,并且持有回调等待的锁,则程序可能会死锁。dispose返回后,您可以释放回调所需的任何资源。
3、callbacks 不应该执行任何手动线程或在回调中使用synchronizationcontext。如果回调必须在特定线程上运行,则使用system.threading.cancellationtokenregistration构造函数,该构造函数使您能够指定目标synccontext是活动的synchronizationcontext.current。在回调中执行手动线程会导致死锁。
使用waithandle进行侦听
当一个可取消的操作在等待一个同步原语(如system.threading. manualresetevent或system.threading. semaphore)时可能会阻塞。
你可以使用cancellationtoken.waithandle属性,以使操作同时等待事件和取消请求。
cancellationtoken的 等待句柄 将在响应取消请求时发出信号,该方法可以使用waitany()方法的返回值来确定发出信号的是否是cancellation token。然后操作可以直接退出,或者抛出operationcanceledexception异常。
// wait on the event if it is not signaled.
int eventthatsignaledindex =
waithandle.waitany(new waithandle[] { mre, token.waithandle },
new timespan(0, 0, 20));system.threading.manualreseteventslim和system.threading.semaphoreslim都在它们的wait()方法中支持取消框架。
您可以将cancellationtoken传递给该方法,当请求取消时,事件将被唤醒并抛出operationcanceledexception。
try
{
// mres is a manualreseteventslim
mres.wait(token);
}
catch (operationcanceledexception)
{
// throw immediately to be responsive. the
// alternative is to do one more item of work,
// and throw on next iteration, because
// iscancellationrequested will be true.
console.writeline("the wait operation was canceled.");
throw;
}
console.write("working...");
// simulating work.
thread.spinwait(500000);下面的示例使用manualresetevent来演示如何解除阻塞不支持统一取消的等待句柄。
using system;
using system.threading;
using system.threading.tasks;
class canceloldstyleevents
{
// old-style mre that doesn't support unified cancellation.
static manualresetevent mre = new manualresetevent(false);
static void main()
{
var cts = new cancellationtokensource();
// pass the same token source to the delegate and to the task instance.
task.run(() => dowork(cts.token), cts.token);
console.writeline("press s to start/restart, p to pause, or c to cancel.");
console.writeline("or any other key to exit.");
// old-style ui thread.
bool goagain = true;
while (goagain)
{
char ch = console.readkey(true).keychar;
switch (ch)
{
case 'c':
cts.cancel();
break;
case 'p':
mre.reset();
break;
case 's':
mre.set();
break;
default:
goagain = false;
break;
}
thread.sleep(100);
}
cts.dispose();
}
static void dowork(cancellationtoken token)
{
while (true)
{
// wait on the event if it is not signaled.
int eventthatsignaledindex =
waithandle.waitany(new waithandle[] { mre, token.waithandle },
new timespan(0, 0, 20));
// were we canceled while waiting?
if (eventthatsignaledindex == 1)
{
console.writeline("the wait operation was canceled.");
throw new operationcanceledexception(token);
}
// were we canceled while running?
else if (token.iscancellationrequested)
{
console.writeline("i was canceled while running.");
token.throwifcancellationrequested();
}
// did we time out?
else if (eventthatsignaledindex == waithandle.waittimeout)
{
console.writeline("i timed out.");
break;
}
else
{
console.write("working... ");
// simulating work.
thread.spinwait(5000000);
}
}
}
}下面的示例使用manualreseteventslim来演示如何解除支持统一取消的协调原语的阻塞。同样的方法也可以用于其他轻量级协调原语,如semaphoreslim和countdownevent。
using system;
using system.threading;
using system.threading.tasks;
class cancelnewstyleevents
{
// new-style mreslim that supports unified cancellation
// in its wait methods.
static manualreseteventslim mres = new manualreseteventslim(false);
static void main()
{
var cts = new cancellationtokensource();
// pass the same token source to the delegate and to the task instance.
task.run(() => dowork(cts.token), cts.token);
console.writeline("press c to cancel, p to pause, or s to start/restart,");
console.writeline("or any other key to exit.");
// new-style ui thread.
bool goagain = true;
while (goagain)
{
char ch = console.readkey(true).keychar;
switch (ch)
{
case 'c':
// token can only be canceled once.
cts.cancel();
break;
case 'p':
mres.reset();
break;
case 's':
mres.set();
break;
default:
goagain = false;
break;
}
thread.sleep(100);
}
cts.dispose();
}
static void dowork(cancellationtoken token)
{
while (true)
{
if (token.iscancellationrequested)
{
console.writeline("canceled while running.");
token.throwifcancellationrequested();
}
// wait on the event to be signaled
// or the token to be canceled,
// whichever comes first. the token
// will throw an exception if it is canceled
// while the thread is waiting on the event.
try
{
// mres is a manualreseteventslim
mres.wait(token);
}
catch (operationcanceledexception)
{
// throw immediately to be responsive. the
// alternative is to do one more item of work,
// and throw on next iteration, because
// iscancellationrequested will be true.
console.writeline("the wait operation was canceled.");
throw;
}
console.write("working...");
// simulating work.
thread.spinwait(500000);
}
}
}同时监听多个令牌
在某些情况下,侦听器必须同时侦听多个cancellation token。
例如,一个可取消操作除了监控通过方法形参传入的外部token之外,还可能必须监视内部的cancellation token。为此,创建一个linked token源,它可以将两个或多个token连接到一个token中,如下面的示例所示。
using system;
using system.threading;
using system.threading.tasks;
class linkedtokensourcedemo
{
static void main()
{
workerwithtimer worker = new workerwithtimer();
cancellationtokensource cts = new cancellationtokensource();
// task for ui thread, so we can call task.wait wait on the main thread.
task.run(() =>
{
console.writeline("press 'c' to cancel within 3 seconds after work begins.");
console.writeline("or let the task time out by doing nothing.");
if (console.readkey(true).keychar == 'c')
cts.cancel();
});
// let the user read the ui message.
thread.sleep(1000);
// start the worker task.
task task = task.run(() => worker.dowork(cts.token), cts.token);
try
{
task.wait(cts.token);
}
catch (operationcanceledexception e)
{
if (e.cancellationtoken == cts.token)
console.writeline("canceled from ui thread throwing oce.");
}
catch (aggregateexception ae)
{
console.writeline("aggregateexception caught: " + ae.innerexception);
foreach (var inner in ae.innerexceptions)
{
console.writeline(inner.message + inner.source);
}
}
console.writeline("press any key to exit.");
console.readkey();
cts.dispose();
}
}
class workerwithtimer
{
cancellationtokensource internaltokensource = new cancellationtokensource();
cancellationtoken internaltoken;
cancellationtoken externaltoken;
timer timer;
public workerwithtimer()
{
// a toy cancellation trigger that times out after 3 seconds
// if the user does not press 'c'.
timer = new timer(new timercallback(cancelaftertimeout), null, 3000, 3000);
}
public void dowork(cancellationtoken externaltoken)
{
// create a new token that combines the internal and external tokens.
this.internaltoken = internaltokensource.token;
this.externaltoken = externaltoken;
using (cancellationtokensource linkedcts =
cancellationtokensource.createlinkedtokensource(internaltoken, externaltoken))
{
try
{
doworkinternal(linkedcts.token);
}
catch (operationcanceledexception)
{
if (internaltoken.iscancellationrequested)
{
console.writeline("operation timed out.");
}
else if (externaltoken.iscancellationrequested)
{
console.writeline("cancelling per user request.");
externaltoken.throwifcancellationrequested();
}
}
}
}
private void doworkinternal(cancellationtoken token)
{
for (int i = 0; i < 1000; i++)
{
if (token.iscancellationrequested)
{
// we need to dispose the timer if cancellation
// was requested by the external token.
timer.dispose();
// throw the exception.
token.throwifcancellationrequested();
}
// simulating work.
thread.spinwait(7500000);
console.write("working... ");
}
}
public void cancelaftertimeout(object? state)
{
console.writeline("\r\ntimer fired.");
internaltokensource.cancel();
timer.dispose();
}
}注意,当您完成对链接的令牌源的处理后,必须对它调用dispose。
当linked token抛出一个操作消连时,传递给异常的token就是linked token,而不是前任token。为了确定token的哪个被取消,请直接检查前任token的状态。
在本例中,aggregateexception不应该被抛出,但这里会捕获它,因为在实际场景中,除了从任务委托抛出的operationcanceledexception之外,任何其他异常都被包装在aggregateexception中。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论