当前位置: 代码网 > it编程>编程语言>C# > C#如何优雅地取消进程的执行之Cancellation详解

C#如何优雅地取消进程的执行之Cancellation详解

2025年02月14日 C# 我要评论
概述从.net framework 4开始,.net使用统一的模型来协作取消异步或长时间运行的同步线程。该模型基于一个称为cancellationtoken的轻量级对象。这个对象在调用一个或多个取消线

概述

从.net framework 4开始,.net使用统一的模型来协作取消异步或长时间运行的同步线程。该模型基于一个称为cancellationtoken的轻量级对象。这个对象在调用一个或多个取消线程时(例如通过创建新线程或任务),是通过将token传递给每个线程来完成的(通过链式的方式依次传递)。单个线程能够依次地将token的副本传递给其他线程。

之后,在适当的某个时机,创建token的对象就可以使用token来请求线程停止。只有请求对象可以发出取消请求,每个监听器负责监听到请求并以适当和及时的方式响应取消请求。

实现协作取消模型的一般模式是:

  • 1、实例化一个cancellationtokensource对象,该对象管理cancellation并将cancellation通知发送给单独的cancellation token。
  • 2、cancellationtokensource对象的token属性,可以返回一个token对象,我们可以将该token对象发送给每个监听该cancellation的进程或task。
  • 3、为每个任务或线程提供响应取消的机制。
  • 4、调用 cancellationtokensource.cancel() 方法,来取消线程或者task。

【tips】我们在使用cancellation的token取消线程后,应该确保调用cancellationtokensource.dispose()方法,以便于释放它持有的任何非托管资源。。

下图展示出了cancellationtokensource对象里的token属性对象,是如何传递到其他的线程里的。

合作取消模型使创建取消感知的应用程序和库变得更容易,它支持以下功能:

  • 1、取消是合式的,不会强加给监听器。监听器确定如何优雅地终止以响应取消请求。
  • 2、请求不同于监听。调用可取消的线程的对象,可以控制何时(如果有的话)取消被请求。
  • 3、请求的对象,可以通过仅使用一个方法,即可发送取消请求到所有的token副本中。
  • 4、监听器可以通过将多个token连接成一个linked token,来同时监听多个token。
  • 5、用户代码可以注意到并响应library code的取消请求,而library code可以注意到并响应用户代码的取消请求。
  • 6、可以通过轮询、回调注册或等待等待句柄的方式,来通知监听器执行取消请求。

与取消线程相关的类型

取消框架是作为一组相关类型实现的,这些类型在下表中列出。

cancellationtokensource该对象创建cancellation token,并向 cancellation token的所有副本分发取消请求。
cancellationtoken传递给一个或多个监听器的轻量级的值类型,通常作为方法参数。侦听器通过轮询、回调或等待句柄监视token的iscancellationrequested属性的值。
operationcanceledexception此异常构造函数的重载,接受cancellationtoken作为参数。侦听器可以选择性地抛出此异常以验证取消的来源,并通知其他已响应取消请求监听器。

取消模型以几种类型集成到.net中。

最重要的是system.threading.tasks.parallel,system.threading.tasks.task、system.threading.tasks.task<tresult> 和 system.linq.parallelenumerable。

建议使用所有新的库和应用代码来实现合作市取消模式。

代码举例

在下面的示例中,请求对象创建一个cancellationtokensource对象,然后将该对象的token属性传递给可取消的进程。

接收请求的线程通过轮询来监视token的iscancellationrequested属性的值。

当该值变为true时,侦听器可以以任何合适的方式终止。在本例中,方法只是退出,这是许多情况下所需要的全部内容。

using system;
using system.threading;

public class example
{
    public static void main()
    {
        // create the token source.
        cancellationtokensource cts = new cancellationtokensource();

        // pass the token to the cancelable operation.
        threadpool.queueuserworkitem(new waitcallback(dosomework), cts.token);
        thread.sleep(2500);

        // request cancellation.
        cts.cancel();
        console.writeline("cancellation set in token source...");
        thread.sleep(2500);
        // cancellation should have happened, so call dispose.
        cts.dispose();
    }

