Dot-Net

Parallel.ForEach - 優雅取消

  • January 13, 2011

關於等待任務完成和執行緒同步的主題。

我目前有一個包含在 Parallel.ForEach 中的迭代。在下面的範例中,我在評論中提出了一些關於如何最好地處理循環優雅終止的問題(.NET 4.0);

private void myFunction()
   {

       IList<string> iListOfItems = new List<string>();
       // populate iListOfItems

       CancellationTokenSource cts = new CancellationTokenSource();

       ParallelOptions po = new ParallelOptions();
       po.MaxDegreeOfParallelism = 20; // max threads
       po.CancellationToken = cts.Token;

       try
       {
           var myWcfProxy = new myWcfClientSoapClient();

           if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
           {
               try
               {
                   if (_requestedToStop)
                       loopsate.Stop();
                   // long running blocking WS call, check before and after
                   var response = myWcfProxy.ProcessIntervalConfiguration(item);
                   if (_requestedToStop)
                       loopsate.Stop();

                   // perform some local processing of the response object
               }
               catch (Exception ex)
               {
                   // cannot continue game over.
                   if (myWcfProxy.State == CommunicationState.Faulted)
                   {
                       loopsate.Stop();
                       throw;
                   }
               }

               // else carry on..
               // raise some events and other actions that could all risk an unhanded error.

           }
           ).IsCompleted)
           {
               RaiseAllItemsCompleteEvent();
           }
       }
       catch (Exception ex)
       {
           // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the
           // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception?
           // Do I need to call cts.Cancel here?

           // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that?

           // do i need to call cts.Dispose() ?

           MessageBox.Show(Logging.FormatException(ex));
       }
       finally
       {

           if (myWcfProxy != null)
           {
           // possible race condition with the for-each threads here unless we wait for them to terminate.
               if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
                   myWcfProxy.Abort();

               myWcfProxy.Close();
           }

           // possible race condition with the for-each threads here unless we wait for them to terminate.
           _requestedToStop = false;

       }

   }

非常感激任何的幫助。MSDN 文件談到了 ManualResetEventSlim 和 cancelToken.WaitHandle。但不確定如何將它們連接起來,似乎很難理解 MSDN 範例,因為大多數範例都不適用。

我在下面模擬了一些程式碼,可以回答您的問題。基本點是您可以使用 Parallel.ForEach 獲得 fork/join 並行性,因此您無需擔心並行任務之外的競爭條件(呼叫執行緒阻塞,直到任務成功或其他方式完成)。您只想確保使用 LoopState 變數(lambda 的第二個參數)來控制循環狀態。

如果循環的任何迭代拋出未處理的異常,則整個循環將引發最後擷取的 AggregateException。

提及此主題的其他連結:

Parallel.ForEach 在處理非常大的數據集時拋出異常

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Parallel.ForEach 是否限制活動執行緒的數量?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;

namespace Temp
{
   public class Class1
   {
       private class MockWcfProxy
       {
           internal object ProcessIntervalConfiguration(string item)
           {
               return new Object();
           }

           public CommunicationState State { get; set; }
       }

       private void myFunction()
       {

           IList<string> iListOfItems = new List<string>();
           // populate iListOfItems

           CancellationTokenSource cts = new CancellationTokenSource();

           ParallelOptions po = new ParallelOptions();
           po.MaxDegreeOfParallelism = 20; // max threads
           po.CancellationToken = cts.Token;

           try
           {
               var myWcfProxy = new MockWcfProxy();

               if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
                   {
                       try
                       {
                           if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                               loopState.Stop();

                           // long running blocking WS call, check before and after
                           var response = myWcfProxy.ProcessIntervalConfiguration(item);

                           if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
                               loopState.Stop();

                           // perform some local processing of the response object
                       }
                       catch (Exception ex)
                       {
                           // cannot continue game over.
                           if (myWcfProxy.State == CommunicationState.Faulted)
                           {
                               loopState.Stop();
                               throw;
                           }

                           // FYI you are swallowing all other exceptions here...
                       }

                       // else carry on..
                       // raise some events and other actions that could all risk an unhanded error.
                   }
               ).IsCompleted)
               {
                   RaiseAllItemsCompleteEvent();
               }
           }
           catch (AggregateException aggEx)
           {
               // This section will be entered if any of the loops threw an unhandled exception.  
               // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
               // to see those (if you want).
           }
           // Execution will not get to this point until all of the iterations have completed (or one 
           // has failed, and all that were running when that failure occurred complete).
       }

       private void RaiseAllItemsCompleteEvent()
       {
           // Everything completed...
       }
   }
}

引用自:https://stackoverflow.com/questions/4671865