IPv4 と IPv6 で接続待ちを行う

接続待ちの別スレッド化を行なってみたので、次は IPv4 と IPv6 の双方で接続待ちを行うプログラムを書いてみました 🙂
接続待ちを IPv4 と IPv6 で平行して行う必要があるので、accept は非ブロッキングな BeginAcceptTcpClient を利用するようにしています。また、 接続要求を二箇所で受けることになることから、クライアントへの同時サービス数の制御が「サーバー側接続待ちの別スレッド化の試作」の方法では行えなくなるので、接続要求の受け付けとサービス処理を切り離し、両者をキューを介して結びつけるようにしています。

サーバー側をプロデューサー/コンシューマー パターンを利用して書きなおしたものを投稿したので、興味のある方はどうぞ[2013/04/06]。

プロジェクト・ファイルをダウンロードしたい方は、[BeginAcceptTest001.zip] をダウンロードして下さい.

ちなみに、並行処理の最大数を3にして、4つのクライアントから IPv4 と IPv6 での接続要求を交互に出したときのコンソールへの表示内容は次のようになります。IPv6 アドレスは「fe80::[インターフェースID]」と表記します。

Enter キー押下で Listen を終了します。
2012/02/05 14:51:06 Listen スレッドが開始しました。
2012/02/05 14:51:06 fe80::[インターフェースID] Port[2001]の Listen を開始しました。
2012/02/05 14:51:06 192.168.1.3 Port[2001]の Listen を開始しました。
2012/02/05 14:51:06 ipv6 の接続を待ちます
2012/02/05 14:51:06 ipv4 の接続を待ちます
2012/02/05 14:51:06 接続待ち待機
2012/02/05 14:51:06 キューのセット待ち
2012/02/05 14:51:23 Callback が呼び出されました。
2012/02/05 14:51:23 192.168.1.3 port[2001] クライアントが接続しました。
2012/02/05 14:51:23 client をキューに登録しました。
2012/02/05 14:51:23 キューのセット待ち通過
2012/02/05 14:51:23 接続待機のシグナルをセット
2012/02/05 14:51:23 接続待ち通過
2012/02/05 14:51:23 ipv4 の接続を待ちます
2012/02/05 14:51:23 接続待ち待機
2012/02/05 14:51:23 セマフォ待ち
2012/02/05 14:51:23 セマフォ待ち通過
2012/02/05 14:51:23 client をキューから取り出しました。
task数: 1
2012/02/05 14:51:23 キューのセット待ち
2012/02/05 14:51:23 [S] Server ready
2012/02/05 14:51:23 [C] abc0123あいうえお
2012/02/05 14:51:25 Callback が呼び出されました。
2012/02/05 14:51:25 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。
2012/02/05 14:51:25 client をキューに登録しました。
2012/02/05 14:51:25 接続待機のシグナルをセット
2012/02/05 14:51:25 接続待ち通過
2012/02/05 14:51:25 ipv6 の接続を待ちます
2012/02/05 14:51:25 接続待ち待機
2012/02/05 14:51:25 キューのセット待ち通過
2012/02/05 14:51:25 セマフォ待ち
2012/02/05 14:51:25 セマフォ待ち通過
2012/02/05 14:51:25 client をキューから取り出しました。
task数: 2
2012/02/05 14:51:25 キューのセット待ち
2012/02/05 14:51:25 [S] Server ready
2012/02/05 14:51:25 [C] abc0123あいうえお
2012/02/05 14:51:27 Callback が呼び出されました。
2012/02/05 14:51:27 192.168.1.3 port[2001] クライアントが接続しました。
2012/02/05 14:51:27 client をキューに登録しました。
2012/02/05 14:51:27 接続待機のシグナルをセット
2012/02/05 14:51:27 接続待ち通過
2012/02/05 14:51:27 ipv4 の接続を待ちます
2012/02/05 14:51:27 接続待ち待機
2012/02/05 14:51:27 キューのセット待ち通過
2012/02/05 14:51:27 セマフォ待ち
2012/02/05 14:51:27 セマフォ待ち通過
2012/02/05 14:51:27 client をキューから取り出しました。
task数: 3
2012/02/05 14:51:27 キューのセット待ち
2012/02/05 14:51:27 [S] Server ready
2012/02/05 14:51:27 [C] abc0123あいうえお
2012/02/05 14:51:29 Callback が呼び出されました。
2012/02/05 14:51:29 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。
2012/02/05 14:51:29 client をキューに登録しました。
2012/02/05 14:51:29 接続待機のシグナルをセット
2012/02/05 14:51:29 接続待ち通過
2012/02/05 14:51:29 ipv6 の接続を待ちます
2012/02/05 14:51:29 接続待ち待機
2012/02/05 14:51:29 キューのセット待ち通過
2012/02/05 14:51:29 セマフォ待ち
2012/02/05 14:51:38 [S] abc0123あいうえお
2012/02/05 14:51:38 192.168.1.3 port[2001] クライアントが切断しました。
task数: 2
2012/02/05 14:51:38 セマフォ待ち通過
2012/02/05 14:51:38 client をキューから取り出しました。
task数: 3
2012/02/05 14:51:38 キューのセット待ち
2012/02/05 14:51:38 [S] Server ready
2012/02/05 14:51:38 [C] abc0123あいうえお
2012/02/05 14:51:40 [S] abc0123あいうえお
2012/02/05 14:51:40 fe80::[インターフェースID]%11 port[2001] クライアントが切断しました。
task数: 2
2012/02/05 14:51:42 [S] abc0123あいうえお
2012/02/05 14:51:42 192.168.1.3 port[2001] クライアントが切断しました。
task数: 1
2012/02/05 14:51:53 [S] abc0123あいうえお
2012/02/05 14:51:53 fe80::[インターフェースID]%11 port[2001] クライアントが切断しました。
task数: 0