    // thread 2: the listener
    static void dosomework(object? obj)
    {
        if (obj is null)
            return;

        cancellationtoken token = (cancellationtoken)obj;

        for (int i = 0; i < 100000; i++)
        {
            if (token.iscancellationrequested)
            {
                console.writeline("in iteration {0}, cancellation has been requested...",
                                  i + 1);
                // perform cleanup if necessary.
                //...
                // terminate the operation.
                break;
            }
            // simulate some work.
            thread.spinwait(500000);
        }
    }
}
// the example displays output like the following:
//       cancellation set in token source...
//       in iteration 1430, cancellation has been requested...

操作取消vs对象取消

在协作取消框架中,取消指的是操作(线程中执行的操作),而不是对象。取消请求意味着在执行任何所需的清理后,操作应尽快停止。一个cancellation token应该指向一个“可取消的操作”,无论该操作如何在您的程序中实现。

在token的iscancellationrequested属性被设置为true之后,它不能被重置为false。因此,取消令牌在被取消后不能被重用。

如果您需要对象取消机制,您可以通过调用cancellationtoken来基于操作取消机制。注册方法,如下例所示。

using system;
using system.threading;

class cancelableobject
{
    public string id;

    public cancelableobject(string id)
    {
        this.id = id;
    }

    public void cancel()
    {
        console.writeline("object {0} cancel callback", id);
        // perform object cancellation here.
    }
}

public class example1
{
    public static void main()
    {
        cancellationtokensource cts = new cancellationtokensource();
        cancellationtoken token = cts.token;

        // user defined class with its own method for cancellation
        var obj1 = new cancelableobject("1");
        var obj2 = new cancelableobject("2");
        var obj3 = new cancelableobject("3");

        // register the object's cancel method with the token's
        // cancellation request.
        token.register(() => obj1.cancel());
        token.register(() => obj2.cancel());
        token.register(() => obj3.cancel());

        // request cancellation on the token.
        cts.cancel();
        // call dispose when we're done with the cancellationtokensource.
        cts.dispose();
    }
}
// the example displays the following output:
//       object 3 cancel callback
//       object 2 cancel callback
//       object 1 cancel callback

如果一个对象支持多个并发的可取消操作,则可以给每个不同的可取消操作各自传入一个不同的token。这样,一个操作可以被取消而不会影响到其他操作。

监听并响应取消请求

在用户委托中,可取消操作的实现者决定如何终止该操作以响应取消请求。在许多情况下,用户委托可以只执行任何所需的清理,然后立即返回。

但是,在更复杂的情况下,可能需要用户委托通知库代码已发生cancellation。在这种情况下,终止操作的正确方法是委托调用throwifcancellationrequested方法,这将导致抛出operationcanceledexception异常。库代码可以在用户委托线程上捕获此异常,并检查异常的token,以确定该异常是否表示协作取消或其他异常情况。

在这种情况下,终止操作的正确方法是委托调用throwifcancellationrequested方法,这将导致抛出operationcanceledexception。库代码可以在用户委托线程上捕获此异常,并检查异常的token,以确定该异常是否表示协作取消或其他异常情况。

轮询监听

对于循环或递归的长时间运行的计算,可以通过定期轮询cancellationtoken.iscancellationrequested的值来监听取消请求。如果它的值为true,则该方法应该尽快清理并终止。轮询的最佳频率取决于应用程序的类型。开发人员可以为任何给定的程序确定最佳轮询频率。轮询本身不会显著影响性能。

下面的程序案例展示了一种可能的轮询方式。

static void nestedloops(rectangle rect, cancellationtoken token)
{
   for (int col = 0; col < rect.columns && !token.iscancellationrequested; col++) {
      // assume that we know that the inner loop is very fast.
      // therefore, polling once per column in the outer loop condition
      // is sufficient.
      for (int row = 0; row < rect.rows; row++) {
         // simulating work.
         thread.spinwait(5_000);
         console.write("{0},{1} ", col, row);
      }
   }

   if (token.iscancellationrequested) {
      // cleanup or undo here if necessary...
      console.writeline("\r\noperation canceled");
      console.writeline("press any key to exit.");

      // if using task:
      // token.throwifcancellationrequested();
   }
}

