概述
从.net framework 4开始,.net使用统一的模型来协作取消异步或长时间运行的同步线程。该模型基于一个称为cancellationtoken的轻量级对象。这个对象在调用一个或多个取消线程时(例如通过创建新线程或任务),是通过将token传递给每个线程来完成的(通过链式的方式依次传递)。单个线程能够依次地将token的副本传递给其他线程。
之后,在适当的某个时机,创建token的对象就可以使用token来请求线程停止。只有请求对象可以发出取消请求,每个监听器负责监听到请求并以适当和及时的方式响应取消请求。
实现协作取消模型的一般模式是:
- 1、实例化一个cancellationtokensource对象,该对象管理cancellation并将cancellation通知发送给单独的cancellation token。
- 2、cancellationtokensource对象的token属性,可以返回一个token对象,我们可以将该token对象发送给每个监听该cancellation的进程或task。
- 3、为每个任务或线程提供响应取消的机制。
- 4、调用 cancellationtokensource.cancel() 方法,来取消线程或者task。
【tips】我们在使用cancellation的token取消线程后,应该确保调用cancellationtokensource.dispose()方法,以便于释放它持有的任何非托管资源。。
下图展示出了cancellationtokensource对象里的token属性对象,是如何传递到其他的线程里的。
合作取消模型使创建取消感知的应用程序和库变得更容易,它支持以下功能:
- 1、取消是合式的,不会强加给监听器。监听器确定如何优雅地终止以响应取消请求。
- 2、请求不同于监听。调用可取消的线程的对象,可以控制何时(如果有的话)取消被请求。
- 3、请求的对象,可以通过仅使用一个方法,即可发送取消请求到所有的token副本中。
- 4、监听器可以通过将多个token连接成一个linked token,来同时监听多个token。
- 5、用户代码可以注意到并响应library code的取消请求,而library code可以注意到并响应用户代码的取消请求。
- 6、可以通过轮询、回调注册或等待等待句柄的方式,来通知监听器执行取消请求。
与取消线程相关的类型
取消框架是作为一组相关类型实现的,这些类型在下表中列出。
cancellationtokensource | 该对象创建cancellation token,并向 cancellation token的所有副本分发取消请求。 |
cancellationtoken | 传递给一个或多个监听器的轻量级的值类型,通常作为方法参数。侦听器通过轮询、回调或等待句柄监视token的iscancellationrequested属性的值。 |
operationcanceledexception | 此异常构造函数的重载,接受cancellationtoken作为参数。侦听器可以选择性地抛出此异常以验证取消的来源,并通知其他已响应取消请求监听器。 |
取消模型以几种类型集成到.net中。
最重要的是system.threading.tasks.parallel,system.threading.tasks.task、system.threading.tasks.task<tresult> 和 system.linq.parallelenumerable。
建议使用所有新的库和应用代码来实现合作市取消模式。
代码举例
在下面的示例中,请求对象创建一个cancellationtokensource对象,然后将该对象的token属性传递给可取消的进程。
接收请求的线程通过轮询来监视token的iscancellationrequested属性的值。
当该值变为true时,侦听器可以以任何合适的方式终止。在本例中,方法只是退出,这是许多情况下所需要的全部内容。
using system; using system.threading; public class example { public static void main() { // create the token source. cancellationtokensource cts = new cancellationtokensource(); // pass the token to the cancelable operation. threadpool.queueuserworkitem(new waitcallback(dosomework), cts.token); thread.sleep(2500); // request cancellation. cts.cancel(); console.writeline("cancellation set in token source..."); thread.sleep(2500); // cancellation should have happened, so call dispose. cts.dispose(); } // thread 2: the listener static void dosomework(object? obj) { if (obj is null) return; cancellationtoken token = (cancellationtoken)obj; for (int i = 0; i < 100000; i++) { if (token.iscancellationrequested) { console.writeline("in iteration {0}, cancellation has been requested...", i + 1); // perform cleanup if necessary. //... // terminate the operation. break; } // simulate some work. thread.spinwait(500000); } } } // the example displays output like the following: // cancellation set in token source... // in iteration 1430, cancellation has been requested...
操作取消vs对象取消
在协作取消框架中,取消指的是操作(线程中执行的操作),而不是对象。取消请求意味着在执行任何所需的清理后,操作应尽快停止。一个cancellation token应该指向一个“可取消的操作”,无论该操作如何在您的程序中实现。
在token的iscancellationrequested属性被设置为true之后,它不能被重置为false。因此,取消令牌在被取消后不能被重用。
如果您需要对象取消机制,您可以通过调用cancellationtoken来基于操作取消机制。注册方法,如下例所示。
using system; using system.threading; class cancelableobject { public string id; public cancelableobject(string id) { this.id = id; } public void cancel() { console.writeline("object {0} cancel callback", id); // perform object cancellation here. } } public class example1 { public static void main() { cancellationtokensource cts = new cancellationtokensource(); cancellationtoken token = cts.token; // user defined class with its own method for cancellation var obj1 = new cancelableobject("1"); var obj2 = new cancelableobject("2"); var obj3 = new cancelableobject("3"); // register the object's cancel method with the token's // cancellation request. token.register(() => obj1.cancel()); token.register(() => obj2.cancel()); token.register(() => obj3.cancel()); // request cancellation on the token. cts.cancel(); // call dispose when we're done with the cancellationtokensource. cts.dispose(); } } // the example displays the following output: // object 3 cancel callback // object 2 cancel callback // object 1 cancel callback
如果一个对象支持多个并发的可取消操作,则可以给每个不同的可取消操作各自传入一个不同的token。这样,一个操作可以被取消而不会影响到其他操作。
监听并响应取消请求
在用户委托中,可取消操作的实现者决定如何终止该操作以响应取消请求。在许多情况下,用户委托可以只执行任何所需的清理,然后立即返回。
但是,在更复杂的情况下,可能需要用户委托通知库代码已发生cancellation。在这种情况下,终止操作的正确方法是委托调用throwifcancellationrequested方法,这将导致抛出operationcanceledexception异常。库代码可以在用户委托线程上捕获此异常,并检查异常的token,以确定该异常是否表示协作取消或其他异常情况。
在这种情况下,终止操作的正确方法是委托调用throwifcancellationrequested方法,这将导致抛出operationcanceledexception。库代码可以在用户委托线程上捕获此异常,并检查异常的token,以确定该异常是否表示协作取消或其他异常情况。
轮询监听
对于循环或递归的长时间运行的计算,可以通过定期轮询cancellationtoken.iscancellationrequested的值来监听取消请求。如果它的值为true,则该方法应该尽快清理并终止。轮询的最佳频率取决于应用程序的类型。开发人员可以为任何给定的程序确定最佳轮询频率。轮询本身不会显著影响性能。
下面的程序案例展示了一种可能的轮询方式。
static void nestedloops(rectangle rect, cancellationtoken token) { for (int col = 0; col < rect.columns && !token.iscancellationrequested; col++) { // assume that we know that the inner loop is very fast. // therefore, polling once per column in the outer loop condition // is sufficient. for (int row = 0; row < rect.rows; row++) { // simulating work. thread.spinwait(5_000); console.write("{0},{1} ", col, row); } } if (token.iscancellationrequested) { // cleanup or undo here if necessary... console.writeline("\r\noperation canceled"); console.writeline("press any key to exit."); // if using task: // token.throwifcancellationrequested(); } }
下面的程序代码是一个详细的实现:
using system; using system.threading; public class serverclass { public static void staticmethod(object obj) { cancellationtoken ct = (cancellationtoken)obj; console.writeline("serverclass.staticmethod is running on another thread."); // simulate work that can be canceled. while (!ct.iscancellationrequested) { thread.spinwait(50000); } console.writeline("the worker thread has been canceled. press any key to exit."); console.readkey(true); } } public class simple { public static void main() { // the simple class controls access to the token source. cancellationtokensource cts = new cancellationtokensource(); console.writeline("press 'c' to terminate the application...\n"); // allow the ui thread to capture the token source, so that it // can issue the cancel command. thread t1 = new thread(() => { if (console.readkey(true).keychar.tostring().toupperinvariant() == "c") cts.cancel(); } ); // serverclass sees only the token, not the token source. thread t2 = new thread(new parameterizedthreadstart(serverclass.staticmethod)); // start the ui thread. t1.start(); // start the worker thread and pass it the token. t2.start(cts.token); t2.join(); cts.dispose(); } } // the example displays the following output: // press 'c' to terminate the application... // // serverclass.staticmethod is running on another thread. // the worker thread has been canceled. press any key to exit.
通过回调注册进行监听
以这种方式进行的某些操作可能会阻塞,从而无法及时检查cancellation token的值。对于这些情况,您可以注册一个回调方法,以便在收到取消请求时解除对该方法的阻塞。
register方法返回一个专门用于此目的的cancellationtokenregistration对象。下面的示例展示了如何使用register方法来取消异步web请求。
using system; using system.net; using system.threading; class example4 { static void main() { cancellationtokensource cts = new cancellationtokensource(); startwebrequest(cts.token); // cancellation will cause the web // request to be cancelled cts.cancel(); } static void startwebrequest(cancellationtoken token) { webclient wc = new webclient(); wc.downloadstringcompleted += (s, e) => console.writeline("request completed."); // cancellation on the token will // call cancelasync on the webclient. token.register(() => { wc.cancelasync(); console.writeline("request cancelled!"); }); console.writeline("starting request."); wc.downloadstringasync(new uri("http://www.contoso.com")); } }
cancellationtokenregistration对象管理线程同步,并确保回调将在精确的时间点停止执行。
为了确保系统响应性并避免死锁,在注册回调时必须遵循以下准则:
1、回调方法应该是快速的,因为它是同步调用的,因此对cancel的调用在回调返回之前不会返回。
2、如果在回调运行时调用dispose,并且持有回调等待的锁,则程序可能会死锁。dispose返回后,您可以释放回调所需的任何资源。
3、callbacks 不应该执行任何手动线程或在回调中使用synchronizationcontext。如果回调必须在特定线程上运行,则使用system.threading.cancellationtokenregistration构造函数,该构造函数使您能够指定目标synccontext是活动的synchronizationcontext.current。在回调中执行手动线程会导致死锁。
使用waithandle进行侦听
当一个可取消的操作在等待一个同步原语(如system.threading. manualresetevent或system.threading. semaphore)时可能会阻塞。
你可以使用cancellationtoken.waithandle属性,以使操作同时等待事件和取消请求。
cancellationtoken的 等待句柄 将在响应取消请求时发出信号,该方法可以使用waitany()方法的返回值来确定发出信号的是否是cancellation token。然后操作可以直接退出,或者抛出operationcanceledexception异常。
// wait on the event if it is not signaled. int eventthatsignaledindex = waithandle.waitany(new waithandle[] { mre, token.waithandle }, new timespan(0, 0, 20));
system.threading.manualreseteventslim和system.threading.semaphoreslim都在它们的wait()方法中支持取消框架。
您可以将cancellationtoken传递给该方法,当请求取消时,事件将被唤醒并抛出operationcanceledexception。
try { // mres is a manualreseteventslim mres.wait(token); } catch (operationcanceledexception) { // throw immediately to be responsive. the // alternative is to do one more item of work, // and throw on next iteration, because // iscancellationrequested will be true. console.writeline("the wait operation was canceled."); throw; } console.write("working..."); // simulating work. thread.spinwait(500000);
下面的示例使用manualresetevent来演示如何解除阻塞不支持统一取消的等待句柄。
using system; using system.threading; using system.threading.tasks; class canceloldstyleevents { // old-style mre that doesn't support unified cancellation. static manualresetevent mre = new manualresetevent(false); static void main() { var cts = new cancellationtokensource(); // pass the same token source to the delegate and to the task instance. task.run(() => dowork(cts.token), cts.token); console.writeline("press s to start/restart, p to pause, or c to cancel."); console.writeline("or any other key to exit."); // old-style ui thread. bool goagain = true; while (goagain) { char ch = console.readkey(true).keychar; switch (ch) { case 'c': cts.cancel(); break; case 'p': mre.reset(); break; case 's': mre.set(); break; default: goagain = false; break; } thread.sleep(100); } cts.dispose(); } static void dowork(cancellationtoken token) { while (true) { // wait on the event if it is not signaled. int eventthatsignaledindex = waithandle.waitany(new waithandle[] { mre, token.waithandle }, new timespan(0, 0, 20)); // were we canceled while waiting? if (eventthatsignaledindex == 1) { console.writeline("the wait operation was canceled."); throw new operationcanceledexception(token); } // were we canceled while running? else if (token.iscancellationrequested) { console.writeline("i was canceled while running."); token.throwifcancellationrequested(); } // did we time out? else if (eventthatsignaledindex == waithandle.waittimeout) { console.writeline("i timed out."); break; } else { console.write("working... "); // simulating work. thread.spinwait(5000000); } } } }
下面的示例使用manualreseteventslim来演示如何解除支持统一取消的协调原语的阻塞。同样的方法也可以用于其他轻量级协调原语,如semaphoreslim和countdownevent。
using system; using system.threading; using system.threading.tasks; class cancelnewstyleevents { // new-style mreslim that supports unified cancellation // in its wait methods. static manualreseteventslim mres = new manualreseteventslim(false); static void main() { var cts = new cancellationtokensource(); // pass the same token source to the delegate and to the task instance. task.run(() => dowork(cts.token), cts.token); console.writeline("press c to cancel, p to pause, or s to start/restart,"); console.writeline("or any other key to exit."); // new-style ui thread. bool goagain = true; while (goagain) { char ch = console.readkey(true).keychar; switch (ch) { case 'c': // token can only be canceled once. cts.cancel(); break; case 'p': mres.reset(); break; case 's': mres.set(); break; default: goagain = false; break; } thread.sleep(100); } cts.dispose(); } static void dowork(cancellationtoken token) { while (true) { if (token.iscancellationrequested) { console.writeline("canceled while running."); token.throwifcancellationrequested(); } // wait on the event to be signaled // or the token to be canceled, // whichever comes first. the token // will throw an exception if it is canceled // while the thread is waiting on the event. try { // mres is a manualreseteventslim mres.wait(token); } catch (operationcanceledexception) { // throw immediately to be responsive. the // alternative is to do one more item of work, // and throw on next iteration, because // iscancellationrequested will be true. console.writeline("the wait operation was canceled."); throw; } console.write("working..."); // simulating work. thread.spinwait(500000); } } }
同时监听多个令牌
在某些情况下,侦听器必须同时侦听多个cancellation token。
例如,一个可取消操作除了监控通过方法形参传入的外部token之外,还可能必须监视内部的cancellation token。为此,创建一个linked token源,它可以将两个或多个token连接到一个token中,如下面的示例所示。
using system; using system.threading; using system.threading.tasks; class linkedtokensourcedemo { static void main() { workerwithtimer worker = new workerwithtimer(); cancellationtokensource cts = new cancellationtokensource(); // task for ui thread, so we can call task.wait wait on the main thread. task.run(() => { console.writeline("press 'c' to cancel within 3 seconds after work begins."); console.writeline("or let the task time out by doing nothing."); if (console.readkey(true).keychar == 'c') cts.cancel(); }); // let the user read the ui message. thread.sleep(1000); // start the worker task. task task = task.run(() => worker.dowork(cts.token), cts.token); try { task.wait(cts.token); } catch (operationcanceledexception e) { if (e.cancellationtoken == cts.token) console.writeline("canceled from ui thread throwing oce."); } catch (aggregateexception ae) { console.writeline("aggregateexception caught: " + ae.innerexception); foreach (var inner in ae.innerexceptions) { console.writeline(inner.message + inner.source); } } console.writeline("press any key to exit."); console.readkey(); cts.dispose(); } } class workerwithtimer { cancellationtokensource internaltokensource = new cancellationtokensource(); cancellationtoken internaltoken; cancellationtoken externaltoken; timer timer; public workerwithtimer() { // a toy cancellation trigger that times out after 3 seconds // if the user does not press 'c'. timer = new timer(new timercallback(cancelaftertimeout), null, 3000, 3000); } public void dowork(cancellationtoken externaltoken) { // create a new token that combines the internal and external tokens. this.internaltoken = internaltokensource.token; this.externaltoken = externaltoken; using (cancellationtokensource linkedcts = cancellationtokensource.createlinkedtokensource(internaltoken, externaltoken)) { try { doworkinternal(linkedcts.token); } catch (operationcanceledexception) { if (internaltoken.iscancellationrequested) { console.writeline("operation timed out."); } else if (externaltoken.iscancellationrequested) { console.writeline("cancelling per user request."); externaltoken.throwifcancellationrequested(); } } } } private void doworkinternal(cancellationtoken token) { for (int i = 0; i < 1000; i++) { if (token.iscancellationrequested) { // we need to dispose the timer if cancellation // was requested by the external token. timer.dispose(); // throw the exception. token.throwifcancellationrequested(); } // simulating work. thread.spinwait(7500000); console.write("working... "); } } public void cancelaftertimeout(object? state) { console.writeline("\r\ntimer fired."); internaltokensource.cancel(); timer.dispose(); } }
注意,当您完成对链接的令牌源的处理后,必须对它调用dispose。
当linked token抛出一个操作消连时,传递给异常的token就是linked token,而不是前任token。为了确定token的哪个被取消,请直接检查前任token的状态。
在本例中,aggregateexception不应该被抛出,但这里会捕获它,因为在实际场景中,除了从任务委托抛出的operationcanceledexception之外,任何其他异常都被包装在aggregateexception中。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论