2012/02/05 14:52:07 サービス終了が要求されました。
2012/02/05 14:52:07 fe80::[インターフェースID] Port[2001]の Listen を終了します。
2012/02/05 14:52:07 192.168.1.3 Port[2001]の Listen を終了します。
2012/02/05 14:52:07 キューのセット待ち通過
2012/02/05 14:52:07 セマフォ待ち
2012/02/05 14:52:07 セマフォ待ち通過
2012/02/05 14:52:07 子タスク・スレッドの終了を検知しました。
2012/02/05 14:52:07 Callback が呼び出されました。
2012/02/05 14:52:07 接続待機のシグナルをセット
2012/02/05 14:52:07 Callback が呼び出されました。
2012/02/05 14:52:07 接続待機のシグナルをセット
2012/02/05 14:52:07 接続待ち通過
2012/02/05 14:52:07 Accept 終了待ち待機
2012/02/05 14:52:07 Accept 終了待ち通過
2012/02/05 14:52:07 ListenEnded セット
2012/02/05 14:52:07 TakeQueue 終了待ち待機
2012/02/05 14:52:07 TakeQueue 終了待ち通過
2012/02/05 14:52:07 Listen スレッドの 終了を検知しました。
Enter キー押下で終了します。

クライアント1の表示

>BeginAcceptTest001.Client.exe ipv4
2012/02/05 14:51:23 サーバー(192.168.1.3 Port[2001])と接続しました。
2012/02/05 14:51:23 [S] Server ready
2012/02/05 14:51:23 [C] abc0123あいうえお
2012/02/05 14:51:38 [S] abc0123あいうえお
2012/02/05 14:51:38 切断しました。

クライアント2の表示

>BeginAcceptTest001.Client.exe ipv6
2012/02/05 14:51:25 サーバー(fe80::[インターフェースID] Port[2001])と接続しました。
2012/02/05 14:51:25 [S] Server ready
2012/02/05 14:51:25 [C] abc0123あいうえお
2012/02/05 14:51:40 [S] abc0123あいうえお
2012/02/05 14:51:40 切断しました。

クライアント3の表示

>BeginAcceptTest001.Client.exe ipv4
2012/02/05 14:51:27 サーバー(192.168.1.3 Port[2001])と接続しました。
2012/02/05 14:51:27 [S] Server ready
2012/02/05 14:51:27 [C] abc0123あいうえお
2012/02/05 14:51:42 [S] abc0123あいうえお
2012/02/05 14:51:42 切断しました。

クライアント4の表示