下面的程序代码是一个详细的实现:

using system;
using system.threading;

public class serverclass
{
   public static void staticmethod(object obj)
   {
      cancellationtoken ct = (cancellationtoken)obj;
      console.writeline("serverclass.staticmethod is running on another thread.");

      // simulate work that can be canceled.
      while (!ct.iscancellationrequested) {
         thread.spinwait(50000);
      }
      console.writeline("the worker thread has been canceled. press any key to exit.");
      console.readkey(true);
   }
}

public class simple
{
   public static void main()
   {
      // the simple class controls access to the token source.
      cancellationtokensource cts = new cancellationtokensource();

      console.writeline("press 'c' to terminate the application...\n");
      // allow the ui thread to capture the token source, so that it
      // can issue the cancel command.
      thread t1 = new thread(() => { if (console.readkey(true).keychar.tostring().toupperinvariant() == "c")
                                     cts.cancel(); } );

      // serverclass sees only the token, not the token source.
      thread t2 = new thread(new parameterizedthreadstart(serverclass.staticmethod));
      // start the ui thread.

      t1.start();

      // start the worker thread and pass it the token.
      t2.start(cts.token);

      t2.join();
      cts.dispose();
   }
}
// the example displays the following output:
//       press 'c' to terminate the application...
//
//       serverclass.staticmethod is running on another thread.
//       the worker thread has been canceled. press any key to exit.

通过回调注册进行监听

以这种方式进行的某些操作可能会阻塞,从而无法及时检查cancellation token的值。对于这些情况,您可以注册一个回调方法,以便在收到取消请求时解除对该方法的阻塞。

register方法返回一个专门用于此目的的cancellationtokenregistration对象。下面的示例展示了如何使用register方法来取消异步web请求。

using system;
using system.net;
using system.threading;

class example4
{
    static void main()
    {
        cancellationtokensource cts = new cancellationtokensource();

        startwebrequest(cts.token);

        // cancellation will cause the web
        // request to be cancelled
        cts.cancel();
    }

    static void startwebrequest(cancellationtoken token)
    {
        webclient wc = new webclient();
        wc.downloadstringcompleted += (s, e) => console.writeline("request completed.");

        // cancellation on the token will
        // call cancelasync on the webclient.
        token.register(() =>
        {
            wc.cancelasync();
            console.writeline("request cancelled!");
        });

        console.writeline("starting request.");
        wc.downloadstringasync(new uri("http://www.contoso.com"));
    }
}

cancellationtokenregistration对象管理线程同步,并确保回调将在精确的时间点停止执行。

为了确保系统响应性并避免死锁,在注册回调时必须遵循以下准则:

1、回调方法应该是快速的,因为它是同步调用的,因此对cancel的调用在回调返回之前不会返回。

2、如果在回调运行时调用dispose,并且持有回调等待的锁,则程序可能会死锁。dispose返回后,您可以释放回调所需的任何资源。

3、callbacks 不应该执行任何手动线程或在回调中使用synchronizationcontext。如果回调必须在特定线程上运行,则使用system.threading.cancellationtokenregistration构造函数,该构造函数使您能够指定目标synccontext是活动的synchronizationcontext.current。在回调中执行手动线程会导致死锁。

使用waithandle进行侦听

当一个可取消的操作在等待一个同步原语(如system.threading. manualresetevent或system.threading. semaphore)时可能会阻塞。

你可以使用cancellationtoken.waithandle属性,以使操作同时等待事件和取消请求。

cancellationtoken的 等待句柄 将在响应取消请求时发出信号,该方法可以使用waitany()方法的返回值来确定发出信号的是否是cancellation token。然后操作可以直接退出,或者抛出operationcanceledexception异常。

// wait on the event if it is not signaled.
int eventthatsignaledindex =
       waithandle.waitany(new waithandle[] { mre, token.waithandle },
                          new timespan(0, 0, 20));

