Dot-Net
Parallel.ForEach - 優雅取消
關於等待任務完成和執行緒同步的主題。
我目前有一個包在 Parallel.ForEach 中的迭代。在下面的範例中,我在程式碼註解裡提出了一些問題,想知道如何以最好的方式讓循環優雅地終止(.NET 4.0):
// Question snippet: calls myWcfProxy.ProcessIntervalConfiguration for every item of a list
// through Parallel.ForEach, and asks the loop to stop when _requestedToStop is set or when
// the proxy is found in the Faulted state after an exception.
// The asker's open questions are kept as inline comments at the positions they refer to.
// NOTE(review): myWcfClientSoapClient, _requestedToStop, RaiseAllItemsCompleteEvent, MessageBox
// and Logging are not defined in this snippet.
private void myFunction()
{
    IList<string> iListOfItems = new List<string>();
    // populate iListOfItems

    CancellationTokenSource cts = new CancellationTokenSource();
    ParallelOptions po = new ParallelOptions();
    po.MaxDegreeOfParallelism = 20; // max threads
    po.CancellationToken = cts.Token;

    try
    {
        var myWcfProxy = new myWcfClientSoapClient();

        if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
        {
            try
            {
                if (_requestedToStop)
                    loopsate.Stop();
                // long running blocking WS call, check before and after
                var response = myWcfProxy.ProcessIntervalConfiguration(item);
                if (_requestedToStop)
                    loopsate.Stop();

                // perform some local processing of the response object
            }
            catch (Exception ex)
            {
                // cannot continue game over.
                if (myWcfProxy.State == CommunicationState.Faulted)
                {
                    loopsate.Stop();
                    throw;
                }
            }

            // else carry on..
            // raise some events and other actions that could all risk an unhandled error.
        }
        ).IsCompleted)
        {
            RaiseAllItemsCompleteEvent();
        }
    }
    catch (Exception ex)
    {
        // If an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the
        // ForEach abort, or run to completion? Is loopsate.Stop (or equivalent) called as soon as the
        // framework raises an Exception?
        // Do I need to call cts.Cancel here?

        // I want to wait for all the threads to terminate before I continue at this point. How do we achieve that?

        // Do I need to call cts.Dispose()?

        MessageBox.Show(Logging.FormatException(ex));
    }
    finally
    {
        // NOTE(review): myWcfProxy is declared with var inside the try block above, so as written
        // it is not in scope in this finally block.
        if (myWcfProxy != null)
        {
            // possible race condition with the for-each threads here unless we wait for them to terminate.
            if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
                myWcfProxy.Abort();

            // Runs unconditionally, including right after the Abort() call above.
            myWcfProxy.Close();
        }

        // possible race condition with the for-each threads here unless we wait for them to terminate.
        _requestedToStop = false;
    }
}
非常感激任何的幫助。MSDN 文件談到了 ManualResetEventSlim 和 cancelToken.WaitHandle。但不確定如何將它們連接起來,似乎很難理解 MSDN 範例,因為大多數範例都不適用。
我在下面模擬了一些程式碼,應該可以回答您的問題。重點是:Parallel.ForEach 提供的是 fork/join 式的並行,因此您無需擔心並行任務之外的競爭條件(呼叫執行緒會一直阻塞,直到所有任務完成為止,無論成功與否)。您只需要確保使用 LoopState 變數(lambda 的第二個參數)來控制循環狀態。
如果循環的任何一次迭代拋出未處理的異常,整個循環就會拋出 AggregateException,並由最後的 catch 區塊擷取。
提及此主題的其他連結:
Parallel.ForEach 在處理非常大的數據集時拋出異常
http://msdn.microsoft.com/en-us/library/dd460720.aspx
Parallel.ForEach 是否限制活動執行緒的數量?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;

namespace Temp
{
    /// <summary>
    /// Sample showing how to stop a <see cref="Parallel"/> ForEach loop gracefully and how
    /// failures raised inside the loop body surface on the calling thread.
    /// </summary>
    public class Class1
    {
        /// <summary>
        /// Minimal stand-in for the WCF client used in the question, so the sample compiles
        /// without a real service reference.
        /// </summary>
        private class MockWcfProxy
        {
            /// <summary>Pretends to perform the service call; always returns a new object.</summary>
            /// <param name="item">The item that would be sent to the service (ignored by the mock).</param>
            /// <returns>A placeholder response object.</returns>
            internal object ProcessIntervalConfiguration(string item)
            {
                return new Object();
            }

            /// <summary>Communication state the loop body inspects after a failure.</summary>
            public CommunicationState State { get; set; }
        }

        /// <summary>
        /// Processes every item in parallel, leaving an iteration early once the loop has been
        /// asked to stop, and raises the "all items complete" notification only when every
        /// iteration ran to completion.
        /// </summary>
        private void myFunction()
        {
            IList<string> iListOfItems = new List<string>();
            // populate iListOfItems

            // CancellationTokenSource is IDisposable; the using block releases it on every exit path.
            using (CancellationTokenSource cts = new CancellationTokenSource())
            {
                ParallelOptions po = new ParallelOptions();
                po.MaxDegreeOfParallelism = 20; // upper bound on concurrently running iterations
                po.CancellationToken = cts.Token;

                try
                {
                    var myWcfProxy = new MockWcfProxy();

                    ParallelLoopResult loopResult = Parallel.ForEach(iListOfItems, po, (item, loopState) =>
                    {
                        try
                        {
                            // ShouldExitCurrentIteration already covers Stop(), Break(), an exception in
                            // another iteration and cancellation of po.CancellationToken, so simply leave
                            // the iteration instead of calling Stop() again and carrying on.
                            if (loopState.ShouldExitCurrentIteration)
                            {
                                return;
                            }

                            // long running blocking WS call, check before and after
                            var response = myWcfProxy.ProcessIntervalConfiguration(item);

                            if (loopState.ShouldExitCurrentIteration)
                            {
                                return;
                            }

                            // perform some local processing of the response object
                        }
                        catch (Exception)
                        {
                            // cannot continue, game over: stop the loop and let the failure propagate.
                            if (myWcfProxy.State == CommunicationState.Faulted)
                            {
                                loopState.Stop();
                                throw;
                            }

                            // FYI all other exceptions are deliberately swallowed here...
                        }

                        // else carry on..
                        // raise some events and other actions that could all risk an unhandled error.
                    });

                    if (loopResult.IsCompleted)
                    {
                        RaiseAllItemsCompleteEvent();
                    }
                }
                catch (OperationCanceledException)
                {
                    // Raised by Parallel.ForEach when po.CancellationToken is cancelled. All iterations
                    // have already finished by the time control gets here; treat it as a graceful stop.
                }
                catch (AggregateException aggEx)
                {
                    // This section will be entered if any of the iterations threw an unhandled exception.
                    // Because the WCF exception is re-thrown above, it shows up in InnerExceptions.
                    foreach (Exception inner in aggEx.Flatten().InnerExceptions)
                    {
                        System.Diagnostics.Trace.TraceError(inner.ToString());
                    }
                }
            }

            // Execution will not get to this point until all of the iterations have completed (or one
            // has failed, and all that were running when that failure occurred complete).
        }

        /// <summary>Placeholder for the notification raised when every item has been processed.</summary>
        private void RaiseAllItemsCompleteEvent()
        {
            // Everything completed...
        }
    }
}