>BeginAcceptTest001.Client.exe ipv6
2012/02/05 14:51:29 サーバー(fe80::c8ce:a09b:8b18:b2a Port[2001])と接続しました。
2012/02/05 14:51:38 [S] Server ready
2012/02/05 14:51:38 [C] abc0123あいうえお
2012/02/05 14:51:53 [S] abc0123あいうえお
2012/02/05 14:51:53 切断しました。

サービス処理のタスクが3つ起動し、1つの接続要求がキューに入っている状態で Enter キーを押下して Listen を終了させたときのコンソールへの表示内容は次のようになります。

Enter キー押下で Listen を終了します。
2012/02/05 16:56:05 Listen スレッドが開始しました。
2012/02/05 16:56:05 キューのセット待ち
2012/02/05 16:56:05 fe80::[インターフェースID] Port[2001]の Listen を開始しました。
2012/02/05 16:56:05 192.168.1.3 Port[2001]の Listen を開始しました。
2012/02/05 16:56:05 ipv6 の接続を待ちます
2012/02/05 16:56:05 ipv4 の接続を待ちます
2012/02/05 16:56:05 接続待ち待機
2012/02/05 16:56:18 Callback が呼び出されました。
2012/02/05 16:56:18 192.168.1.3 port[2001] クライアントが接続しました。
2012/02/05 16:56:18 client をキューに登録しました。
2012/02/05 16:56:18 キューのセット待ち通過
2012/02/05 16:56:18 接続待機のシグナルをセット
2012/02/05 16:56:18 接続待ち通過
2012/02/05 16:56:18 ipv4 の接続を待ちます
2012/02/05 16:56:18 セマフォ待ち
2012/02/05 16:56:18 接続待ち待機
2012/02/05 16:56:18 セマフォ待ち通過
2012/02/05 16:56:18 client をキューから取り出しました。
task数: 1
2012/02/05 16:56:18 キューのセット待ち
2012/02/05 16:56:18 [S] Server ready
2012/02/05 16:56:19 [C] abc0123あいうえお
2012/02/05 16:56:21 Callback が呼び出されました。
2012/02/05 16:56:21 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。
2012/02/05 16:56:21 client をキューに登録しました。
2012/02/05 16:56:21 接続待機のシグナルをセット
2012/02/05 16:56:21 接続待ち通過
2012/02/05 16:56:21 ipv6 の接続を待ちます
2012/02/05 16:56:21 キューのセット待ち通過
2012/02/05 16:56:21 接続待ち待機
2012/02/05 16:56:21 セマフォ待ち
2012/02/05 16:56:21 セマフォ待ち通過
2012/02/05 16:56:21 client をキューから取り出しました。
task数: 2
2012/02/05 16:56:21 キューのセット待ち
2012/02/05 16:56:21 [S] Server ready
2012/02/05 16:56:21 [C] abc0123あいうえお
2012/02/05 16:56:23 Callback が呼び出されました。
2012/02/05 16:56:23 192.168.1.3 port[2001] クライアントが接続しました。
2012/02/05 16:56:23 client をキューに登録しました。
2012/02/05 16:56:23 接続待機のシグナルをセット
2012/02/05 16:56:23 接続待ち通過
2012/02/05 16:56:23 ipv4 の接続を待ちます
2012/02/05 16:56:23 キューのセット待ち通過
2012/02/05 16:56:23 接続待ち待機
2012/02/05 16:56:23 セマフォ待ち
2012/02/05 16:56:23 セマフォ待ち通過
2012/02/05 16:56:23 client をキューから取り出しました。
task数: 3
2012/02/05 16:56:23 キューのセット待ち
2012/02/05 16:56:23 [S] Server ready
2012/02/05 16:56:23 [C] abc0123あいうえお
2012/02/05 16:56:26 Callback が呼び出されました。
2012/02/05 16:56:26 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。
2012/02/05 16:56:26 client をキューに登録しました。
2012/02/05 16:56:26 接続待機のシグナルをセット
2012/02/05 16:56:26 キューのセット待ち通過
2012/02/05 16:56:26 セマフォ待ち
2012/02/05 16:56:26 接続待ち通過
2012/02/05 16:56:26 ipv6 の接続を待ちます
2012/02/05 16:56:26 接続待ち待機