system.threading.manualreseteventslim和system.threading.semaphoreslim都在它们的wait()方法中支持取消框架。

您可以将cancellationtoken传递给该方法,当请求取消时,事件将被唤醒并抛出operationcanceledexception。

try
{
    // mres is a manualreseteventslim
    mres.wait(token);
}
catch (operationcanceledexception)
{
    // throw immediately to be responsive. the
    // alternative is to do one more item of work,
    // and throw on next iteration, because
    // iscancellationrequested will be true.
    console.writeline("the wait operation was canceled.");
    throw;
}

console.write("working...");
// simulating work.
thread.spinwait(500000);

下面的示例使用manualresetevent来演示如何解除阻塞不支持统一取消的等待句柄。

using system;
using system.threading;
using system.threading.tasks;

class canceloldstyleevents
{
    // old-style mre that doesn't support unified cancellation.
    static manualresetevent mre = new manualresetevent(false);

    static void main()
    {
        var cts = new cancellationtokensource();

        // pass the same token source to the delegate and to the task instance.
        task.run(() => dowork(cts.token), cts.token);
        console.writeline("press s to start/restart, p to pause, or c to cancel.");
        console.writeline("or any other key to exit.");

        // old-style ui thread.
        bool goagain = true;
        while (goagain)
        {
            char ch = console.readkey(true).keychar;

            switch (ch)
            {
                case 'c':
                    cts.cancel();
                    break;
                case 'p':
                    mre.reset();
                    break;
                case 's':
                    mre.set();
                    break;
                default:
                    goagain = false;
                    break;
            }

            thread.sleep(100);
        }
        cts.dispose();
    }

    static void dowork(cancellationtoken token)
    {
        while (true)
        {
            // wait on the event if it is not signaled.
            int eventthatsignaledindex =
                   waithandle.waitany(new waithandle[] { mre, token.waithandle },
                                      new timespan(0, 0, 20));

            // were we canceled while waiting?
            if (eventthatsignaledindex == 1)
            {
                console.writeline("the wait operation was canceled.");
                throw new operationcanceledexception(token);
            }
            // were we canceled while running?
            else if (token.iscancellationrequested)
            {
                console.writeline("i was canceled while running.");
                token.throwifcancellationrequested();
            }
            // did we time out?
            else if (eventthatsignaledindex == waithandle.waittimeout)
            {
                console.writeline("i timed out.");
                break;
            }
            else
            {
                console.write("working... ");
                // simulating work.
                thread.spinwait(5000000);
            }
        }
    }
}

下面的示例使用manualreseteventslim来演示如何解除支持统一取消的协调原语的阻塞。同样的方法也可以用于其他轻量级协调原语,如semaphoreslim和countdownevent。

using system;
using system.threading;
using system.threading.tasks;

class cancelnewstyleevents
{
   // new-style mreslim that supports unified cancellation
   // in its wait methods.
   static manualreseteventslim mres = new manualreseteventslim(false);

   static void main()
   {
      var cts = new cancellationtokensource();

      // pass the same token source to the delegate and to the task instance.
      task.run(() => dowork(cts.token), cts.token);
      console.writeline("press c to cancel, p to pause, or s to start/restart,");
      console.writeline("or any other key to exit.");

      // new-style ui thread.
         bool goagain = true;
         while (goagain)
         {
             char ch = console.readkey(true).keychar;

             switch (ch)
             {
                 case 'c':
                     // token can only be canceled once.
                     cts.cancel();
                     break;
                 case 'p':
                     mres.reset();
                     break;
                 case 's':
                     mres.set();
                     break;
                 default:
                     goagain = false;
                     break;
             }

             thread.sleep(100);
         }
         cts.dispose();
     }

     static void dowork(cancellationtoken token)
     {

         while (true)
         {
             if (token.iscancellationrequested)
             {
                 console.writeline("canceled while running.");
                 token.throwifcancellationrequested();
             }

             // wait on the event to be signaled
             // or the token to be canceled,
             // whichever comes first. the token
             // will throw an exception if it is canceled
             // while the thread is waiting on the event.
             try
             {
                 // mres is a manualreseteventslim
                 mres.wait(token);
             }
             catch (operationcanceledexception)
             {
                 // throw immediately to be responsive. the
                 // alternative is to do one more item of work,
                 // and throw on next iteration, because
                 // iscancellationrequested will be true.
                 console.writeline("the wait operation was canceled.");
                 throw;
             }

             console.write("working...");
             // simulating work.
             thread.spinwait(500000);
         }
     }
 }

