前回のプロデューサー/コンシューマー パターンを利用して、同時実行するタスクの数を制限できるプログラムを考えてみます。
実行時の画面は次のとおりです。
この画面では、コンシューマを3つ動かしていて、時刻の次の ProcessId: で 0 から 2 まで表示されています。4つめの "ddd" の表示は、コンシューマが全て塞がっている -> キューに置かれる -> "aaa" を処理していたコンシューマが空いてキューから取り出される -> "ddd" が処理される という手順を経ることにになり、コンシューマが実行する処理に5秒かかるようにしてあるので、最初の "aaa" の表示の5秒後となっています。
プログラムは次のとおりです。
まずは AsyncProducerConsumerCollection<T> クラスです(特に変更はありません)。
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ManagedTasksTest5
{
class AsyncProducerConsumerCollection<T>
{
// 渡すデータのキュー
private readonly Queue<T> _collection;
// 渡すデータ無かった時に未完了状態で返された TaskCompletionSource オブジェクトのキュー
private readonly Queue<TaskCompletionSource<T>> _waiting;
public AsyncProducerConsumerCollection()
{
_collection = new Queue<T>();
_waiting = new Queue<TaskCompletionSource<T>>();
}
public void Add(T item)
{
TaskCompletionSource<T> tcs = null;
lock (_collection)
{
if (_waiting.Count > 0)
{
tcs = _waiting.Dequeue();
}
else
{
_collection.Enqueue(item);
}
}
if (tcs != null)
{
// 未完了状態のオブジェクトにセットするとき
// item のセットともに、タスクの状態を「正常に完了」への遷移を試みる
tcs.TrySetResult(item);
}
}
public Task<T> Take()
{
return Take(default(CancellationToken));
}
public Task<T> Take(CancellationToken token)
{
if (token.WaitHandle != null)
{
token.Register(() => canceled());
}
lock (_collection)
{
if (_collection.Count > 0)
{
// データのキューが空でなければキューから取り出す
return Task.FromResult(_collection.Dequeue());
}
else
{
// キューが空だったら、未完了状態の TaskCompletionSource から作成されるタスクを返す
var tcs = new TaskCompletionSource<T>();
_waiting.Enqueue(tcs);
return tcs.Task;
}
}
}
private void canceled()
{
lock (_collection)
{
// キューに入っているデータをクリア
_collection.Clear();
while (_waiting.Count > 0)
{
// 未完了状態の TaskCompletionSource オブジェクトを取り出し
var tcs = _waiting.Dequeue();
// Canceld 状態への遷移を試みる
tcs.TrySetCanceled();
}
}
}
}
}
次に Program.cs
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ManagedTasksTest5
{
class Program
{
private const int ConcurrencyLimit = 3;
private static AsyncProducerConsumerCollection<string> _queue;
static void Main(string[] args)
{
_queue = new AsyncProducerConsumerCollection<string>();
var cts = new CancellationTokenSource();
// コンシューマを ConcurrencyLimit で指定された数立ち上げる
var taskList = new List<Task>();
for (var i = 0; i < ConcurrencyLimit; ++i)
{
taskList.Add(TaskConsumerAsync(i, cts.Token));
}
while (true)
{
Console.WriteLine("入力してください(終了: \"]\"キー)。");
var input = Console.ReadLine();
if (input == "]") break;
_queue.Add(input);
}
Console.WriteLine("Cancel を通知します。");
cts.Cancel();
Console.WriteLine("非同期処理の終了を待ちます。");
Task.WaitAll(taskList.ToArray());
Console.WriteLine("Enter キー押下で終了します。");
Console.ReadLine();
}
static async Task TaskConsumerAsync(int id, CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
var task = _queue.Take(token);
var item = await task;
var taskDoProcess = DoSomethingAsync(item, id, token);
var result = "";
try
{
result = await taskDoProcess;
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
}
if (taskDoProcess.Status == TaskStatus.RanToCompletion)
{
var message = string.Format("{0:T} {1}", DateTime.Now, result);
Console.WriteLine(message);
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("処理データの待機中にキャンセルが要求されました。");
}
}
static async Task<string> DoSomethingAsync(string s, int id, CancellationToken token)
{
await Task.Delay(2500);
if (token.IsCancellationRequested)
{
var message = string.Format("キャンセルしました: 処理中のデータ({0})", s);
throw new OperationCanceledException(message);
}
await Task.Delay(2500);
return string.Format("ProcessId: {0}, Input: {1}", id, s);
}
}
}
コンシューマを複数立ち上げて管理するために、19行目で List<Task> 型の taskList を作り、20行目からの for ループでコンシューマを ConcurrencyLimit で指定する数作成しています。
次回は、以前作ったクライアント・サーバー型のスケルトン的なものをプロデューサー/コンシューマー パターンを利用して書きなおしてみます。
「非同期処理(その5)」への1件のフィードバック