接続待ちの別スレッド化を行なってみたので、次は IPv4 と IPv6 の双方で接続待ちを行うプログラムを書いてみました 🙂
接続待ちを IPv4 と IPv6 で平行して行う必要があるので、accept は非ブロッキングな BeginAcceptTcpClient を利用するようにしています。また、 接続要求を二箇所で受けることになることから、クライアントへの同時サービス数の制御が「サーバー側接続待ちの別スレッド化の試作」の方法では行えなくなるので、接続要求の受け付けとサービス処理を切り離し、両者をキューを介して結びつけるようにしています。
サーバー側をプロデューサー/コンシューマー パターンを利用して書きなおしたものを投稿したので、興味のある方はどうぞ[2013/04/06]。
プロジェクト・ファイルをダウンロードしたい方は、[BeginAcceptTest001.zip] をダウンロードして下さい.
ちなみに、並行処理の最大数を3にして、4つのクライアントから IPv4 と IPv6 での接続要求を交互に出したときのコンソールへの表示内容は次のようになります。IPv6 アドレスは「fe80::[インターフェースID]」と表記します。
Enter キー押下で Listen を終了します。 2012/02/05 14:51:06 Listen スレッドが開始しました。 2012/02/05 14:51:06 fe80::[インターフェースID] Port[2001]の Listen を開始しました。 2012/02/05 14:51:06 192.168.1.3 Port[2001]の Listen を開始しました。 2012/02/05 14:51:06 ipv6 の接続を待ちます 2012/02/05 14:51:06 ipv4 の接続を待ちます 2012/02/05 14:51:06 接続待ち待機 2012/02/05 14:51:06 キューのセット待ち 2012/02/05 14:51:23 Callback が呼び出されました。 2012/02/05 14:51:23 192.168.1.3 port[2001] クライアントが接続しました。 2012/02/05 14:51:23 client をキューに登録しました。 2012/02/05 14:51:23 キューのセット待ち通過 2012/02/05 14:51:23 接続待機のシグナルをセット 2012/02/05 14:51:23 接続待ち通過 2012/02/05 14:51:23 ipv4 の接続を待ちます 2012/02/05 14:51:23 接続待ち待機 2012/02/05 14:51:23 セマフォ待ち 2012/02/05 14:51:23 セマフォ待ち通過 2012/02/05 14:51:23 client をキューから取り出しました。 task数: 1 2012/02/05 14:51:23 キューのセット待ち 2012/02/05 14:51:23 [S] Server ready 2012/02/05 14:51:23 [C] abc0123あいうえお 2012/02/05 14:51:25 Callback が呼び出されました。 2012/02/05 14:51:25 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。 2012/02/05 14:51:25 client をキューに登録しました。 2012/02/05 14:51:25 接続待機のシグナルをセット 2012/02/05 14:51:25 接続待ち通過 2012/02/05 14:51:25 ipv6 の接続を待ちます 2012/02/05 14:51:25 接続待ち待機 2012/02/05 14:51:25 キューのセット待ち通過 2012/02/05 14:51:25 セマフォ待ち 2012/02/05 14:51:25 セマフォ待ち通過 2012/02/05 14:51:25 client をキューから取り出しました。 task数: 2 2012/02/05 14:51:25 キューのセット待ち 2012/02/05 14:51:25 [S] Server ready 2012/02/05 14:51:25 [C] abc0123あいうえお 2012/02/05 14:51:27 Callback が呼び出されました。 2012/02/05 14:51:27 192.168.1.3 port[2001] クライアントが接続しました。 2012/02/05 14:51:27 client をキューに登録しました。 2012/02/05 14:51:27 接続待機のシグナルをセット 2012/02/05 14:51:27 接続待ち通過 2012/02/05 14:51:27 ipv4 の接続を待ちます 2012/02/05 14:51:27 接続待ち待機 2012/02/05 14:51:27 キューのセット待ち通過 2012/02/05 14:51:27 セマフォ待ち 2012/02/05 14:51:27 セマフォ待ち通過 2012/02/05 14:51:27 client をキューから取り出しました。 task数: 3 2012/02/05 14:51:27 キューのセット待ち 2012/02/05 14:51:27 [S] Server ready 2012/02/05 14:51:27 [C] abc0123あいうえお 2012/02/05 14:51:29 Callback が呼び出されました。 2012/02/05 14:51:29 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。 2012/02/05 14:51:29 client をキューに登録しました。 2012/02/05 14:51:29 接続待機のシグナルをセット 2012/02/05 14:51:29 接続待ち通過 2012/02/05 14:51:29 ipv6 の接続を待ちます 2012/02/05 14:51:29 接続待ち待機 2012/02/05 14:51:29 キューのセット待ち通過 2012/02/05 14:51:29 セマフォ待ち 2012/02/05 14:51:38 [S] abc0123あいうえお 2012/02/05 14:51:38 192.168.1.3 port[2001] クライアントが切断しました。 task数: 2 2012/02/05 14:51:38 セマフォ待ち通過 2012/02/05 14:51:38 client をキューから取り出しました。 task数: 3 2012/02/05 14:51:38 キューのセット待ち 2012/02/05 14:51:38 [S] Server ready 2012/02/05 14:51:38 [C] abc0123あいうえお 2012/02/05 14:51:40 [S] abc0123あいうえお 2012/02/05 14:51:40 fe80::[インターフェースID]%11 port[2001] クライアントが切断しました。 task数: 2 2012/02/05 14:51:42 [S] abc0123あいうえお 2012/02/05 14:51:42 192.168.1.3 port[2001] クライアントが切断しました。 task数: 1 2012/02/05 14:51:53 [S] abc0123あいうえお 2012/02/05 14:51:53 fe80::[インターフェースID]%11 port[2001] クライアントが切断しました。 task数: 0 2012/02/05 14:52:07 サービス終了が要求されました。 2012/02/05 14:52:07 fe80::[インターフェースID] Port[2001]の Listen を終了します。 2012/02/05 14:52:07 192.168.1.3 Port[2001]の Listen を終了します。 2012/02/05 14:52:07 キューのセット待ち通過 2012/02/05 14:52:07 セマフォ待ち 2012/02/05 14:52:07 セマフォ待ち通過 2012/02/05 14:52:07 子タスク・スレッドの終了を検知しました。 2012/02/05 14:52:07 Callback が呼び出されました。 2012/02/05 14:52:07 接続待機のシグナルをセット 2012/02/05 14:52:07 Callback が呼び出されました。 2012/02/05 14:52:07 接続待機のシグナルをセット 2012/02/05 14:52:07 接続待ち通過 2012/02/05 14:52:07 Accept 終了待ち待機 2012/02/05 14:52:07 Accept 終了待ち通過 2012/02/05 14:52:07 ListenEnded セット 2012/02/05 14:52:07 TakeQueue 終了待ち待機 2012/02/05 14:52:07 TakeQueue 終了待ち通過 2012/02/05 14:52:07 Listen スレッドの 終了を検知しました。 Enter キー押下で終了します。
クライアント1の表示
>BeginAcceptTest001.Client.exe ipv4 2012/02/05 14:51:23 サーバー(192.168.1.3 Port[2001])と接続しました。 2012/02/05 14:51:23 [S] Server ready 2012/02/05 14:51:23 [C] abc0123あいうえお 2012/02/05 14:51:38 [S] abc0123あいうえお 2012/02/05 14:51:38 切断しました。
クライアント2の表示
>BeginAcceptTest001.Client.exe ipv6 2012/02/05 14:51:25 サーバー(fe80::[インターフェースID] Port[2001])と接続しました。 2012/02/05 14:51:25 [S] Server ready 2012/02/05 14:51:25 [C] abc0123あいうえお 2012/02/05 14:51:40 [S] abc0123あいうえお 2012/02/05 14:51:40 切断しました。
クライアント3の表示
>BeginAcceptTest001.Client.exe ipv4 2012/02/05 14:51:27 サーバー(192.168.1.3 Port[2001])と接続しました。 2012/02/05 14:51:27 [S] Server ready 2012/02/05 14:51:27 [C] abc0123あいうえお 2012/02/05 14:51:42 [S] abc0123あいうえお 2012/02/05 14:51:42 切断しました。
クライアント4の表示
>BeginAcceptTest001.Client.exe ipv6 2012/02/05 14:51:29 サーバー(fe80::c8ce:a09b:8b18:b2a Port[2001])と接続しました。 2012/02/05 14:51:38 [S] Server ready 2012/02/05 14:51:38 [C] abc0123あいうえお 2012/02/05 14:51:53 [S] abc0123あいうえお 2012/02/05 14:51:53 切断しました。
サービス処理のタスクが3つ起動し、1つの接続要求がキューに入っている状態で Enter キーを押下して Listen を終了させたときのコンソールへの表示内容は次のようになります。
Enter キー押下で Listen を終了します。 2012/02/05 16:56:05 Listen スレッドが開始しました。 2012/02/05 16:56:05 キューのセット待ち 2012/02/05 16:56:05 fe80::[インターフェースID] Port[2001]の Listen を開始しました。 2012/02/05 16:56:05 192.168.1.3 Port[2001]の Listen を開始しました。 2012/02/05 16:56:05 ipv6 の接続を待ちます 2012/02/05 16:56:05 ipv4 の接続を待ちます 2012/02/05 16:56:05 接続待ち待機 2012/02/05 16:56:18 Callback が呼び出されました。 2012/02/05 16:56:18 192.168.1.3 port[2001] クライアントが接続しました。 2012/02/05 16:56:18 client をキューに登録しました。 2012/02/05 16:56:18 キューのセット待ち通過 2012/02/05 16:56:18 接続待機のシグナルをセット 2012/02/05 16:56:18 接続待ち通過 2012/02/05 16:56:18 ipv4 の接続を待ちます 2012/02/05 16:56:18 セマフォ待ち 2012/02/05 16:56:18 接続待ち待機 2012/02/05 16:56:18 セマフォ待ち通過 2012/02/05 16:56:18 client をキューから取り出しました。 task数: 1 2012/02/05 16:56:18 キューのセット待ち 2012/02/05 16:56:18 [S] Server ready 2012/02/05 16:56:19 [C] abc0123あいうえお 2012/02/05 16:56:21 Callback が呼び出されました。 2012/02/05 16:56:21 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。 2012/02/05 16:56:21 client をキューに登録しました。 2012/02/05 16:56:21 接続待機のシグナルをセット 2012/02/05 16:56:21 接続待ち通過 2012/02/05 16:56:21 ipv6 の接続を待ちます 2012/02/05 16:56:21 キューのセット待ち通過 2012/02/05 16:56:21 接続待ち待機 2012/02/05 16:56:21 セマフォ待ち 2012/02/05 16:56:21 セマフォ待ち通過 2012/02/05 16:56:21 client をキューから取り出しました。 task数: 2 2012/02/05 16:56:21 キューのセット待ち 2012/02/05 16:56:21 [S] Server ready 2012/02/05 16:56:21 [C] abc0123あいうえお 2012/02/05 16:56:23 Callback が呼び出されました。 2012/02/05 16:56:23 192.168.1.3 port[2001] クライアントが接続しました。 2012/02/05 16:56:23 client をキューに登録しました。 2012/02/05 16:56:23 接続待機のシグナルをセット 2012/02/05 16:56:23 接続待ち通過 2012/02/05 16:56:23 ipv4 の接続を待ちます 2012/02/05 16:56:23 キューのセット待ち通過 2012/02/05 16:56:23 接続待ち待機 2012/02/05 16:56:23 セマフォ待ち 2012/02/05 16:56:23 セマフォ待ち通過 2012/02/05 16:56:23 client をキューから取り出しました。 task数: 3 2012/02/05 16:56:23 キューのセット待ち 2012/02/05 16:56:23 [S] Server ready 2012/02/05 16:56:23 [C] abc0123あいうえお 2012/02/05 16:56:26 Callback が呼び出されました。 2012/02/05 16:56:26 fe80::[インターフェースID]%11 port[2001] クライアントが接続しました。 2012/02/05 16:56:26 client をキューに登録しました。 2012/02/05 16:56:26 接続待機のシグナルをセット 2012/02/05 16:56:26 キューのセット待ち通過 2012/02/05 16:56:26 セマフォ待ち 2012/02/05 16:56:26 接続待ち通過 2012/02/05 16:56:26 ipv6 の接続を待ちます 2012/02/05 16:56:26 接続待ち待機 2012/02/05 16:56:27 サービス終了が要求されました。 2012/02/05 16:56:27 fe80::[インターフェースID] Port[2001]の Listen を終了します。 2012/02/05 16:56:27 Callback が呼び出されました。 2012/02/05 16:56:27 接続待機のシグナルをセット 2012/02/05 16:56:27 接続待ち通過 2012/02/05 16:56:27 192.168.1.3 Port[2001]の Listen を終了します。 2012/02/05 16:56:27 Callback が呼び出されました。 2012/02/05 16:56:27 接続待機のシグナルをセット 2012/02/05 16:56:27 Accept 終了待ち待機 2012/02/05 16:56:27 Accept 終了待ち通過 2012/02/05 16:56:27 TakeQueue 終了待ち待機 2012/02/05 16:56:27 ListenEnded セット 2012/02/05 16:56:28 子タスクをキャンセルしました。 2012/02/05 16:56:28 192.168.1.3 port[2001] クライアントが切断しました。 task数: 2 2012/02/05 16:56:28 セマフォ待ち通過 2012/02/05 16:56:28 fe80::[インターフェースID]%11 からの接続要求を閉じます 2012/02/05 16:56:29 子タスクをキャンセルしました。 2012/02/05 16:56:29 192.168.1.3 port[2001] クライアントが切断しました。 task数: 1 2012/02/05 16:56:31 子タスクをキャンセルしました。 2012/02/05 16:56:31 fe80::[インターフェースID]%11 port[2001] クライアントが切断しました。 task数: 0 2012/02/05 16:56:31 子タスク・スレッドの終了を検知しました。 2012/02/05 16:56:31 TakeQueue 終了待ち通過 2012/02/05 16:56:31 Listen スレッドの 終了を検知しました。 Enter キー押下で終了します。
まずは Main メソッドから。
ServerService クラスの ServiceStart メソッドを使用することでクライアントへのサービスを開始し、ServiceEnd メソッドを使用することでクライアントへのサービスを終了します。
IPv6 アドレスにリンクローカル・アドレスを指定するときには、’%インターフェース番号’付きで指定してください。
//
private const string ipv4Addr = "192.168.1.3";
private const string ipv6Addr = "fe80::[インターフェースID]";
private const int port = 2001;
static void Main(string[] args)
{
try
{
var serverService = new ServerService()
{
Ipv4Addr = ipv4Addr,
Ipv6Addr = ipv6Addr,
Port = port,
Encoding = System.Text.Encoding.UTF8
};
serverService.StartService();
Console.WriteLine("Enter キー押下で Listen を終了します。");
Console.ReadLine();
serverService.StopService();
}
catch (ServerServiceException e)
{
Console.WriteLine(e.Message);
}
Console.WriteLine("Enter キー押下で終了します。");
Console.ReadLine();
}
で、その ServerService クラスです。
プロパティ等やコンストラクタ、StartService メソッド、StopService メソッドです。
//
public string Ipv4Addr { get; set; }
public string Ipv6Addr { get; set; }
public int Port { get; set; }
public System.Text.Encoding Encoding { get; set; }
private System.Net.Sockets.TcpListener v4Listner = null;
private System.Net.Sockets.TcpListener v6Listner = null;
private System.Threading.Tasks.Task taskListen = null;
private volatile bool isServiceEnd = false;
private bool isV4Connected = true;
private bool isV6Connected = true;
private System.Threading.ManualResetEvent clientConnected =
new System.Threading.ManualResetEvent(false);
private System.Threading.ManualResetEvent ListenEnded;
private int acceptWaitCounter = 0;
private object acceptWaitCounterLock = new object();
private Action notifyCancel;
private TakeClientQueue takeQueue;
public ServerService()
{
}
public void StartService()
{
if (string.IsNullOrEmpty(Ipv4Addr))
throw new ServerServiceException("IPv4アドレスが設定されていません。",
ServerServiceException.Errors.IpAddressError);
if (string.IsNullOrEmpty(Ipv6Addr))
throw new ServerServiceException("IPv6アドレスが設定されていません。",
ServerServiceException.Errors.IpAddressError);
if (Port < 1024)
throw new ServerServiceException("ポート番号に予約ポートが設定されています。",
ServerServiceException.Errors.PortOutOfRange);
if (Encoding == null)
throw new ServerServiceException("Encoding がセットされていません。",
ServerServiceException.Errors.EncodingNotSet);
// 指定された IP アドレスが自ホストのものであることを確認する
var hostName = System.Net.Dns.GetHostName();
var addrList = System.Net.Dns.GetHostAddresses(hostName);
try
{
if (!addrList.Contains(System.Net.IPAddress.Parse(Ipv4Addr)))
throw new ServerServiceException("指定されたIPv4アドレスは自ホストのアドレスではありません。",
ServerServiceException.Errors.IpAddressError);
}
catch (FormatException)
{
throw new ServerServiceException("指定されたIPv4アドレスは有効なアドレスではありません。",
ServerServiceException.Errors.IpAddressError);
}
try
{
if (!addrList.Contains(System.Net.IPAddress.Parse(Ipv6Addr)))
throw new ServerServiceException("指定されたIPv6アドレスは自ホストのアドレスではありません。",
ServerServiceException.Errors.IpAddressError);
}
catch
{
throw new ServerServiceException("指定されたIPv6アドレスは有効なアドレスではありません。",
ServerServiceException.Errors.IpAddressError);
}
taskListen = System.Threading.Tasks.Task.Factory.StartNew(() =>
{
startListen();
});
}
public void StopService()
{
Console.WriteLine("{0} サービス終了が要求されました。", DateTime.Now);
ListenEnded = new System.Threading.ManualResetEvent(false); // ListenEnded を非シグナル状態で作成
isServiceEnd = true;
if (notifyCancel != null)
notifyCancel();
endListen();
taskListen.Wait();
Console.WriteLine("{0} Listen スレッドの 終了を検知しました。",
DateTime.Now.ToString());
}
次は startListen メソッドです。IPv4 と IPv6 で接続要求待ちを行い、接続要求が来たら acceptCallback がスケジュールされるようにしています。また、サービス処理を行う TakeClientQueue クラスのインスタンスを生成し、Start メソッドを別スレッドで起動しています。
//
private void startListen()
{
var tokenSource = new System.Threading.CancellationTokenSource();
notifyCancel = new Action(() => tokenSource.Cancel());
var token = tokenSource.Token;
Console.WriteLine("{0} Listen スレッドが開始しました。",
DateTime.Now.ToString());
takeQueue = new TakeClientQueue(token, Encoding);
System.Threading.Tasks.Task takeTask = null;
try
{
takeTask = new System.Threading.Tasks.Task(() => takeQueue.Start(), token);
takeTask.Start();
var ipv6Address = System.Net.IPAddress.Parse(Ipv6Addr);
v6Listner = new System.Net.Sockets.TcpListener(ipv6Address, Port);
v6Listner.Start();
Console.WriteLine("{0} {1} Port[{2}]の Listen を開始しました。",
DateTime.Now.ToString(), ipv6Address.ToString(), Port);
var ipv4Address = System.Net.IPAddress.Parse(Ipv4Addr);
v4Listner = new System.Net.Sockets.TcpListener(ipv4Address, Port);
v4Listner.Start();
Console.WriteLine("{0} {1} Port[{2}]の Listen を開始しました。",
DateTime.Now.ToString(), ipv4Address.ToString(), Port);
while (true)
{
clientConnected.Reset();
if (isServiceEnd) // 終了要求時の対応
break;
if (isV6Connected)
{ // V6接続の Callback が行われたら、新たなV6接続待ちを行う
isV6Connected = false;
Console.WriteLine("{0} ipv6 の接続を待ちます", DateTime.Now.ToString());
lock (acceptWaitCounterLock)
{
acceptWaitCounter++;
}
v6Listner.BeginAcceptTcpClient(new AsyncCallback(acceptCallback), v6Listner);
}
if (isV4Connected)
{ // V4接続の Callback が行われたら、新たなV4接続待ちを行う
isV4Connected = false;
Console.WriteLine("{0} ipv4 の接続を待ちます", DateTime.Now.ToString());
lock (acceptWaitCounterLock)
{
acceptWaitCounter++;
}
v4Listner.BeginAcceptTcpClient(new AsyncCallback(acceptCallback), v4Listner);
}
Console.WriteLine("{0} 接続待ち待機", DateTime.Now);
clientConnected.WaitOne(); // 接続が行われるまで待機する
Console.WriteLine("{0} 接続待ち通過", DateTime.Now);
}
}
finally
{
Console.WriteLine("{0} Accept 終了待ち待機", DateTime.Now);
ListenEnded.WaitOne(); // 別スレッドの Accept 待ちが終了するのを待つ
Console.WriteLine("{0} Accept 終了待ち通過", DateTime.Now);
Console.WriteLine("{0} TakeQueue 終了待ち待機", DateTime.Now);
if (takeTask != null)
takeTask.Wait();
Console.WriteLine("{0} TakeQueue 終了待ち通過", DateTime.Now);
clientConnected.Close();
tokenSource.Dispose();
}
}
次は endListen メソッドです。
//
private void endListen()
{
Console.WriteLine("{0} {1} Port[{2}]の Listen を終了します。",
DateTime.Now.ToString(), Ipv6Addr, Port);
if (v6Listner != null)
v6Listner.Stop();
Console.WriteLine("{0} {1} Port[{2}]の Listen を終了します。",
DateTime.Now.ToString(), Ipv4Addr, Port);
if (v4Listner != null)
v4Listner.Stop();
takeQueue.Stop();
}
次は acceptCallback メソッドです。接続してきたクライアントとの通信を行う TcpClient を取得して、キューにセットします。
//
private void acceptCallback(IAsyncResult ar)
{
Console.WriteLine("{0} Callback が呼び出されました。", DateTime.Now);
if (!isServiceEnd)
{
if (ar.IsCompleted)
{
var listner = (System.Net.Sockets.TcpListener)ar.AsyncState;
var client = listner.EndAcceptTcpClient(ar);
Console.WriteLine("{0} {1} port[{2}] クライアントが接続しました。",
DateTime.Now.ToString(),
((System.Net.IPEndPoint)listner.LocalEndpoint).Address.ToString(),
((System.Net.IPEndPoint)listner.LocalEndpoint).Port.ToString());
// client をキューにセット
takeQueue.AddQueue(client);
if (((System.Net.IPEndPoint)listner.LocalEndpoint).Address.AddressFamily ==
System.Net.Sockets.AddressFamily.InterNetwork)
{
isV4Connected = true;
}
else
{
isV6Connected = true;
}
}
}
lock (acceptWaitCounterLock)
{
acceptWaitCounter--;
}
Console.WriteLine("{0} 接続待機のシグナルをセット", DateTime.Now);
clientConnected.Set(); // 接続待機をシグナル状態にする
checkEndListen();
}
次は checkEndListen メソッドです。
//
private void checkEndListen()
{
if (isServiceEnd && acceptWaitCounter <= 0)
{
ListenEnded.Set(); // 終了時に Accept 待ちがなくなったらシグナル状態にする
Console.WriteLine("{0} ListenEnded セット", DateTime.Now);
}
}
そして、サービス処理を行う TakeClientQueue クラスです。クラスのアクセス修飾子は internal にしています。
プロパティ等やコンストラクタです。
//
internal class TakeClientQueue
{
internal System.Text.Encoding Encoding { get; private set; }
private const int LIMIT_CONCURRENCY = 3;
private Queue<System.Net.Sockets.TcpClient> clientQueue;
private object queueLock = new object();
private System.Threading.ManualResetEvent takenSignal;
private System.Threading.CancellationToken cancelToken;
private bool isStart;
internal TakeClientQueue(System.Threading.CancellationToken token, System.Text.Encoding enc)
{
cancelToken = token;
Encoding = enc;
isStart = false;
}
次は AddQueue メソッドです。TcpClient をキューにセットして、checkQueue メソッドを呼び出します。
//
internal void AddQueue(System.Net.Sockets.TcpClient client)
{
if (!isStart)
throw new InvalidOperationException(
"AddQueue メソッドを呼び出すことはできません(Start メソッドが呼び出されていないか、終了が要求されています)。");
lock (queueLock)
{
clientQueue.Enqueue(client);
Console.WriteLine("{0} client をキューに登録しました。", DateTime.Now);
}
checkQueue();
}
次は Start メソッドです。このメソッドの中のループでキューからの取り出しと別スレッドでのサービス処理をコントロールしています。
//
internal void Start()
{
System.Threading.Semaphore pool = null;
List<System.Threading.Tasks.Task> taskList = null;
object taskListLock = new object();
try
{
clientQueue = new Queue<System.Net.Sockets.TcpClient>();
takenSignal = new System.Threading.ManualResetEvent(false); // 非シグナル状態で作成
pool = new System.Threading.Semaphore(LIMIT_CONCURRENCY, LIMIT_CONCURRENCY);
taskList = new List<System.Threading.Tasks.Task>();
isStart = true;
while (true)
{
Console.WriteLine("{0} キューのセット待ち", DateTime.Now);
takenSignal.WaitOne(); // キューにセットされるまで待機する
Console.WriteLine("{0} キューのセット待ち通過", DateTime.Now);
Console.WriteLine("{0} セマフォ待ち", DateTime.Now);
pool.WaitOne(); // セマフォ待機(taskの並行処理数)
Console.WriteLine("{0} セマフォ待ち通過", DateTime.Now);
if (cancelToken.IsCancellationRequested) // 終了要求時の対応
break;
// 別スレッドで doProcess() を動かすようにセット
System.Net.Sockets.TcpClient client;
lock (queueLock)
{
client = clientQueue.Dequeue(); // キューから取り出し
Console.WriteLine("{0} client をキューから取り出しました。", DateTime.Now);
}
var task = new System.Threading.Tasks.Task(() => doProcess(client, cancelToken), cancelToken);
lock (taskListLock)
{
taskList.Add(task);
Console.WriteLine("task数: {0}", taskList.Count);
}
//スレッド終了時に TcpClient のクローズとセマフォの開放を行うようにセット
task.ContinueWith((t) =>
{
client.Close();
//if (!isServiceEnd)
pool.Release();
lock (taskListLock)
{
taskList.Remove(task);
Console.WriteLine("task数: {0}", taskList.Count);
}
});
task.Start();
checkQueue(); // キューの状態をチェックしてシグナルをセットする
}
}
finally
{
lock (queueLock)
{
while (clientQueue.Count != 0)
{ // キューに接続要求が残っていたら、後始末
var client = clientQueue.Dequeue();
Console.WriteLine("{0} {1} からの接続要求を閉じます", DateTime.Now,
((System.Net.IPEndPoint)client.Client.RemoteEndPoint).Address);
client.Close(); // キューにセットされている接続要求をクローズ
}
}
System.Threading.Tasks.Task[] tasks;
lock (taskListLock)
{
tasks = taskList.ToArray();
}
System.Threading.Tasks.Task.WaitAll(tasks);
Console.WriteLine("{0} 子タスク・スレッドの終了を検知しました。",
DateTime.Now.ToString());
if (pool != null)
pool.Close();
if (takenSignal != null)
{
takenSignal.Dispose();
takenSignal = null;
}
}
}
次は Stop メソッドです。
//
internal void Stop()
{
if (!isStart)
throw new InvalidOperationException("Start メソッドより前に Stop メソッドを呼び出すことはできません。");
takenSignal.Set();
isStart = false;
}
次は checkQueue メソッドです。
//
private void checkQueue()
{
lock (queueLock)
{
// キャンセル要求が来ているか、キューが空でなかったらシグナル状態へ
if (cancelToken.IsCancellationRequested || clientQueue.Count != 0)
takenSignal.Set();
else
takenSignal.Reset();
}
}
次は doProcess メソッドです。
//
private void doProcess(System.Net.Sockets.TcpClient client, System.Threading.CancellationToken token)
{
System.Net.Sockets.NetworkStream stream = null;
try
{
// ネットワークストリームを取得
stream = client.GetStream();
sendData(stream, "Server ready");
var rString = receiveData(stream);
token.ThrowIfCancellationRequested(); // キャンセル通知の確認 & キャンセル実行
// サーバー側処理の代わりに15秒待つ
//(並行処理の様子の観察に都合が良い)
System.Threading.Thread.Sleep(5000);
token.ThrowIfCancellationRequested(); // キャンセル通知の確認 & キャンセル実行
System.Threading.Thread.Sleep(5000);
token.ThrowIfCancellationRequested(); // キャンセル通知の確認 & キャンセル実行
System.Threading.Thread.Sleep(5000);
token.ThrowIfCancellationRequested(); // キャンセル通知の確認 & キャンセル実行
sendData(stream, rString);
}
catch (System.IO.IOException e)
{
if (e.InnerException.GetType() == typeof(System.Net.Sockets.SocketException) &&
(e.InnerException as System.Net.Sockets.SocketException).ErrorCode == 10054)
Console.WriteLine("{0} クライアントが切断していたためデータを送信できませんでした。",
DateTime.Now.ToString());
else
throw;
}
catch (OperationCanceledException)
{ // キャンセル実施をキャッチ
Console.WriteLine("{0} 子タスクをキャンセルしました。", DateTime.Now.ToString());
}
finally
{
if (stream != null)
{
Console.WriteLine("{0} {1} port[{2}] クライアントが切断しました。",
DateTime.Now.ToString(),
((System.Net.IPEndPoint)client.Client.LocalEndPoint).Address.ToString(),
((System.Net.IPEndPoint)client.Client.LocalEndPoint).Port.ToString());
stream.Close();
}
}
}
次は receiveData メソッドと sendData メソッドです。
//
private string receiveData(System.IO.Stream stream)
{
var receiveMessage = "";
using (var memStream = new System.IO.MemoryStream())
{
var rBuff = new byte[256];
int rSize;
do
{
// データの一部を受信する
rSize = stream.Read(rBuff, 0, rBuff.Length);
// rSize が 0 のときにはクライアントが切断したと判断
if (rSize == 0)
{
Console.WriteLine("クライアントが切断しました。");
throw new ServerServiceException("ストリームの読み出しの際、クライアントが切断していました。",
ServerServiceException.Errors.ConnectionClose);
}
// 受信したデータを蓄積する
memStream.Write(rBuff, 0, rSize);
} while ((stream as System.Net.Sockets.NetworkStream).DataAvailable);
// 受信したデータを文字列に変換
receiveMessage = Encoding.GetString(memStream.ToArray());
memStream.Close();
}
Console.WriteLine("{0} [C] {1}", DateTime.Now.ToString(), receiveMessage);
return receiveMessage;
}
private void sendData(System.IO.Stream stream, string str)
{
// 文字列をバイト配列へ
var data = Encoding.GetBytes(str);
stream.Write(data, 0, data.Length);
Console.WriteLine("{0} [S] {1}", DateTime.Now.ToString(), str);
}
そして、ServerServiceException クラスです。
//
class ServerServiceException : ApplicationException
{
public enum Errors
{
IpAddressError = 0, PortOutOfRange, EncodingNotSet, ConnectionClose
}
public int ErrorCode { get; private set; }
public ServerServiceException(string message, Errors error)
: base(message)
{
ErrorCode = (int)error;
}
}
クライアント側の変更は IPv4 で接続するか、IPv6 で接続するかくらいなので、ソースを見たい方はプロジェクト・ファイルをダウンロードして見てください 😉
「IPv4 と IPv6 で接続待ちを行う」への2件のフィードバック