同时监听多个令牌

在某些情况下,侦听器必须同时侦听多个cancellation token。

例如,一个可取消操作除了监控通过方法形参传入的外部token之外,还可能必须监视内部的cancellation token。为此,创建一个linked token源,它可以将两个或多个token连接到一个token中,如下面的示例所示。

using system;
using system.threading;
using system.threading.tasks;

class linkedtokensourcedemo
{
    static void main()
    {
        workerwithtimer worker = new workerwithtimer();
        cancellationtokensource cts = new cancellationtokensource();

        // task for ui thread, so we can call task.wait wait on the main thread.
        task.run(() =>
        {
            console.writeline("press 'c' to cancel within 3 seconds after work begins.");
            console.writeline("or let the task time out by doing nothing.");
            if (console.readkey(true).keychar == 'c')
                cts.cancel();
        });

        // let the user read the ui message.
        thread.sleep(1000);

        // start the worker task.
        task task = task.run(() => worker.dowork(cts.token), cts.token);

        try
        {
            task.wait(cts.token);
        }
        catch (operationcanceledexception e)
        {
            if (e.cancellationtoken == cts.token)
                console.writeline("canceled from ui thread throwing oce.");
        }
        catch (aggregateexception ae)
        {
            console.writeline("aggregateexception caught: " + ae.innerexception);
            foreach (var inner in ae.innerexceptions)
            {
                console.writeline(inner.message + inner.source);
            }
        }

        console.writeline("press any key to exit.");
        console.readkey();
        cts.dispose();
    }
}

class workerwithtimer
{
    cancellationtokensource internaltokensource = new cancellationtokensource();
    cancellationtoken internaltoken;
    cancellationtoken externaltoken;
    timer timer;

    public workerwithtimer()
    {
        // a toy cancellation trigger that times out after 3 seconds
        // if the user does not press 'c'.
        timer = new timer(new timercallback(cancelaftertimeout), null, 3000, 3000);
    }

    public void dowork(cancellationtoken externaltoken)
    {
        // create a new token that combines the internal and external tokens.
        this.internaltoken = internaltokensource.token;
        this.externaltoken = externaltoken;

        using (cancellationtokensource linkedcts =
                cancellationtokensource.createlinkedtokensource(internaltoken, externaltoken))
        {
            try
            {
                doworkinternal(linkedcts.token);
            }
            catch (operationcanceledexception)
            {
                if (internaltoken.iscancellationrequested)
                {
                    console.writeline("operation timed out.");
                }
                else if (externaltoken.iscancellationrequested)
                {
                    console.writeline("cancelling per user request.");
                    externaltoken.throwifcancellationrequested();
                }
            }
        }
    }

    private void doworkinternal(cancellationtoken token)
    {
        for (int i = 0; i < 1000; i++)
        {
            if (token.iscancellationrequested)
            {
                // we need to dispose the timer if cancellation
                // was requested by the external token.
                timer.dispose();

                // throw the exception.
                token.throwifcancellationrequested();
            }

            // simulating work.
            thread.spinwait(7500000);
            console.write("working... ");
        }
    }

    public void cancelaftertimeout(object? state)
    {
        console.writeline("\r\ntimer fired.");
        internaltokensource.cancel();
        timer.dispose();
    }
}

注意,当您完成对链接的令牌源的处理后,必须对它调用dispose。

当linked token抛出一个操作消连时,传递给异常的token就是linked token,而不是前任token。为了确定token的哪个被取消,请直接检查前任token的状态。

在本例中,aggregateexception不应该被抛出,但这里会捕获它,因为在实际场景中,除了从任务委托抛出的operationcanceledexception之外,任何其他异常都被包装在aggregateexception中。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com