• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

src/SocketSet.cpp

00001 /*
00002  *   This file is part of the Standard Portable Library (SPL).
00003  *
00004  *   SPL is free software: you can redistribute it and/or modify
00005  *   it under the terms of the GNU General Public License as published by
00006  *   the Free Software Foundation, either version 3 of the License, or
00007  *   (at your option) any later version.
00008  *
00009  *   SPL is distributed in the hope that it will be useful,
00010  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *   GNU General Public License for more details.
00013  *
00014  *   You should have received a copy of the GNU General Public License
00015  *   along with SPL.  If not, see <http://www.gnu.org/licenses/>.
00016  */
00017 /* on some platforms, threads overrides IO routines */
00018 #include <spl/threading/Thread.h>
00019 #include <spl/net/SocketSet.h>
00020 #include <spl/Log.h>
00021 
00022 
00023 ISocketService::~ISocketService()
00024 {
00025 }
00026 
00027 SocketListenerPair::SocketListenerPair(IStreamReadListenerPtr listener, TcpSocketPtr sp)
00028 : m_listener(listener), m_sp(sp)
00029 {
00030 }
00031 
00032 SocketSet::SocketSet(  )
00033 : m_vread(), m_svreadMutex(), m_buf(SOCKBUF_SIZE)
00034 {
00035         m_to = new struct timeval;
00036         m_to->tv_sec = 5;
00037         m_to->tv_usec = 0;
00038 
00039         FD_ZERO(&m_read);
00040         //FD_ZERO(&m_write);
00041         FD_ZERO(&m_excpt);
00042 
00043         // Poll every ten seconds, just in case
00044         m_sockAddedEvent = new Event( 10 * 1000 );
00045         
00046         Start();
00047 }
00048 
00049 SocketSet::~SocketSet()
00050 {
00051         Close();
00052         if ( NULL != m_to )
00053         {
00054                 delete m_to;
00055         }
00056         delete m_sockAddedEvent;
00057 }
00058 
00059 void SocketSet::Join(int timeoutms)
00060 {
00061         Thread::Join(timeoutms);
00062 }
00063 
00064 void SocketSet::Join()
00065 {
00066         Thread::Join();
00067 }
00068 
00069 int SocketSet::SocketCount() const
00070 { 
00071         return m_vread.Count();
00072 }
00073 
00074 void SocketSet::AddSocket( IStreamReadListenerPtr listener, TcpSocketPtr sp )
00075 {
00076         sp.ValidateMem();
00077 
00078         SocketListenerPair *pair = new SocketListenerPair(listener, sp);
00079 
00080         m_svreadMutex.Lock();
00081         m_vread.Add( pair );
00082 
00083         m_svreadMutex.Unlock();
00084 
00085         m_sockAddedEvent->Notify();
00086 }
00087 
00088 bool SocketSet::RemoveSocket( TcpSocket& sp )
00089 {
00090         sp.ValidateMem();
00091         ValidateMem();
00092 
00093         sp.Close();
00094 
00095         bool removed = false;
00096         int count = m_vread.Count();
00097         for ( int x = 0; x < count; x++ )
00098         {
00099                 if ( *m_vread.ElementAt(x)->m_sp == sp )
00100                 {
00101                         SocketListenerPair *pair = m_vread.RemoveAt( x );
00102                         if ( NULL == pair )
00103                         {
00104                                 throw new Exception("Internal error with Vector::RemoveAt");
00105                         }
00106                         ASSERT_MEM(pair, sizeof(SocketListenerPair));
00107                         pair->m_listener.ValidateMem();
00108                         ASSERT(*pair->m_sp == sp);
00109                         delete pair;
00110                         removed = true;
00111                         break;
00112                 }
00113         }
00114 
00115         ASSERT(removed);
00116         if ( !removed )
00117         {
00118                 return false;
00119         }
00120         /*
00121         m_swrite.Remove( (int)sp->getSocket() );
00122         count = m_vwrite.Count();
00123         for ( int x = 0; x < count; x++ )
00124         {
00125                 if ( m_vwrite.ElementAt(x) == sp )
00126                 {
00127                         m_vwrite.RemoveAt( x );
00128                         break;
00129                 }
00130         }*/
00131         //FD_CLR( sp->m_sock.GetFD(), &m_read );
00132         //FD_CLR( sp->m_sock.GetFD(), &m_write );
00133         //FD_CLR( sp->m_sock.GetFD(), &m_excpt );
00134 
00135         return true;
00136 }
00137 
00138 void SocketSet::WaitForIO(  ) 
00139 {
00140         //
00141         // should put copy of sockets in a vector
00142         //
00143         int x;
00144 
00145         m_svreadMutex.Lock();
00146 
00147         int readcount = m_vread.Count();
00148 
00149         FD_ZERO( &m_read );
00150         FD_ZERO( &m_excpt );
00151 
00152         for ( x = 0; x < readcount; x++ )
00153         {
00154                 TcpSocketPtr sp = m_vread.ElementAt(x)->m_sp;
00155                 sp.ValidateMem();
00156 
00157                 FD_SET( sp->m_sock->GetFD(), &m_read );
00158                 FD_SET( sp->m_sock->GetFD(), &m_excpt );
00159         }
00160         m_svreadMutex.Unlock();
00161 
00162         if ( readcount == 0 )
00163         {
00164                 return;
00165         }
00166 
00167         int count = select( FD_SETSIZE, &m_read, NULL, &m_excpt, m_to );
00168         if ( count < 0 )
00169         {
00170 #ifdef _WIN32
00171                 //int error = WSAGetLastError();
00172 #else
00173                 //int error = errno;
00174 #endif
00175                 //Log::WriteError("select error=%d readcount=%d", error, readcount);
00176 
00177                 for ( x = 0; x < readcount; x++ )
00178                 {
00179                         TcpSocketPtr sp = m_vread.ElementAt(x)->m_sp;
00180                         if ( sp->GetErrorCode() != 0 || sp->IsClosed() )
00181                         {
00182                                 RemoveSocket(sp);
00183                                 readcount--;
00184                                 x--;
00185                         }
00186                 }
00187                 return;
00188         }
00189         else if ( count == 0 )
00190         {
00191                 return;
00192         }
00193         if ( ! m_running )
00194         {
00195                 return;
00196         }
00197 
00198         m_svreadMutex.Lock();
00199                 
00200         for ( x = 0; x < readcount && count > 0 && m_running; x++ )
00201         {
00202                 if ( x >= m_vread.Count() )
00203                 {
00204                         break;
00205                 }
00206 
00207                 SocketListenerPair *sp = m_vread.ElementAt(x);
00208                 ASSERT_MEM( sp, sizeof( SocketListenerPair ) );
00209                 sp->m_sp.ValidateMem();
00210 
00211                 try
00212                 {
00213                         if ( FD_ISSET( sp->m_sp->m_sock->GetFD(), &m_read ) )
00214                         {
00215                                 count--;
00216                                 if ( sp->m_sp->IsClosed() )
00217                                 {
00218                                         // socket closed.  If zero, socket is shutdown.
00219                                         sp->m_listener->IStreamRead_OnClose();
00220                                         RemoveSocket( sp->m_sp );
00221                                         readcount--;
00222                                         continue;
00223                                 }
00224                                 int bytes = sp->m_sp->m_sock->GetBytesAvail();
00225                                 if (0 == bytes)
00226                                 {
00227                                         // Sockets from .net don't seem to close correctly.
00228                                         readcount--;
00229                                         sp->m_listener->IStreamRead_OnError( "select() indicates read, but no data available (.net socket?).  Closing socket." );
00230                                         sp->m_listener->IStreamRead_OnClose();
00231                                         RemoveSocket( sp->m_sp );
00232                                         continue;
00233                                 }
00234                                 while ( bytes > 0 )
00235                                 {
00236                                         if ( ! m_running )
00237                                         {
00238                                                 m_svreadMutex.Unlock();
00239                                                 return;
00240                                         }
00241                                         bytes = sp->m_sp->GetStream()->Read(m_buf, 0, SOCKBUF_SIZE);
00242                                         if ( 0 > bytes )
00243                                         {
00244                                                 // socket closed
00245                                                 sp->m_listener->IStreamRead_OnClose();
00246                                                 RemoveSocket( sp->m_sp );
00247                                                 readcount--;
00248                                                 break;
00249                                         }
00250                                         ASSERT(sp->m_listener.IsNotNull());
00251                                         sp->m_listener->IStreamRead_OnRead( m_buf, bytes );
00252 
00253                                         bytes = (sp->m_sp->m_sock->IsClosed()) ? -1 : sp->m_sp->m_sock->GetBytesAvail();
00254                                 }
00255                         }
00256                         else if ( FD_ISSET( sp->m_sp->m_sock->GetFD(), &m_excpt ) )
00257                         {
00258                                 count--;
00259                                 readcount--;
00260                                 sp->m_listener->IStreamRead_OnError( "Socket exception set" );
00261                                 sp->m_listener->IStreamRead_OnClose();
00262                                 RemoveSocket( sp->m_sp );
00263                         }
00264                 }
00265                 catch ( SocketException *se )
00266                 {
00267                         ASSERT_MEM( sp, sizeof(SocketListenerPair) );
00268                         sp->m_sp.ValidateMem();
00269                         sp->m_listener->IStreamRead_OnError( se->Message() );
00270                         sp->m_listener->IStreamRead_OnClose();
00271                         RemoveSocket( sp->m_sp );
00272                         delete se;
00273                 }
00274                 catch (Exception *ex)
00275                 {
00276                         Log::SWrite(ex);
00277                         ASSERT_MEM( sp, sizeof(SocketListenerPair) );
00278                         sp->m_sp.ValidateMem();
00279                         sp->m_listener->IStreamRead_OnError( ex->Message() );
00280                         sp->m_listener->IStreamRead_OnClose();
00281                         RemoveSocket( sp->m_sp );
00282                         delete ex;
00283                         Thread::YYield();
00284                 }
00285         }
00286 
00287         m_svreadMutex.Unlock();
00288 }
00289 
00290 void SocketSet::Close()
00291 {
00292         if ( ! m_running )
00293         {
00294                 return;
00295         }
00296         m_running = false;
00297         m_sockAddedEvent->Notify();
00298 
00299         m_svreadMutex.Lock();
00300         m_vread.ValidateMem();
00301 
00302         int count = m_vread.Count();
00303 
00304         for ( int x = 0; x < count; x++ )
00305         {
00306                 SocketListenerPair *sp = m_vread.ElementAt(x);
00307                 ASSERT_MEM( sp, sizeof( SocketListenerPair ) );
00308                 sp->m_sp->ValidateMem();
00309                 if ( sp->m_sp.IsNotNull() )
00310                 {
00311                         sp->m_sp->Close();
00312                 }
00313                 delete sp;
00314                 m_vread.ValidateMem();
00315         }
00316         m_vread.ValidateMem();
00317         m_vread.Clear();
00318         ASSERT( 0 == m_vread.Count() );
00319 
00320         //m_swrite.Clear();
00321         //m_vwrite.Clear();
00322 
00323         m_svreadMutex.Unlock();
00324 }
00325 
00326 void SocketSet::CloseAndDelete()
00327 {
00328         if ( ! m_running )
00329         {
00330                 return;
00331         }
00332         m_running = false;
00333 
00334         m_sockAddedEvent->Notify();
00335         m_svreadMutex.Lock();
00336 
00337         int count = m_vread.Count();
00338 
00339         for ( int x = 0; x < count; x++ )
00340         {
00341                 SocketListenerPair *sp = m_vread.ElementAt(x);
00342                 ASSERT_MEM( sp, sizeof( SocketListenerPair ) );
00343                 sp->m_sp->ValidateMem();
00344                 if ( sp->m_sp.IsNotNull() )
00345                 {
00346                         sp->m_sp->Close();
00347                 }
00348                 delete sp;
00349         }
00350         m_vread.Clear();
00351 
00352         m_svreadMutex.Unlock();
00353 }
00354 
00355 void SocketSet::Run()
00356 {
00357         m_running = true;
00358 
00359         while ( m_running )
00360         {
00361                 try
00362                 {
00363                         if ( SocketCount() == 0 )
00364                         {
00365                                 m_sockAddedEvent->Wait();
00366                         }
00367                         WaitForIO();
00368                 }
00369                 catch ( OutOfMemoryException mex )
00370                 {
00371                         Log::SWrite(mex);
00372                 }
00373                 catch ( Exception *ex )
00374                 {
00375                         Log::SWrite(ex);
00376                         delete ex;
00377                         Thread::YYield();
00378                 }
00379         }
00380 }
00381 
00382 void SocketSet::Broadcast( const Array<byte>& buf, const int len )
00383 {
00384         m_svreadMutex.Lock();
00385         int count = m_vread.Count();
00386         for ( int x = 0; x < count; x++ )
00387         {
00388                 SocketListenerPair *sp = m_vread.ElementAt(x);
00389                 if ( NULL != sp )
00390                 {
00391                         sp->m_sp->GetStream()->Write( buf, 0, len );
00392                 }
00393         }
00394         m_svreadMutex.Unlock();
00395 }
00396 
00397 #if defined(DEBUG) || defined(_DEBUG)
00398 void SocketSet::CheckMem() const
00399 {
00400         //m_svreadMutex.Lock();
00401 
00402         m_buf.CheckMem();
00403 
00404         if ( NULL != m_to )
00405         {
00406                 DEBUG_NOTE_MEM_ALLOCATION(m_to);
00407         }
00408 
00409         m_vread.CheckMem();
00410         int count = m_vread.Count();
00411         for ( int x = 0; x < count; x++ )
00412         {
00413                 SocketListenerPair *sp = m_vread.ElementAt(x);
00414                 if ( NULL != sp )
00415                 {
00416                         DEBUG_NOTE_MEM_ALLOCATION( sp );
00417                         sp->m_sp.CheckMem();
00418                 }
00419         }
00420         /*
00421         m_swrite.CheckMem();
00422         m_vwrite.CheckMem();
00423         count = m_vwrite.Count();
00424         for ( int x = 0; x < count; x++ )
00425         {
00426                 Socket *sp = m_vwrite.ElementAt(x);
00427                 if ( NULL != sp )
00428                 {
00429                         DEBUG_NOTE_MEM_ALLOCATION( sp );
00430                         sp->CheckMem();
00431                 }
00432         }*/
00433         DEBUG_NOTE_MEM_ALLOCATION(m_sockAddedEvent);
00434 
00435         //m_svreadMutex.Unlock();
00436 }
00437 
00438 void SocketSet::ValidateMem() const
00439 {
00440         //m_svreadMutex.Lock();
00441 
00442         m_buf.ValidateMem();
00443 
00444         if ( NULL != m_to )
00445         {
00446                 ASSERT_MEM(m_to, sizeof(struct timeval));
00447         }
00448         m_vread.ValidateMem();
00449         int count = m_vread.Count();
00450 
00451         for ( int x = 0; x < count; x++ )
00452         {
00453                 SocketListenerPair *sp = m_vread.ElementAt(x);
00454                 if ( NULL != sp )
00455                 {
00456                         ASSERT_MEM( sp, sizeof(SocketListenerPair) );
00457                         sp->m_sp.ValidateMem();
00458                 }
00459         }
00460         /*
00461         m_swrite.ValidateMem();
00462         m_vwrite.ValidateMem();
00463         count = m_vwrite.Count();
00464         for ( int x = 0; x < count; x++ )
00465         {
00466                 Socket *sp = m_vwrite.ElementAt(x);
00467                 if ( NULL != sp )
00468                 {
00469                         ASSERT_MEM( sp, sizeof(Socket) );
00470                         sp->ValidateMem();
00471                 }
00472         }*/
00473         ASSERT_MEM(m_sockAddedEvent, sizeof(Event));
00474 
00475         //m_svreadMutex.Unlock();
00476 }
00477 #endif
00478 
00479 IServerConnectionFactory::~IServerConnectionFactory()
00480 {
00481 }
00482 
00483 SocketSetServer::SocketSetServer
00484 (
00485         IServerConnectionFactory *conFactory, 
00486         int serverPort
00487 )
00488 : m_ss(), m_listener(serverPort), m_conFactory(conFactory)
00489 {
00490         m_listener.Delegates().Add(this);
00491 }
00492 
00493 SocketSetServer::~SocketSetServer()
00494 {
00495         m_listener.Stop();
00496 }
00497 
00498 void SocketSetServer::AddSocket( IStreamReadListenerPtr listener, TcpSocketPtr sp ) 
00499 { 
00500         m_ss.AddSocket(listener, sp); 
00501 }
00502 
00503 void SocketSetServer::Close() 
00504 { 
00505         m_listener.Stop(); m_ss.Close(); 
00506 }
00507 
00508 void SocketSetServer::CloseAndDelete() 
00509 { 
00510         m_listener.Stop(); 
00511         m_ss.CloseAndDelete(); 
00512 }
00513 
00514 int SocketSetServer::SocketCount() const
00515 { 
00516         return m_ss.SocketCount(); 
00517 }
00518 
00519 void SocketSetServer::Broadcast( const Array<byte>& buf, const int len ) 
00520 { 
00521         m_ss.Broadcast(buf, len); 
00522 }
00523 
00524 void SocketSetServer::Join(int timeoutms) 
00525 { 
00526         m_ss.Join(timeoutms); 
00527 }
00528 
00529 void SocketSetServer::Join()
00530 {
00531         m_ss.Join();
00532 }
00533 
00534 void SocketSetServer::IPortListener_OnConnect( TcpSocketPtr sock )
00535 {
00536         m_ss.AddSocket(m_conFactory->Create(sock), sock);
00537 }
00538 
00539 void SocketSetServer::IPortListener_OnStop()
00540 {
00541         m_ss.CloseAndDelete();
00542 }
00543 
00544 #if defined(DEBUG) || defined(_DEBUG)
00545 void SocketSetServer::CheckMem() const
00546 {
00547         m_ss.CheckMem();
00548         m_listener.CheckMem();
00549 }
00550 
00551 void SocketSetServer::ValidateMem() const
00552 {
00553         m_ss.ValidateMem();
00554         m_listener.ValidateMem();
00555 }
00556 #endif
00557