router.cc

Go to the documentation of this file.
00001 ///
00002 /// \file       router.cc
00003 ///             Support classes for the pluggable socket routing system.
00004 ///
00005 
00006 /*
00007     Copyright (C) 2008-2011, Net Direct Inc. (http://www.netdirect.ca/)
00008 
00009     This program is free software; you can redistribute it and/or modify
00010     it under the terms of the GNU General Public License as published by
00011     the Free Software Foundation; either version 2 of the License, or
00012     (at your option) any later version.
00013 
00014     This program is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00017 
00018     See the GNU General Public License in the COPYING file at the
00019     root directory of this project for more details.
00020 */
00021 
00022 #include "router.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "protostructs.h"
00026 #include "usbwrap.h"
00027 #include "endian.h"
00028 #include "debug.h"
00029 #include <unistd.h>
00030 
00031 namespace Barry {
00032 
00033 ///////////////////////////////////////////////////////////////////////////////
00034 // SocketDataHandler default methods
00035 
00036 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
00037 {
00038         // Just log the error
00039         eout("SocketDataHandler: Error: " << error.what());
00040         (void) error;
00041 }
00042 
00043 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
00044 {
00045         // Nothing to destroy
00046 }
00047 
00048 ///////////////////////////////////////////////////////////////////////////////
00049 // SocketRoutingQueue constructors
00050 
00051 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count,
00052                                         int default_read_timeout)
00053         : m_dev(0)
00054         , m_writeEp(0)
00055         , m_readEp(0)
00056         , m_interest(false)
00057         , m_seen_usb_error(false)
00058         , m_timeout(default_read_timeout)
00059         , m_continue_reading(false)
00060 {
00061         pthread_mutex_init(&m_mutex, NULL);
00062 
00063         pthread_mutex_init(&m_readwaitMutex, NULL);
00064         pthread_cond_init(&m_readwaitCond, NULL);
00065 
00066         AllocateBuffers(prealloc_buffer_count);
00067 }
00068 
00069 SocketRoutingQueue::~SocketRoutingQueue()
00070 {
00071         // thread running?
00072         if( m_continue_reading ) {
00073                 m_continue_reading = false;
00074                 pthread_join(m_usb_read_thread, NULL);
00075         }
00076 }
00077 
00078 ///////////////////////////////////////////////////////////////////////////////
00079 // protected members
00080 
00081 //
00082 // ReturnBuffer
00083 //
00084 /// Provides a method of returning a buffer to the free queue
00085 /// after processing.  The DataHandle class calls this automatically
00086 /// from its destructor.
00087 void SocketRoutingQueue::ReturnBuffer(Data *buf)
00088 {
00089         // don't need to lock here, since m_free handles its own locking
00090         m_free.push(buf);
00091 }
00092 
00093 //
00094 // SimpleReadThread()
00095 //
00096 /// Convenience thread to handle USB read activity.
00097 ///
00098 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
00099 {
00100         SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
00101 
00102         // read from USB and write to stdout until finished
00103         q->m_seen_usb_error = false;
00104         while( q->m_continue_reading ) {
00105                 try {
00106                         q->DoRead(1000);        // timeout in milliseconds
00107                 }
00108                 catch (std::runtime_error const &e) {
00109                         eout("SimpleReadThread received uncaught exception: " <<  typeid(e).name() << " what: " << e.what());
00110                 }
00111                 catch (...) {
00112                         eout("SimpleReadThread recevied uncaught exception of unknown type");
00113                 }
00114         }
00115         return 0;
00116 }
00117 
00118 
00119 ///////////////////////////////////////////////////////////////////////////////
00120 // public API
00121 
00122 // These functions connect the router to an external Usb::Device
00123 // object.  Normally this is handled automatically by the
00124 // Controller class, but are public here in case they are needed.
00125 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
00126                                         SocketDataHandlerPtr callback)
00127 {
00128         scoped_lock lock(m_mutex);
00129         m_dev = dev;
00130         m_usb_error_dev_callback = callback;
00131         m_writeEp = writeEp;
00132         m_readEp = readEp;
00133 }
00134 
00135 void SocketRoutingQueue::ClearUsbDevice()
00136 {
00137         scoped_lock lock(m_mutex);
00138         m_dev = 0;
00139         m_usb_error_dev_callback.reset();
00140         lock.unlock();
00141 
00142         // wait for the DoRead cycle to finish, so the external
00143         // Usb::Device object doesn't close before we're done with it
00144         scoped_lock wait(m_readwaitMutex);
00145         pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
00146 }
00147 
00148 bool SocketRoutingQueue::UsbDeviceReady()
00149 {
00150         scoped_lock lock(m_mutex);
00151         return m_dev != 0 && !m_seen_usb_error;
00152 }
00153 
00154 //
00155 // AllocateBuffers
00156 //
00157 /// This class starts out with no buffers, and will grow one buffer
00158 /// at a time if needed.  Call this to allocate count buffers
00159 /// all at once and place them on the free queue.  After calling
00160 /// this function, at least count buffers will exist in the free
00161 /// queue.  If there are already count buffers, none will be added.
00162 ///
00163 void SocketRoutingQueue::AllocateBuffers(int count)
00164 {
00165         int todo = count - m_free.size();
00166 
00167         for( int i = 0; i < todo; i++ ) {
00168                 // m_free handles its own locking
00169                 m_free.push( new Data );
00170         }
00171 }
00172 
00173 //
00174 // DefaultRead (both variations)
00175 //
00176 /// Returns the data for the next unregistered socket.
00177 /// Blocks until timeout or data is available.
00178 /// Returns false (or null pointer) on timeout and no data.
00179 /// With the return version of the function, there is no
00180 /// copying performed.
00181 ///
00182 /// This version performs a copy.
00183 ///
00184 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
00185 {
00186         DataHandle buf = DefaultRead(timeout);
00187         if( !buf.get() )
00188                 return false;
00189 
00190         // copy to desired buffer
00191         receive = *buf.get();
00192         return true;
00193 }
00194 
00195 ///
00196 /// This version does not perform a copy.
00197 ///
00198 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
00199 {
00200         if( m_seen_usb_error && timeout == -1 ) {
00201                 // If an error has been seen and not cleared then no
00202                 // more data will be read into the queue by
00203                 // DoRead(). Forcing the timeout to zero allows any
00204                 // data already in the queue to be read, but prevents
00205                 // waiting for data which will never arrive.
00206                 timeout = 0;
00207         }
00208 
00209         // m_default handles its own locking
00210         // Be careful with the queue timeout, since its -1 means "forever"
00211         Data *buf = m_default.wait_pop(timeout == -1 ? m_timeout : timeout);
00212         return DataHandle(*this, buf);
00213 }
00214 
00215 //
00216 // RegisterInterest
00217 //
00218 /// Register an interest in data from a certain socket.  To read
00219 /// from that socket, use the SocketRead() function from then on.
00220 ///
00221 /// Any non-registered socket goes in the default queue
00222 /// and must be read by DefaultRead()
00223 ///
00224 /// If not null, handler is called when new data is read.  It will
00225 /// be called in the same thread instance that DoRead() is called from.
00226 /// Handler is passed the DataQueue Data pointer, and so no
00227 /// copying is done.  Once the handler returns, the data is
00228 /// considered processed and not added to the interested queue,
00229 /// but instead returned to m_free.
00230 ///
00231 /// Throws std::logic_error if already registered.
00232 ///
00233 void SocketRoutingQueue::RegisterInterest(SocketId socket,
00234                                           SocketDataHandlerPtr handler)
00235 {
00236         // modifying our own std::map, need a lock
00237         scoped_lock lock(m_mutex);
00238 
00239         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00240         if( qi != m_socketQueues.end() )
00241                 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
00242 
00243         m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
00244         m_interest = true;
00245 }
00246 
00247 //
00248 // UnregisterInterest
00249 //
00250 /// Unregisters interest in data from the given socket, and discards
00251 /// any existing data in its interest queue.  Any new incoming data
00252 /// for this socket will be placed in the default queue.
00253 ///
00254 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
00255 {
00256         // modifying our own std::map, need a lock
00257         scoped_lock lock(m_mutex);
00258 
00259         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00260         if( qi == m_socketQueues.end() )
00261                 return; // nothing registered, done
00262 
00263         // salvage all our data buffers
00264         m_free.append_from( qi->second->m_queue );
00265 
00266         // remove the QueueEntryPtr from the map
00267         m_socketQueues.erase( qi );
00268 
00269         // check the interest flag
00270         m_interest = m_socketQueues.size() > 0;
00271 }
00272 
00273 //
00274 // SocketRead
00275 //
00276 /// Reads data from the interested socket cache.  Can only read
00277 /// from sockets that have been previously registered.
00278 ///
00279 /// Blocks until timeout or data is available.
00280 ///
00281 /// Returns false (or null pointer) on timeout and no data.
00282 /// With the return version of the function, there is no
00283 /// copying performed.
00284 ///
00285 /// Throws std::logic_error if a socket was requested that was
00286 /// not previously registered.
00287 ///
00288 /// Copying is performed with this function.
00289 ///
00290 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
00291 {
00292         DataHandle buf = SocketRead(socket, timeout);
00293         if( !buf.get() )
00294                 return false;
00295 
00296         // copy to desired buffer
00297         receive = *buf.get();
00298         return true;
00299 }
00300 
00301 ///
00302 /// Copying is not performed with this function.
00303 ///
00304 /// Throws std::logic_error if a socket was requested that was
00305 /// not previously registered.
00306 ///
00307 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
00308 {
00309         QueueEntryPtr qep;
00310         DataQueue *dq = 0;
00311 
00312         // accessing our own std::map, need a lock
00313         {
00314                 scoped_lock lock(m_mutex);
00315                 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00316                 if( qi == m_socketQueues.end() )
00317                         throw std::logic_error("SocketRead requested data from unregistered socket.");
00318 
00319                 // got our queue, save the whole QueueEntryPtr (shared_ptr),
00320                 // and unlock, since we will be waiting on the DataQueue,
00321                 // not the socketQueues map
00322                 //
00323                 // This is safe, since even if UnregisterInterest is called,
00324                 // our pointer won't be deleted until our shared_ptr
00325                 // (QueueEntryPtr) goes out of scope.
00326                 //
00327                 // The remaining problem is that wait_pop() might wait
00328                 // forever if there is no timeout... c'est la vie.
00329                 // Should'a used a timeout. :-)
00330                 qep = qi->second;
00331                 dq = &qep->m_queue;
00332         }
00333 
00334         // get data from DataQueue
00335         // Be careful with the queue timeout, since its -1 means "forever"
00336         Data *buf = dq->wait_pop(timeout == -1 ? m_timeout : timeout);
00337 
00338         // specifically delete our copy of shared pointer, in a locked
00339         // environment
00340         {
00341                 scoped_lock lock(m_mutex);
00342                 qep.reset();
00343         }
00344 
00345         return DataHandle(*this, buf);
00346 }
00347 
00348 // Returns true if data is available for that socket.
00349 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
00350 {
00351         scoped_lock lock(m_mutex);
00352         SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
00353         if( qi == m_socketQueues.end() )
00354                 return false;
00355         return qi->second->m_queue.size() > 0;
00356 }
00357 
00358 //
00359 // DoRead
00360 //
00361 /// Called by the application's "read thread" to read the next usb
00362 /// packet and route it to the correct queue.  Returns after every
00363 /// read, even if a handler is associated with a queue.
00364 /// Note: this function is safe to call before SetUsbDevice() is
00365 /// called... it just doesn't do anything if there is no usb
00366 /// device to work with.
00367 ///
00368 /// Timeout is in milliseconds.
00369 //  This timeout is for the USB subsystem, so no special handling
00370 //  for it is needed... just use usbwrap's default timeout.
00371 void SocketRoutingQueue::DoRead(int timeout)
00372 {
00373         class ReadWaitSignal
00374         {
00375                 pthread_mutex_t &m_Mutex;
00376                 pthread_cond_t &m_Cond;
00377         public:
00378                 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
00379                         : m_Mutex(mut), m_Cond(cond)
00380                         {}
00381                 ~ReadWaitSignal()
00382                 {
00383                         scoped_lock wait(m_Mutex);
00384                         pthread_cond_signal(&m_Cond);
00385                 }
00386         } readwait(m_readwaitMutex, m_readwaitCond);
00387 
00388         Usb::Device * volatile dev = 0;
00389         int readEp;
00390         DataHandle buf(*this, 0);
00391 
00392         // if we are not connected to a USB device yet, just wait
00393         {
00394                 scoped_lock lock(m_mutex);
00395 
00396                 if( !m_dev || m_seen_usb_error ) {
00397                         lock.unlock();  // unlock early, since we're sleeping
00398                         // sleep only a short time, since things could be
00399                         // in the process of setup or teardown
00400                         usleep(125000);
00401                         return;
00402                 }
00403 
00404                 dev = m_dev;
00405                 readEp = m_readEp;
00406 
00407                 // fetch a free buffer
00408                 Data *raw = m_free.pop();
00409                 if( !raw )
00410                         buf = DataHandle(*this, new Data);
00411                 else
00412                         buf = DataHandle(*this, raw);
00413         }
00414 
00415         // take a chance and do the read unlocked, as this has the potential
00416         // for blocking for a while
00417         try {
00418 
00419                 Data &data = *buf.get();
00420 
00421                 if( !dev->BulkRead(readEp, data, timeout) )
00422                         return; // no data, done!
00423 
00424                 MAKE_PACKET(pack, data);
00425 
00426                 // make sure the size is right
00427                 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
00428                         return; // bad size, just skip
00429 
00430                 // extract the socket from the packet
00431                 uint16_t socket = btohs(pack->socket);
00432 
00433                 // we have data, now lock up again to place it
00434                 // in the right queue
00435                 scoped_lock lock(m_mutex);
00436 
00437                 // search for registration of socket
00438                 if( m_interest ) {
00439                         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00440                         if( qi != m_socketQueues.end() ) {
00441                                 SocketDataHandlerPtr &sdh = qi->second->m_handler;
00442 
00443                                 // is there a handler?
00444                                 if( sdh ) {
00445                                         // unlock & let the handler process it
00446                                         lock.unlock();
00447                                         sdh->DataReceived(*buf.get());
00448                                         return;
00449                                 }
00450                                 else {
00451                                         qi->second->m_queue.push(buf.release());
00452                                         return;
00453                                 }
00454                         }
00455 
00456                         // fall through
00457                 }
00458 
00459                 // safe to unlock now, we are done with the map
00460                 lock.unlock();
00461 
00462                 // if we get here, send to default queue
00463                 m_default.push(buf.release());
00464         }
00465         catch( Usb::Timeout & ) {
00466                 // this is expected... just ignore
00467         }
00468         catch( Usb::Error &ue ) {
00469                 // set the flag first, in case any of the handlers
00470                 // are able to recover from this error
00471                 m_seen_usb_error = true;
00472 
00473                 // this is unexpected, but we're in a thread here...
00474                 // Need to iterate through all the registered handlers
00475                 // calling their error callback.
00476                 // Can't be locked when calling the callback, so need
00477                 // to make a list of them first.
00478                 scoped_lock lock(m_mutex);
00479                 std::vector<SocketDataHandlerPtr> handlers;
00480                 SocketQueueMap::iterator qi = m_socketQueues.begin();
00481                 while( qi != m_socketQueues.end() ) {
00482                         SocketDataHandlerPtr &sdh = qi->second->m_handler;
00483                         // is there a handler?
00484                         if( sdh ) {
00485                                 handlers.push_back(sdh);
00486                         }
00487                         ++qi;
00488                 }
00489 
00490                 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
00491 
00492                 lock.unlock();
00493                 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
00494                 while( hi != handlers.end() ) {
00495                         (*hi)->Error(ue);
00496                         ++hi;
00497                 }
00498 
00499                 // and finally, call the specific error callback if available
00500                 if( usb_error_handler.get() ) {
00501                         usb_error_handler->Error(ue);
00502                 }
00503         }
00504 }
00505 
00506 void SocketRoutingQueue::SpinoffSimpleReadThread()
00507 {
00508         // signal that it's ok to run inside the thread
00509         if( m_continue_reading )
00510                 return; // already running
00511         m_continue_reading = true;
00512 
00513         // Start USB read thread, to handle all routing
00514         int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
00515         if( ret ) {
00516                 m_continue_reading = false;
00517                 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
00518         }
00519 }
00520 
00521 } // namespace Barry
00522 

Generated on Tue Mar 1 17:50:16 2011 for Barry by  doxygen 1.5.6