2012/02/05 16:56:27 サービス終了が要求されました。
2012/02/05 16:56:27 fe80::[インターフェースID] Port[2001]の Listen を終了します。
2012/02/05 16:56:27 Callback が呼び出されました。
2012/02/05 16:56:27 接続待機のシグナルをセット
2012/02/05 16:56:27 接続待ち通過
2012/02/05 16:56:27 192.168.1.3 Port[2001]の Listen を終了します。
2012/02/05 16:56:27 Callback が呼び出されました。
2012/02/05 16:56:27 接続待機のシグナルをセット
2012/02/05 16:56:27 Accept 終了待ち待機
2012/02/05 16:56:27 Accept 終了待ち通過
2012/02/05 16:56:27 TakeQueue 終了待ち待機
2012/02/05 16:56:27 ListenEnded セット
2012/02/05 16:56:28 子タスクをキャンセルしました。
2012/02/05 16:56:28 192.168.1.3 port[2001] クライアントが切断しました。
task数: 2
2012/02/05 16:56:28 セマフォ待ち通過
2012/02/05 16:56:28 fe80::[インターフェースID]%11 からの接続要求を閉じます
2012/02/05 16:56:29 子タスクをキャンセルしました。
2012/02/05 16:56:29 192.168.1.3 port[2001] クライアントが切断しました。
task数: 1
2012/02/05 16:56:31 子タスクをキャンセルしました。
2012/02/05 16:56:31 fe80::[インターフェースID]%11 port[2001] クライアントが切断しました。
task数: 0
2012/02/05 16:56:31 子タスク・スレッドの終了を検知しました。
2012/02/05 16:56:31 TakeQueue 終了待ち通過
2012/02/05 16:56:31 Listen スレッドの 終了を検知しました。
Enter キー押下で終了します。

まずは Main メソッドから。
ServerService クラスの ServiceStart メソッドを使用することでクライアントへのサービスを開始し、ServiceEnd メソッドを使用することでクライアントへのサービスを終了します。
IPv6 アドレスにリンクローカル・アドレスを指定するときには、’%インターフェース番号’付きで指定してください。

//
        private const string ipv4Addr = "192.168.1.3";
        private const string ipv6Addr = "fe80::[インターフェースID]";
        private const int port = 2001;

        static void Main(string[] args)
        {
            try
            {
                var serverService = new ServerService()
                {
                    Ipv4Addr = ipv4Addr,
                    Ipv6Addr = ipv6Addr,
                    Port = port,
                    Encoding = System.Text.Encoding.UTF8
                };

                serverService.StartService();

                Console.WriteLine("Enter キー押下で Listen を終了します。");
                Console.ReadLine();

                serverService.StopService();
            }
            catch (ServerServiceException e)
            {
                Console.WriteLine(e.Message);
            }

            Console.WriteLine("Enter キー押下で終了します。");
            Console.ReadLine();
        }

で、その ServerService クラスです。
プロパティ等やコンストラクタ、StartService メソッド、StopService メソッドです。

