00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <spl/threading/Thread.h>
00019 #include <spl/net/SocketSet.h>
00020 #include <spl/Log.h>
00021
00022
00023 ISocketService::~ISocketService()
00024 {
00025 }
00026
00027 SocketListenerPair::SocketListenerPair(IStreamReadListenerPtr listener, TcpSocketPtr sp)
00028 : m_listener(listener), m_sp(sp)
00029 {
00030 }
00031
00032 SocketSet::SocketSet( )
00033 : m_vread(), m_svreadMutex(), m_buf(SOCKBUF_SIZE)
00034 {
00035 m_to = new struct timeval;
00036 m_to->tv_sec = 5;
00037 m_to->tv_usec = 0;
00038
00039 FD_ZERO(&m_read);
00040
00041 FD_ZERO(&m_excpt);
00042
00043
00044 m_sockAddedEvent = new Event( 10 * 1000 );
00045
00046 Start();
00047 }
00048
00049 SocketSet::~SocketSet()
00050 {
00051 Close();
00052 if ( NULL != m_to )
00053 {
00054 delete m_to;
00055 }
00056 delete m_sockAddedEvent;
00057 }
00058
00059 void SocketSet::Join(int timeoutms)
00060 {
00061 Thread::Join(timeoutms);
00062 }
00063
00064 void SocketSet::Join()
00065 {
00066 Thread::Join();
00067 }
00068
00069 int SocketSet::SocketCount() const
00070 {
00071 return m_vread.Count();
00072 }
00073
00074 void SocketSet::AddSocket( IStreamReadListenerPtr listener, TcpSocketPtr sp )
00075 {
00076 sp.ValidateMem();
00077
00078 SocketListenerPair *pair = new SocketListenerPair(listener, sp);
00079
00080 m_svreadMutex.Lock();
00081 m_vread.Add( pair );
00082
00083 m_svreadMutex.Unlock();
00084
00085 m_sockAddedEvent->Notify();
00086 }
00087
00088 bool SocketSet::RemoveSocket( TcpSocket& sp )
00089 {
00090 sp.ValidateMem();
00091 ValidateMem();
00092
00093 sp.Close();
00094
00095 bool removed = false;
00096 int count = m_vread.Count();
00097 for ( int x = 0; x < count; x++ )
00098 {
00099 if ( *m_vread.ElementAt(x)->m_sp == sp )
00100 {
00101 SocketListenerPair *pair = m_vread.RemoveAt( x );
00102 if ( NULL == pair )
00103 {
00104 throw new Exception("Internal error with Vector::RemoveAt");
00105 }
00106 ASSERT_MEM(pair, sizeof(SocketListenerPair));
00107 pair->m_listener.ValidateMem();
00108 ASSERT(*pair->m_sp == sp);
00109 delete pair;
00110 removed = true;
00111 break;
00112 }
00113 }
00114
00115 ASSERT(removed);
00116 if ( !removed )
00117 {
00118 return false;
00119 }
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 return true;
00136 }
00137
00138 void SocketSet::WaitForIO( )
00139 {
00140
00141
00142
00143 int x;
00144
00145 m_svreadMutex.Lock();
00146
00147 int readcount = m_vread.Count();
00148
00149 FD_ZERO( &m_read );
00150 FD_ZERO( &m_excpt );
00151
00152 for ( x = 0; x < readcount; x++ )
00153 {
00154 TcpSocketPtr sp = m_vread.ElementAt(x)->m_sp;
00155 sp.ValidateMem();
00156
00157 FD_SET( sp->m_sock->GetFD(), &m_read );
00158 FD_SET( sp->m_sock->GetFD(), &m_excpt );
00159 }
00160 m_svreadMutex.Unlock();
00161
00162 if ( readcount == 0 )
00163 {
00164 return;
00165 }
00166
00167 int count = select( FD_SETSIZE, &m_read, NULL, &m_excpt, m_to );
00168 if ( count < 0 )
00169 {
00170 #ifdef _WIN32
00171
00172 #else
00173
00174 #endif
00175
00176
00177 for ( x = 0; x < readcount; x++ )
00178 {
00179 TcpSocketPtr sp = m_vread.ElementAt(x)->m_sp;
00180 if ( sp->GetErrorCode() != 0 || sp->IsClosed() )
00181 {
00182 RemoveSocket(sp);
00183 readcount--;
00184 x--;
00185 }
00186 }
00187 return;
00188 }
00189 else if ( count == 0 )
00190 {
00191 return;
00192 }
00193 if ( ! m_running )
00194 {
00195 return;
00196 }
00197
00198 m_svreadMutex.Lock();
00199
00200 for ( x = 0; x < readcount && count > 0 && m_running; x++ )
00201 {
00202 if ( x >= m_vread.Count() )
00203 {
00204 break;
00205 }
00206
00207 SocketListenerPair *sp = m_vread.ElementAt(x);
00208 ASSERT_MEM( sp, sizeof( SocketListenerPair ) );
00209 sp->m_sp.ValidateMem();
00210
00211 try
00212 {
00213 if ( FD_ISSET( sp->m_sp->m_sock->GetFD(), &m_read ) )
00214 {
00215 count--;
00216 if ( sp->m_sp->IsClosed() )
00217 {
00218
00219 sp->m_listener->IStreamRead_OnClose();
00220 RemoveSocket( sp->m_sp );
00221 readcount--;
00222 continue;
00223 }
00224 int bytes = sp->m_sp->m_sock->GetBytesAvail();
00225 if (0 == bytes)
00226 {
00227
00228 readcount--;
00229 sp->m_listener->IStreamRead_OnError( "select() indicates read, but no data available (.net socket?). Closing socket." );
00230 sp->m_listener->IStreamRead_OnClose();
00231 RemoveSocket( sp->m_sp );
00232 continue;
00233 }
00234 while ( bytes > 0 )
00235 {
00236 if ( ! m_running )
00237 {
00238 m_svreadMutex.Unlock();
00239 return;
00240 }
00241 bytes = sp->m_sp->GetStream()->Read(m_buf, 0, SOCKBUF_SIZE);
00242 if ( 0 > bytes )
00243 {
00244
00245 sp->m_listener->IStreamRead_OnClose();
00246 RemoveSocket( sp->m_sp );
00247 readcount--;
00248 break;
00249 }
00250 ASSERT(sp->m_listener.IsNotNull());
00251 sp->m_listener->IStreamRead_OnRead( m_buf, bytes );
00252
00253 bytes = (sp->m_sp->m_sock->IsClosed()) ? -1 : sp->m_sp->m_sock->GetBytesAvail();
00254 }
00255 }
00256 else if ( FD_ISSET( sp->m_sp->m_sock->GetFD(), &m_excpt ) )
00257 {
00258 count--;
00259 readcount--;
00260 sp->m_listener->IStreamRead_OnError( "Socket exception set" );
00261 sp->m_listener->IStreamRead_OnClose();
00262 RemoveSocket( sp->m_sp );
00263 }
00264 }
00265 catch ( SocketException *se )
00266 {
00267 ASSERT_MEM( sp, sizeof(SocketListenerPair) );
00268 sp->m_sp.ValidateMem();
00269 sp->m_listener->IStreamRead_OnError( se->Message() );
00270 sp->m_listener->IStreamRead_OnClose();
00271 RemoveSocket( sp->m_sp );
00272 delete se;
00273 }
00274 catch (Exception *ex)
00275 {
00276 Log::SWrite(ex);
00277 ASSERT_MEM( sp, sizeof(SocketListenerPair) );
00278 sp->m_sp.ValidateMem();
00279 sp->m_listener->IStreamRead_OnError( ex->Message() );
00280 sp->m_listener->IStreamRead_OnClose();
00281 RemoveSocket( sp->m_sp );
00282 delete ex;
00283 Thread::YYield();
00284 }
00285 }
00286
00287 m_svreadMutex.Unlock();
00288 }
00289
00290 void SocketSet::Close()
00291 {
00292 if ( ! m_running )
00293 {
00294 return;
00295 }
00296 m_running = false;
00297 m_sockAddedEvent->Notify();
00298
00299 m_svreadMutex.Lock();
00300 m_vread.ValidateMem();
00301
00302 int count = m_vread.Count();
00303
00304 for ( int x = 0; x < count; x++ )
00305 {
00306 SocketListenerPair *sp = m_vread.ElementAt(x);
00307 ASSERT_MEM( sp, sizeof( SocketListenerPair ) );
00308 sp->m_sp->ValidateMem();
00309 if ( sp->m_sp.IsNotNull() )
00310 {
00311 sp->m_sp->Close();
00312 }
00313 delete sp;
00314 m_vread.ValidateMem();
00315 }
00316 m_vread.ValidateMem();
00317 m_vread.Clear();
00318 ASSERT( 0 == m_vread.Count() );
00319
00320
00321
00322
00323 m_svreadMutex.Unlock();
00324 }
00325
00326 void SocketSet::CloseAndDelete()
00327 {
00328 if ( ! m_running )
00329 {
00330 return;
00331 }
00332 m_running = false;
00333
00334 m_sockAddedEvent->Notify();
00335 m_svreadMutex.Lock();
00336
00337 int count = m_vread.Count();
00338
00339 for ( int x = 0; x < count; x++ )
00340 {
00341 SocketListenerPair *sp = m_vread.ElementAt(x);
00342 ASSERT_MEM( sp, sizeof( SocketListenerPair ) );
00343 sp->m_sp->ValidateMem();
00344 if ( sp->m_sp.IsNotNull() )
00345 {
00346 sp->m_sp->Close();
00347 }
00348 delete sp;
00349 }
00350 m_vread.Clear();
00351
00352 m_svreadMutex.Unlock();
00353 }
00354
00355 void SocketSet::Run()
00356 {
00357 m_running = true;
00358
00359 while ( m_running )
00360 {
00361 try
00362 {
00363 if ( SocketCount() == 0 )
00364 {
00365 m_sockAddedEvent->Wait();
00366 }
00367 WaitForIO();
00368 }
00369 catch ( OutOfMemoryException mex )
00370 {
00371 Log::SWrite(mex);
00372 }
00373 catch ( Exception *ex )
00374 {
00375 Log::SWrite(ex);
00376 delete ex;
00377 Thread::YYield();
00378 }
00379 }
00380 }
00381
00382 void SocketSet::Broadcast( const Array<byte>& buf, const int len )
00383 {
00384 m_svreadMutex.Lock();
00385 int count = m_vread.Count();
00386 for ( int x = 0; x < count; x++ )
00387 {
00388 SocketListenerPair *sp = m_vread.ElementAt(x);
00389 if ( NULL != sp )
00390 {
00391 sp->m_sp->GetStream()->Write( buf, 0, len );
00392 }
00393 }
00394 m_svreadMutex.Unlock();
00395 }
00396
00397 #if defined(DEBUG) || defined(_DEBUG)
00398 void SocketSet::CheckMem() const
00399 {
00400
00401
00402 m_buf.CheckMem();
00403
00404 if ( NULL != m_to )
00405 {
00406 DEBUG_NOTE_MEM_ALLOCATION(m_to);
00407 }
00408
00409 m_vread.CheckMem();
00410 int count = m_vread.Count();
00411 for ( int x = 0; x < count; x++ )
00412 {
00413 SocketListenerPair *sp = m_vread.ElementAt(x);
00414 if ( NULL != sp )
00415 {
00416 DEBUG_NOTE_MEM_ALLOCATION( sp );
00417 sp->m_sp.CheckMem();
00418 }
00419 }
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433 DEBUG_NOTE_MEM_ALLOCATION(m_sockAddedEvent);
00434
00435
00436 }
00437
00438 void SocketSet::ValidateMem() const
00439 {
00440
00441
00442 m_buf.ValidateMem();
00443
00444 if ( NULL != m_to )
00445 {
00446 ASSERT_MEM(m_to, sizeof(struct timeval));
00447 }
00448 m_vread.ValidateMem();
00449 int count = m_vread.Count();
00450
00451 for ( int x = 0; x < count; x++ )
00452 {
00453 SocketListenerPair *sp = m_vread.ElementAt(x);
00454 if ( NULL != sp )
00455 {
00456 ASSERT_MEM( sp, sizeof(SocketListenerPair) );
00457 sp->m_sp.ValidateMem();
00458 }
00459 }
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473 ASSERT_MEM(m_sockAddedEvent, sizeof(Event));
00474
00475
00476 }
00477 #endif
00478
00479 IServerConnectionFactory::~IServerConnectionFactory()
00480 {
00481 }
00482
00483 SocketSetServer::SocketSetServer
00484 (
00485 IServerConnectionFactory *conFactory,
00486 int serverPort
00487 )
00488 : m_ss(), m_listener(serverPort), m_conFactory(conFactory)
00489 {
00490 m_listener.Delegates().Add(this);
00491 }
00492
00493 SocketSetServer::~SocketSetServer()
00494 {
00495 m_listener.Stop();
00496 }
00497
00498 void SocketSetServer::AddSocket( IStreamReadListenerPtr listener, TcpSocketPtr sp )
00499 {
00500 m_ss.AddSocket(listener, sp);
00501 }
00502
00503 void SocketSetServer::Close()
00504 {
00505 m_listener.Stop(); m_ss.Close();
00506 }
00507
00508 void SocketSetServer::CloseAndDelete()
00509 {
00510 m_listener.Stop();
00511 m_ss.CloseAndDelete();
00512 }
00513
00514 int SocketSetServer::SocketCount() const
00515 {
00516 return m_ss.SocketCount();
00517 }
00518
00519 void SocketSetServer::Broadcast( const Array<byte>& buf, const int len )
00520 {
00521 m_ss.Broadcast(buf, len);
00522 }
00523
00524 void SocketSetServer::Join(int timeoutms)
00525 {
00526 m_ss.Join(timeoutms);
00527 }
00528
00529 void SocketSetServer::Join()
00530 {
00531 m_ss.Join();
00532 }
00533
00534 void SocketSetServer::IPortListener_OnConnect( TcpSocketPtr sock )
00535 {
00536 m_ss.AddSocket(m_conFactory->Create(sock), sock);
00537 }
00538
00539 void SocketSetServer::IPortListener_OnStop()
00540 {
00541 m_ss.CloseAndDelete();
00542 }
00543
00544 #if defined(DEBUG) || defined(_DEBUG)
00545 void SocketSetServer::CheckMem() const
00546 {
00547 m_ss.CheckMem();
00548 m_listener.CheckMem();
00549 }
00550
00551 void SocketSetServer::ValidateMem() const
00552 {
00553 m_ss.ValidateMem();
00554 m_listener.ValidateMem();
00555 }
00556 #endif
00557