プロデューサー/コンシューマー パターンを利用した IPv4 及び IPv6 接続待ち

前回はプロデューサー/コンシューマー パターンを利用して IPv4 で接続待ちを行うプログラムを書いてみましたが、今回は IPv4 と IPv6 の両方で接続待ちを行うプログラムを書いてみます。以前「IPv4 と IPv6 で接続待ちを行う」で BeginAcceptTcpClient を使って書いたものをプロデューサー/コンシューマー パターンを利用して書きなおしてみます。前回のものと違い、プロデューサ側が2つになるので、その点への対応が必要になります。

プロジェクト・ファイルをダウンロードしたい方は、[IpV4V6AcceptTest.zip] をダウンロードして下さい。

動作させたときのキューへの登録状況を表示する Trace 出力は次のとおりです。
Trace 出力

この画像はサーバー側でキュー登録の上限を2としコンシューマを2つ動作させ、クライアントを6つ立ち上げたときのものです。3つめ、4つめがキューに入り、5つめ、6つめがキュー登録待ちとなり、1つめの処理が終了して3つめがキューから取り出されると、5つめがキューに登録されています。

それでは、プログラムです。
まず、AsyncProducerConsumerCollection クラスです。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace IpV4V6AcceptTest.Server
{
    class AsyncProducerConsumerCollection<T>
    {
        // キューへの投入を待機させるためのキュー(TResult の bool 型はダミー)
        // このキューにセットされるオブジェクトの最大数はプロデューサの稼働数
        private readonly Queue<TaskCompletionSource<bool>> _queueLimitTaskQueue;
        // 渡すデータのキュー
        private readonly Queue<T> _collection;
        // 渡すデータが無かった時に未完了状態で返した TaskCompletionSource オブジェクトのキュー
        private readonly Queue<TaskCompletionSource<T>> _waiting;

        // queueEntriesLimit が 0 のときにはキューへの投入制限を行わない(デフォルト値 0)
        public AsyncProducerConsumerCollection(int queueEntriesLimit = 0)
        {
            _collection = new Queue<T>();
            _waiting = new Queue<TaskCompletionSource<T>>();
            QueueEntriesLimit = queueEntriesLimit;
            _queueLimitTaskQueue = new Queue<TaskCompletionSource<bool>>();
        }

        public int QueueEntriesLimit { get; private set; }

        public void Add(T item)
        {
            Add(item, default(CancellationToken));
        }

        public void Add(T item, CancellationToken token)
        {
            TaskCompletionSource<bool> queueLimitTask = null;

            lock (_collection)
            {
                // キューへの投入制限値に達していた場合、待機用の TaskCompletionSource オブジェクトを生成
                if (QueueEntriesLimit != 0 &&
                    _collection.Count >= QueueEntriesLimit)
                {
                    queueLimitTask = new TaskCompletionSource<bool>();
                    _queueLimitTaskQueue.Enqueue(queueLimitTask);
                    System.Diagnostics.Trace.WriteLine(
                        string.Format("{0} データキュー登録待ちの数: {1}",
                        DateTime.Now.ToLongTimeString(), _queueLimitTaskQueue.Count));
                }
            }
            // キューへの投入待機用オブジェクトが生成されていた場合には完了まで待機する
            if (queueLimitTask != null)
            {
                queueLimitTask.Task.Wait();
            }
            // キャンセル受付後に進入してきたら登録せずにリターン
            if (token.WaitHandle != null && token.IsCancellationRequested)
            {
                disposeItem(item);
                return;
            }

            TaskCompletionSource<T> tcs = null;
            lock (_collection) // キュー操作の排他制御
            {
                if (_waiting.Count > 0)
                {
                    tcs = _waiting.Dequeue();
                }
                else
                {
                    _collection.Enqueue(item);
                    System.Diagnostics.Trace.WriteLine(
                        string.Format("{0} データキューの数: {1}",
                        DateTime.Now.ToLongTimeString(), _collection.Count));
                }
            }
            if (tcs != null)
            {
                // 未完了状態のオブジェクトにセットするとき
                // item のセットとともに、タスクの状態を「正常に完了」への遷移を試みる
                tcs.TrySetResult(item);
            }
        }

        public Task<T> Take()
        {
            return Take(default(CancellationToken));
        }

        public Task<T> Take(CancellationToken token)
        {
            if (token.WaitHandle != null)
            {
                token.Register(() => canceled());
            }

            lock (_collection)
            {
                if (_collection.Count > 0)
                {
                    // データのキューが空でなければキューから取り出す
                    return Task<T>.Factory.StartNew(() =>
                    {
                        var item = _collection.Dequeue();
                        System.Diagnostics.Trace.WriteLine(
                            string.Format("{0} データキューの数: {1}",
                            DateTime.Now.ToLongTimeString(), _collection.Count));
                        // キューへの投入待機用オブジェクトが生成されていた場合には一つを進行させる
                        queueLimitDequeue();
                        return item;
                    });
                }
                else
                {
                    // キューが空だったら、未完了状態の TaskCompletionSource から作成されるタスクを返す
                    var tcs = new TaskCompletionSource<T>();
                    _waiting.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        private void canceled()
        {
            lock (_collection)
            {
                while (_queueLimitTaskQueue.Count > 0)
                {
                    // キューへの投入待機を進行させる
                    queueLimitDequeue();
                }
                
                // キューに入っているデータをクリア
                while (_collection.Count > 0)
                {
                    disposeItem(_collection.Dequeue());
                    System.Diagnostics.Trace.WriteLine(
                        string.Format("{0} データキューの数: {1}",
                        DateTime.Now.ToLongTimeString(), _collection.Count));
                }

                while (_waiting.Count > 0)
                {
                    // 未完了状態の TaskCompletionSource オブジェクトを取り出し
                    var tcs = _waiting.Dequeue();
                    // Canceld 状態への遷移を試みる
                    tcs.TrySetCanceled();
                }
            }
        }

        private void queueLimitDequeue()
        {
            if (_queueLimitTaskQueue.Count != 0)
            {
                _queueLimitTaskQueue.Dequeue().TrySetResult(true);
                System.Diagnostics.Trace.WriteLine(
                    string.Format("{0} データキュー登録待ちの数: {1}",
                    DateTime.Now.ToLongTimeString(), _queueLimitTaskQueue.Count));
            }
        }

        private void disposeItem(T item)
        {
            if (item as IDisposable != null)
            {
                (item as IDisposable).Dispose();
            }
        }
    }
}

プロデューサ側が IPv4 の接続待ちを行なっているものと IPv6 の接続待ちを行なっているものの2つになることから、「キューへの投入を待機させるためのキュー」を設けています。ソースコード中にコメントを記述したので、何をやっているのかは分かるかと思います。

列挙型の ServerError と ServerServiceException クラスは前回のものと同じなので省略します。

次に、ServerService クラスです。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IpV4V6AcceptTest.Server
{
    class ServerService : IDisposable
    {
        private int _concurrencyLimit;
        private Encoding _encoding;
        private AsyncProducerConsumerCollection<TcpClient> _queue;
        private List<Task> _tasks;
        private TcpListener _listenerIpv4;
        private TcpListener _listenerIpv6;
        private bool _disposed = false;
        private const int QueueLimit = 2; // キュー投入の制限数

        // スレッドで例外が発生した場合に発生するイベント
        public event ThreadExceptionEventHandler ThreadExceptionOccurred;

        public ServerService(int concurrencyLimit, Encoding encoding)
        {
            _concurrencyLimit = concurrencyLimit;
            _encoding = encoding;
            _queue = new AsyncProducerConsumerCollection<TcpClient>(QueueLimit);
            _tasks = new List<Task>();
            _listenerIpv4 = null;
            _listenerIpv6 = null;
        }

        ~ServerService()
        {
            Dispose(false);
        }

        private Action cancel;

        public void StartService(string hostAddressIpv4, string hostAddressIpv6, int portNo)
        {
            try
            {
                var ipAddressV4 = checkHostAddress(hostAddressIpv4);
                var ipAddressV6 = checkHostAddress(hostAddressIpv6);
                checkPortNo(portNo);

                var cts = new CancellationTokenSource();
                cancel = () => cts.Cancel();

                // コンシューマを ConcurrencyLimit で指定された数立ち上げる
                for (var i = 0; i < _concurrencyLimit; ++i)
                {
                    _tasks.Add(TaskConsumerAsync(cts.Token));
                }

                if (_listenerIpv4 == null)
                {
                    _tasks.Add(TaskProducerAsync(ipAddressV4, portNo, cts.Token));
                }
                if (_listenerIpv6 == null)
                {
                    _tasks.Add(TaskProducerAsync(ipAddressV6, portNo, cts.Token));
                }
            }
            catch (ServerServiceException e)
            {
                raiseThreadExceptionOccurred(e);
            }
        }

        public void StopService()
        {
            if (cancel != null)
            {
                cancel();
                cancel = null;
            }

            if (_listenerIpv4 != null)
            {
                _listenerIpv4.Stop();
                _listenerIpv4 = null;
            }
            if (_listenerIpv6 != null)
            {
                _listenerIpv6.Stop();
                _listenerIpv6 = null;
            }
        }

        public void WaitTasksEnd()
        {
            Task.WaitAll(_tasks.ToArray());
        }

        private IPAddress checkHostAddress(string hostAddress)
        {
            if (string.IsNullOrEmpty(hostAddress))
            {
                throw new ServerServiceException(ServerError.IpAddressError,
                    "IP アドレスが設定されていません。");
            }

            var hostName = Dns.GetHostName();
            var addresses = Dns.GetHostAddresses(hostName);
            IPAddress ipAddress;
            if (!IPAddress.TryParse(hostAddress, out ipAddress))
            {
                throw new ServerServiceException(ServerError.IpAddressError,
                    "無効なアドレスが設定されています。");
            }
            if (!IPAddress.IsLoopback(ipAddress) && !addresses.Contains(ipAddress))
            {
                throw new ServerServiceException(ServerError.IpAddressError,
                    "設定されたアドレスは自ホストのアドレスではありません。");
            }

            return ipAddress;
        }

        private void checkPortNo(int portNo)
        {
            if (portNo < 0 || portNo > 65535)
            {
                throw new ServerServiceException(ServerError.PortOutOfRange,
                    "無効なポート番号が設定されています。");
            }
            if (portNo < 1024)
            {
                throw new ServerServiceException(ServerError.PortOutOfRange,
                    "ポート番号に予約ポートが設定されています。");
            }
        }

        private async Task TaskConsumerAsync(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var client = await _queue.Take(token);
                    try
                    {
                        await doProcessAsync(client, token);
                    }
                    catch (IOException e)
                    {
                        if (e.InnerException.GetType() == typeof(SocketException) &&
                            (e.InnerException as SocketException).ErrorCode == 10054)
                        {
                            Console.WriteLine("{0} [コンシューマ] クライアントが切断していたためデータを送信できませんでした。",
                                DateTime.Now.ToLongTimeString());
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        Console.WriteLine("{0} [コンシューマ] 処理中にキャンセルが要求されました。",
                            DateTime.Now.ToLongTimeString());
                    }
                    finally
                    {
                        client.Close();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("{0} [コンシューマ] 処理データの待機中にキャンセルが要求されました。",
                    DateTime.Now.ToLongTimeString());
            }
        }

        private async Task doProcessAsync(TcpClient client, CancellationToken token)
        {
            using (var stream = client.GetStream())
            {
                Console.WriteLine("{0} [S] Server ready", DateTime.Now.ToLongTimeString());
                await sendDataAsync(stream, _encoding.GetBytes("Server ready"));

                var rData = await receiveDataAsync(stream);
                Console.WriteLine("{0} [C] {1}", DateTime.Now.ToLongTimeString(), _encoding.GetString(rData));

                token.ThrowIfCancellationRequested();

                // サーバー側処理の代わりに15秒待つ
                //(並行処理の様子の観察に都合が良い)
                await Task.Delay(5000);
                token.ThrowIfCancellationRequested();

                await Task.Delay(5000);
                token.ThrowIfCancellationRequested();

                await Task.Delay(5000);
                token.ThrowIfCancellationRequested();

                Console.WriteLine("{0} [S] {1}", DateTime.Now.ToLongTimeString(), _encoding.GetString(rData));
                await sendDataAsync(stream, rData);
            }
        }

        private async Task TaskProducerAsync(IPAddress ipAddress, int portNo, CancellationToken token)
        {
            TcpListener listener = null;
            try
            {
                listener = new TcpListener(ipAddress, portNo);
                listener.Start();
                if (ipAddress.AddressFamily == AddressFamily.InterNetwork)
                {
                    _listenerIpv4 = listener;
                }
                else
                {
                    _listenerIpv6 = listener;
                }
                Console.WriteLine("{0} {1}/Port[{2}]の Listen を開始しました。",
                    DateTime.Now.ToLongTimeString(), ipAddress.ToString(), portNo);

                while (!token.IsCancellationRequested)
                {
                    var client = await listener.AcceptTcpClientAsync();
                    Console.WriteLine("{0} 接続しました。(Local {1}, Remote {2})",
                        DateTime.Now.ToLongTimeString(), client.Client.LocalEndPoint.ToString(),
                        client.Client.RemoteEndPoint.ToString());
                    // 接続してきたクライアントをキューにセットする
                    _queue.Add(client, token);

                }
            }
            catch (SocketException e)
            {
                switch (e.ErrorCode)
                {
                    case 10004:
                        Console.WriteLine("{0} {1}/Port[{2}]の Listen が強制終了されました。",
                    DateTime.Now.ToLongTimeString(), ipAddress.ToString(), portNo);
                        break;
                    default:
                        Console.WriteLine("{0} Socket exception: {1}, errCode({2})",
                            DateTime.Now.ToLongTimeString(), e.Message, e.ErrorCode);
                        break;
                }
            }
            catch (ObjectDisposedException)
            {
                Console.WriteLine("{0} {1}/Port[{2}]の Listen が破棄されました。",
                    DateTime.Now.ToLongTimeString(), ipAddress.ToString(), portNo);
            }
            finally
            {
                if (listener != null)
                {
                    listener.Stop();
                    if (ipAddress.AddressFamily == AddressFamily.InterNetwork)
                    {
                        _listenerIpv4 = null;
                    }
                    else
                    {
                        _listenerIpv6 = null;
                    }
                }
            }
        }

        private async Task<byte[]> receiveDataAsync(NetworkStream stream)
        {
            byte[] result;
            using (var memStream = new MemoryStream())
            {
                var rBuff = new byte[1024];
                int rSize;
                do
                {
                    // データの一部を受信する
                    rSize = await stream.ReadAsync(rBuff, 0, rBuff.Length);
                    // rSize が 0 のときにはクライアントが切断したと判断
                    if (rSize == 0)
                    {
                        Console.WriteLine("クライアントが切断しました。");
                        throw new ServerServiceException(ServerError.ConnectionClose,
                            "ストリームの読み出しの際、クライアントが切断していました。");
                    }
                    // 受信したデータを蓄積する
                    memStream.Write(rBuff, 0, rSize);
                } while (stream.DataAvailable);

                result = memStream.ToArray();
            }
            Console.WriteLine("RecieveData(hex):{0}", bytes2Hex(result));
            return result;
        }

        private async Task sendDataAsync(NetworkStream stream, byte[] message)
        {
            await stream.WriteAsync(message, 0, message.Length);
            Console.WriteLine("SendData(hex):{0}", bytes2Hex(message));
        }

        private static string bytes2Hex(byte[] bytes)
        {
            var val = new StringBuilder();
            foreach (var n in bytes)
            {
                val.Append(string.Format("{0:X2} ", n));
            }
            return val.ToString();
        }

        // ThreadExceptionOccurred イベントを発火させる
        private void raiseThreadExceptionOccurred(Exception e)
        {
            var handler = ThreadExceptionOccurred;
            if (handler != null)
            {
                handler(this, new ThreadExceptionEventArgs(e));
            }
        }

        #region IDisposable

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_disposed) return;

            _disposed = true;

            if (disposing)
            {
                // マネージ リソースの解放処理
            }
            // アンマネージ リソースの解放処理
            StopService();
        }

        #endregion
    }
}

こちらは IPv6 のプロデューサと IPv6 のプロデューサを管理できるようにしています。

次に、Server クラスです。

using System;
using System.Text;
using System.Threading;

namespace IpV4V6AcceptTest.Server
{
    class Server
    {
        private const string HostAddressIpv4 = "127.0.0.1";
        private const string HostAddressIpv6 = "::1";
        private const int PortNo = 10050;
        private const int ConcurrencyLimit = 2;
        private static readonly Encoding ServerEncoding = Encoding.UTF8;

        static void Main(string[] args)
        {
            var service = new ServerService(ConcurrencyLimit, ServerEncoding);
            service.ThreadExceptionOccurred += onException;
            service.StartService(HostAddressIpv4, HostAddressIpv6, PortNo);

            Console.WriteLine("Enter キー押下で Listen を終了します。");
            Console.ReadLine();
            service.StopService();

            Console.WriteLine("タスクの終了を待ちます。");
            service.WaitTasksEnd(); ;

            Console.WriteLine("Enter キー押下で終了します。");
            Console.ReadLine();
        }

        static void onException(object sender, ThreadExceptionEventArgs e)
        {
            Console.WriteLine("例外が発生しました。: {0}, {1}",
                e.Exception.GetType().Name, e.Exception.Message);
        }
    }
}

こちらは、IPv4 アドレスと IPv6 アドレスの2つを扱うように変更しています。

サーバー側のプログラムは以上です。長くなったので前回と同じくクライアント側は掲載しません。プログラムを動かしてみたい方は、先頭付近の Trace 出力画像の上にあるリンクから zip ファイルをダウンロードして下さい。動かす方法は、サーバー側をデバッグ実行(あるいは「デバッグ無しで開始」)して、クライアント側はコンパイル後の実行ファイルをエクスプローラーから起動します。


非同期処理(インデックス)


コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です