//
        public string Ipv4Addr { get; set; }
        public string Ipv6Addr { get; set; }
        public int Port { get; set; }
        public System.Text.Encoding Encoding { get; set; }

        private System.Net.Sockets.TcpListener v4Listner = null;
        private System.Net.Sockets.TcpListener v6Listner = null;
        private System.Threading.Tasks.Task taskListen = null;
        private volatile bool isServiceEnd = false;
        private bool isV4Connected = true;
        private bool isV6Connected = true;
        private System.Threading.ManualResetEvent clientConnected =
            new System.Threading.ManualResetEvent(false);
        private System.Threading.ManualResetEvent ListenEnded;
        private int acceptWaitCounter = 0;
        private object acceptWaitCounterLock = new object();
        private Action notifyCancel;
        private TakeClientQueue takeQueue;

        public ServerService()
        {
        }

        public void StartService()
        {
            if (string.IsNullOrEmpty(Ipv4Addr))
                throw new ServerServiceException("IPv4アドレスが設定されていません。",
                    ServerServiceException.Errors.IpAddressError);
            if (string.IsNullOrEmpty(Ipv6Addr))
                throw new ServerServiceException("IPv6アドレスが設定されていません。",
                    ServerServiceException.Errors.IpAddressError);
            if (Port < 1024)
                throw new ServerServiceException("ポート番号に予約ポートが設定されています。",
                    ServerServiceException.Errors.PortOutOfRange);
            if (Encoding == null)
                throw new ServerServiceException("Encoding がセットされていません。",
                    ServerServiceException.Errors.EncodingNotSet);

            // 指定された IP アドレスが自ホストのものであることを確認する
            var hostName = System.Net.Dns.GetHostName();
            var addrList = System.Net.Dns.GetHostAddresses(hostName);
            try
            {
                if (!addrList.Contains(System.Net.IPAddress.Parse(Ipv4Addr)))
                    throw new ServerServiceException("指定されたIPv4アドレスは自ホストのアドレスではありません。",
                        ServerServiceException.Errors.IpAddressError);
            }
            catch (FormatException)
            {
                throw new ServerServiceException("指定されたIPv4アドレスは有効なアドレスではありません。",
                    ServerServiceException.Errors.IpAddressError);
            }
            try
            {
                if (!addrList.Contains(System.Net.IPAddress.Parse(Ipv6Addr)))
                    throw new ServerServiceException("指定されたIPv6アドレスは自ホストのアドレスではありません。",
                        ServerServiceException.Errors.IpAddressError);
            }
            catch
            {
                throw new ServerServiceException("指定されたIPv6アドレスは有効なアドレスではありません。",
                    ServerServiceException.Errors.IpAddressError);
            }

            taskListen = System.Threading.Tasks.Task.Factory.StartNew(() =>
            {
                startListen();
            });

        }

        public void StopService()
        {
            Console.WriteLine("{0} サービス終了が要求されました。", DateTime.Now);
            ListenEnded = new System.Threading.ManualResetEvent(false); // ListenEnded を非シグナル状態で作成
            isServiceEnd = true;
            if (notifyCancel != null)
                notifyCancel();
            endListen();
            taskListen.Wait();
            Console.WriteLine("{0} Listen スレッドの 終了を検知しました。",
                DateTime.Now.ToString());
        }

次は startListen メソッドです。IPv4 と IPv6 で接続要求待ちを行い、接続要求が来たら acceptCallback がスケジュールされるようにしています。また、サービス処理を行う TakeClientQueue クラスのインスタンスを生成し、Start メソッドを別スレッドで起動しています。

//
        private void startListen()
        {
            var tokenSource = new System.Threading.CancellationTokenSource();
            notifyCancel = new Action(() => tokenSource.Cancel());
            var token = tokenSource.Token;

            Console.WriteLine("{0} Listen スレッドが開始しました。",
                DateTime.Now.ToString());

            takeQueue = new TakeClientQueue(token, Encoding);
            System.Threading.Tasks.Task takeTask = null;

            try
            {
                takeTask = new System.Threading.Tasks.Task(() => takeQueue.Start(), token);
                takeTask.Start();

                var ipv6Address = System.Net.IPAddress.Parse(Ipv6Addr);
                v6Listner = new System.Net.Sockets.TcpListener(ipv6Address, Port);
                v6Listner.Start();
                Console.WriteLine("{0} {1} Port[{2}]の Listen を開始しました。",
                    DateTime.Now.ToString(), ipv6Address.ToString(), Port);
                var ipv4Address = System.Net.IPAddress.Parse(Ipv4Addr);
                v4Listner = new System.Net.Sockets.TcpListener(ipv4Address, Port);
                v4Listner.Start();
                Console.WriteLine("{0} {1} Port[{2}]の Listen を開始しました。",
                    DateTime.Now.ToString(), ipv4Address.ToString(), Port);

                while (true)
                {
                    clientConnected.Reset();

                    if (isServiceEnd)     // 終了要求時の対応
                        break;

                    if (isV6Connected)
                    {   // V6接続の Callback が行われたら、新たなV6接続待ちを行う
                        isV6Connected = false;
                        Console.WriteLine("{0} ipv6 の接続を待ちます", DateTime.Now.ToString());
                        lock (acceptWaitCounterLock)
                        {
                            acceptWaitCounter++;
                        }
                        v6Listner.BeginAcceptTcpClient(new AsyncCallback(acceptCallback), v6Listner);
                    }

                    if (isV4Connected)
                    {   // V4接続の Callback が行われたら、新たなV4接続待ちを行う
                        isV4Connected = false;
                        Console.WriteLine("{0} ipv4 の接続を待ちます", DateTime.Now.ToString());
                        lock (acceptWaitCounterLock)
                        {
                            acceptWaitCounter++;
                        }
                        v4Listner.BeginAcceptTcpClient(new AsyncCallback(acceptCallback), v4Listner);
                    }

                    Console.WriteLine("{0} 接続待ち待機", DateTime.Now);
                    clientConnected.WaitOne();  // 接続が行われるまで待機する
                    Console.WriteLine("{0} 接続待ち通過", DateTime.Now);
                }
            }
            finally
            {
                Console.WriteLine("{0} Accept 終了待ち待機", DateTime.Now);
                ListenEnded.WaitOne();  // 別スレッドの Accept 待ちが終了するのを待つ
                Console.WriteLine("{0} Accept 終了待ち通過", DateTime.Now);

                Console.WriteLine("{0} TakeQueue 終了待ち待機", DateTime.Now);
                if (takeTask != null)
                    takeTask.Wait();
                Console.WriteLine("{0} TakeQueue 終了待ち通過", DateTime.Now);

                clientConnected.Close();

                tokenSource.Dispose();
            }

        }

次は endListen メソッドです。

//
        private void endListen()
        {
            Console.WriteLine("{0} {1} Port[{2}]の Listen を終了します。",
                DateTime.Now.ToString(), Ipv6Addr, Port);
            if (v6Listner != null)
                v6Listner.Stop();
            Console.WriteLine("{0} {1} Port[{2}]の Listen を終了します。",
                DateTime.Now.ToString(), Ipv4Addr, Port);
            if (v4Listner != null)
                v4Listner.Stop();

            takeQueue.Stop();
        }

次は acceptCallback メソッドです。接続してきたクライアントとの通信を行う TcpClient を取得して、キューにセットします。

//
        private void acceptCallback(IAsyncResult ar)
        {
            Console.WriteLine("{0} Callback が呼び出されました。", DateTime.Now);
            if (!isServiceEnd)
            {
                if (ar.IsCompleted)
                {
                    var listner = (System.Net.Sockets.TcpListener)ar.AsyncState;
                    var client = listner.EndAcceptTcpClient(ar);
                    Console.WriteLine("{0} {1} port[{2}] クライアントが接続しました。",
                        DateTime.Now.ToString(),
                        ((System.Net.IPEndPoint)listner.LocalEndpoint).Address.ToString(),
                        ((System.Net.IPEndPoint)listner.LocalEndpoint).Port.ToString());

                    // client をキューにセット
                    takeQueue.AddQueue(client);

                    if (((System.Net.IPEndPoint)listner.LocalEndpoint).Address.AddressFamily ==
                        System.Net.Sockets.AddressFamily.InterNetwork)
                    {
                        isV4Connected = true;
                    }
                    else
                    {
                        isV6Connected = true;
                    }
                }
            }

            lock (acceptWaitCounterLock)
            {
                acceptWaitCounter--;
            }
            Console.WriteLine("{0} 接続待機のシグナルをセット", DateTime.Now);
            clientConnected.Set();  // 接続待機をシグナル状態にする
            checkEndListen();
        }

次は checkEndListen メソッドです。

//
        private void checkEndListen()
        {
            if (isServiceEnd && acceptWaitCounter <= 0)
            {
                ListenEnded.Set();  // 終了時に Accept 待ちがなくなったらシグナル状態にする
                Console.WriteLine("{0} ListenEnded セット", DateTime.Now);
            }
        }

そして、サービス処理を行う TakeClientQueue クラスです。クラスのアクセス修飾子は internal にしています。
プロパティ等やコンストラクタです。

//
    internal class TakeClientQueue
    {
        internal System.Text.Encoding Encoding { get; private set; }
        
        private const int LIMIT_CONCURRENCY = 3;
        private Queue<System.Net.Sockets.TcpClient> clientQueue;
        private object queueLock = new object();
        private System.Threading.ManualResetEvent takenSignal;
        private System.Threading.CancellationToken cancelToken;
        private bool isStart;

        internal TakeClientQueue(System.Threading.CancellationToken token, System.Text.Encoding enc)
        {
            cancelToken = token;
            Encoding = enc;
            isStart = false;
        }

次は AddQueue メソッドです。TcpClient をキューにセットして、checkQueue メソッドを呼び出します。

//
        internal void AddQueue(System.Net.Sockets.TcpClient client)
        {
            if (!isStart)
                throw new InvalidOperationException(
                    "AddQueue メソッドを呼び出すことはできません(Start メソッドが呼び出されていないか、終了が要求されています)。");

            lock (queueLock)
            {
                clientQueue.Enqueue(client);
                Console.WriteLine("{0} client をキューに登録しました。", DateTime.Now);
            }
            checkQueue();
        }

次は Start メソッドです。このメソッドの中のループでキューからの取り出しと別スレッドでのサービス処理をコントロールしています。

//
        internal void Start()
        {
            System.Threading.Semaphore pool = null;
            List<System.Threading.Tasks.Task> taskList = null;
            object taskListLock = new object();

            try
            {
                clientQueue = new Queue<System.Net.Sockets.TcpClient>();
                takenSignal = new System.Threading.ManualResetEvent(false); // 非シグナル状態で作成
                pool = new System.Threading.Semaphore(LIMIT_CONCURRENCY, LIMIT_CONCURRENCY);
                taskList = new List<System.Threading.Tasks.Task>();
                isStart = true;

                while (true)
                {
                    Console.WriteLine("{0} キューのセット待ち", DateTime.Now);
                    takenSignal.WaitOne();  // キューにセットされるまで待機する
                    Console.WriteLine("{0} キューのセット待ち通過", DateTime.Now);
                    Console.WriteLine("{0} セマフォ待ち", DateTime.Now);
                    pool.WaitOne();         // セマフォ待機(taskの並行処理数)
                    Console.WriteLine("{0} セマフォ待ち通過", DateTime.Now);

                    if (cancelToken.IsCancellationRequested)     // 終了要求時の対応
                        break;

                    // 別スレッドで doProcess() を動かすようにセット
                    System.Net.Sockets.TcpClient client;
                    lock (queueLock)
                    {
                        client = clientQueue.Dequeue(); // キューから取り出し
                        Console.WriteLine("{0} client をキューから取り出しました。", DateTime.Now);
                    }
                    var task = new System.Threading.Tasks.Task(() => doProcess(client, cancelToken), cancelToken);
                    lock (taskListLock)
                    {
                        taskList.Add(task);
                        Console.WriteLine("task数: {0}", taskList.Count);
                    }
                    //スレッド終了時に TcpClient のクローズとセマフォの開放を行うようにセット
                    task.ContinueWith((t) =>
                    {
                        client.Close();

                        //if (!isServiceEnd)
                        pool.Release();
                        lock (taskListLock)
                        {
                            taskList.Remove(task);
                            Console.WriteLine("task数: {0}", taskList.Count);
                        }
                    });
                    task.Start();

                    checkQueue();   // キューの状態をチェックしてシグナルをセットする
                }
            }
            finally
            {
                lock (queueLock)
                {
                    while (clientQueue.Count != 0)
                    {   // キューに接続要求が残っていたら、後始末
                        var client = clientQueue.Dequeue();
                        Console.WriteLine("{0} {1} からの接続要求を閉じます", DateTime.Now,
                            ((System.Net.IPEndPoint)client.Client.RemoteEndPoint).Address);
                        client.Close();     // キューにセットされている接続要求をクローズ
                    }
                }

                System.Threading.Tasks.Task[] tasks;
                lock (taskListLock)
                {
                    tasks = taskList.ToArray();
                }
                System.Threading.Tasks.Task.WaitAll(tasks);
                Console.WriteLine("{0} 子タスク・スレッドの終了を検知しました。",
                    DateTime.Now.ToString());

                if (pool != null)
                    pool.Close();

                if (takenSignal != null)
                {
                    takenSignal.Dispose();
                    takenSignal = null;
                }
            }

        }

次は Stop メソッドです。

//
        internal void Stop()
        {
            if (!isStart)
                throw new InvalidOperationException("Start メソッドより前に Stop メソッドを呼び出すことはできません。");

            takenSignal.Set();
            isStart = false;
        }

次は checkQueue メソッドです。

//
        private void checkQueue()
        {
            lock (queueLock)
            {
                // キャンセル要求が来ているか、キューが空でなかったらシグナル状態へ
                if (cancelToken.IsCancellationRequested || clientQueue.Count != 0)
                    takenSignal.Set();
                else
                    takenSignal.Reset();
            }
        }

次は doProcess メソッドです。

//
        private void doProcess(System.Net.Sockets.TcpClient client, System.Threading.CancellationToken token)
        {
            System.Net.Sockets.NetworkStream stream = null;
            try
            {
                // ネットワークストリームを取得
                stream = client.GetStream();
                sendData(stream, "Server ready");

                var rString = receiveData(stream);

                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                // サーバー側処理の代わりに15秒待つ
                //(並行処理の様子の観察に都合が良い)
                System.Threading.Thread.Sleep(5000);
                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                System.Threading.Thread.Sleep(5000);
                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                System.Threading.Thread.Sleep(5000);
                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                sendData(stream, rString);
            }
            catch (System.IO.IOException e)
            {
                if (e.InnerException.GetType() == typeof(System.Net.Sockets.SocketException) &&
                    (e.InnerException as System.Net.Sockets.SocketException).ErrorCode == 10054)
                    Console.WriteLine("{0} クライアントが切断していたためデータを送信できませんでした。",
                        DateTime.Now.ToString());
                else
                    throw;
            }
            catch (OperationCanceledException)
            {   // キャンセル実施をキャッチ
                Console.WriteLine("{0} 子タスクをキャンセルしました。", DateTime.Now.ToString());
            }
            finally
            {
                if (stream != null)
                {
                    Console.WriteLine("{0} {1} port[{2}] クライアントが切断しました。",
                        DateTime.Now.ToString(),
                        ((System.Net.IPEndPoint)client.Client.LocalEndPoint).Address.ToString(),
                        ((System.Net.IPEndPoint)client.Client.LocalEndPoint).Port.ToString());
                    stream.Close();
                }
            }
        }

次は receiveData メソッドと sendData メソッドです。

//
        private string receiveData(System.IO.Stream stream)
        {
            var receiveMessage = "";
            using (var memStream = new System.IO.MemoryStream())
            {
                var rBuff = new byte[256];
                int rSize;
                do
                {
                    // データの一部を受信する
                    rSize = stream.Read(rBuff, 0, rBuff.Length);
                    // rSize が 0 のときにはクライアントが切断したと判断
                    if (rSize == 0)
                    {
                        Console.WriteLine("クライアントが切断しました。");
                        throw new ServerServiceException("ストリームの読み出しの際、クライアントが切断していました。",
                            ServerServiceException.Errors.ConnectionClose);
                    }
                    // 受信したデータを蓄積する
                    memStream.Write(rBuff, 0, rSize);
                } while ((stream as System.Net.Sockets.NetworkStream).DataAvailable);

                // 受信したデータを文字列に変換
                receiveMessage = Encoding.GetString(memStream.ToArray());
                memStream.Close();
            }

            Console.WriteLine("{0} [C] {1}", DateTime.Now.ToString(), receiveMessage);
            return receiveMessage;
        }

        private void sendData(System.IO.Stream stream, string str)
        {
            // 文字列をバイト配列へ
            var data = Encoding.GetBytes(str);
            stream.Write(data, 0, data.Length);
            Console.WriteLine("{0} [S] {1}", DateTime.Now.ToString(), str);
        }

そして、ServerServiceException クラスです。

//
    class ServerServiceException : ApplicationException
    {
        public enum Errors
        {
            IpAddressError = 0, PortOutOfRange, EncodingNotSet, ConnectionClose
        }

        public int ErrorCode { get; private set; }

        public ServerServiceException(string message, Errors error)
            : base(message)
        {
            ErrorCode = (int)error;
        }
    }

クライアント側の変更は IPv4 で接続するか、IPv6 で接続するかくらいなので、ソースを見たい方はプロジェクト・ファイルをダウンロードして見てください 😉


IPv4 と IPv6 で接続待ちを行う」への2件のフィードバック

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です