ulxr_mtrpc_server.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002            ulxr_mtrpc_server.cpp  -  a simple multithreaded rpc server
00003                              -------------------
00004     begin                : Wed Oct 10 2003
00005     copyright            : (C) 2002-2007 by Ewald Arnold
00006     email                : ulxmlrpcpp@ewald-arnold.de
00007 
00008     $Id: ulxr_mtrpc_server.cpp 999 2007-07-16 12:49:00Z ewald-arnold $
00009 
00010  ***************************************************************************/
00011 
00012 /**************************************************************************
00013  *
00014  * This program is free software; you can redistribute it and/or modify
00015  * it under the terms of the GNU Lesser General Public License as
00016  * published by the Free Software Foundation; either version 2 of the License,
00017  * or (at your option) any later version.
00018  *
00019  * This program is distributed in the hope that it will be useful,
00020  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00022  * GNU General Public License for more details.
00023  *
00024  * You should have received a copy of the GNU Lesser General Public License
00025  * along with this program; if not, write to the Free Software
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00027  *
00028  ***************************************************************************/
00029 
00030 
00031 //#define ULXR_SHOW_TRACE
00032 //#define ULXR_DEBUG_OUTPUT
00033 //#define ULXR_SHOW_READ
00034 //#define ULXR_SHOW_WRITE
00035 
00036 #define ULXR_NEED_EXPORTS
00037 #include <ulxmlrpcpp/ulxmlrpcpp.h>  // always first header
00038 
00039 #ifdef __unix__
00040 #include <sys/socket.h>
00041 #endif
00042 
00043 #ifdef ULXR_MULTITHREADED
00044 
00045 #ifdef __WIN32__
00046 #include <winsock2.h>
00047 #endif
00048 
00049 #include <ulxmlrpcpp/ulxr_mtrpc_server.h>
00050 
00051 #include <ulxmlrpcpp/ulxr_dispatcher.h>
00052 #include <ulxmlrpcpp/ulxr_protocol.h>
00053 #include <ulxmlrpcpp/ulxr_except.h>
00054 
00055 #ifdef __unix__
00056 #include <csignal>
00057 #endif
00058 
00059 #include <iostream>
00060 
00061 namespace ulxr {
00062 
00063 
00064 /* Helper class for thread handling
00065  */
00066 class ULXR_API_DECL0 MultiThreadRpcServer::ThreadData
00067 {
00068  public:
00069 
00070 #ifdef __unix__
00071   typedef pthread_t  handle_t;
00072 #elif defined(__WIN32__)
00073   typedef HANDLE     handle_t;
00074 #else
00075 #error unsupported platform here
00076 #endif
00077 
00078  /* Constucts the thread data
00079   * @param server       pointer to the according rpc-server
00080   * @param connection   points to the connection data
00081   */
00082   ThreadData (MultiThreadRpcServer *server, Protocol *prot);
00083 
00084  /* Gets info, if thread should continue to run.
00085   * @return true: thread should continue
00086   */
00087   bool shouldRun() const;
00088 
00089  /* Signals thread to terminate
00090   */
00091   void setTerminate();
00092 
00093  /* Gets the thread handle.
00094   * @return handle from operation system
00095   */
00096   handle_t getHandle() const;
00097 
00098  /* Sets the thread handle.
00099   * @param  handle   handle from operation system
00100   */
00101   void setHandle(handle_t hd);
00102 
00103  /* Gets the connection.
00104   * @return connection
00105   */
00106   Protocol *getProtocol() const;
00107 
00108  /* Gets the server object.
00109   * @return server object
00110   */
00111   MultiThreadRpcServer *getServer() const;
00112 
00113  /* Increments invocation counter.
00114   */
00115   void incInvoked();
00116 
00117  /* Returns the invocation counter.
00118   * @return number of processed requests
00119   */
00120   unsigned numInvoked() const;
00121 
00122  private:
00123 
00124   bool                  run;
00125   handle_t              handle;
00126   unsigned              ctrInvoked;
00127   Protocol             *protocol;
00128   MultiThreadRpcServer *server;
00129 };
00130 
00131 
00132 ULXR_API_IMPL0 MultiThreadRpcServer::MultiThreadRpcServer(Protocol *prot, unsigned num_threads, bool wbxml)
00133 {
00134   wbxml_mode = wbxml;
00135   for (unsigned i = 0; i < num_threads; ++i)
00136 #ifdef _MSC_VER
00137       threads.push_back(new ThreadData(this, (Protocol*)(prot->detach())));
00138 #else
00139       threads.push_back(new ThreadData(this, dynamic_cast<Protocol*>(prot->detach())));
00140 #endif
00141 }
00142 
00143 
00144 ULXR_API_IMPL0 MultiThreadRpcServer::~MultiThreadRpcServer()
00145 {
00146   waitAsync(true);
00147 
00148   releaseThreads();
00149 }
00150 
00151 
00152 ULXR_API_IMPL(void)
00153 MultiThreadRpcServer::addMethod (MethodAdder::StaticMethodCall_t adr,
00154                                  const CppString &ret_signature,
00155                                  const CppString &name,
00156                                  const CppString &signature,
00157                                  const CppString &help)
00158 {
00159     dispatcher.addMethod(adr, ret_signature, name, signature, help);
00160 }
00161 
00162 
00163 ULXR_API_IMPL(void)
00164 MultiThreadRpcServer::addMethod (MethodAdder::DynamicMethodCall_t wrapper,
00165                                  const CppString &ret_signature,
00166                                  const CppString &name,
00167                                  const CppString &signature,
00168                                  const CppString &help)
00169 {
00170   dispatcher.addMethod(wrapper, ret_signature, name, signature, help);
00171 }
00172 
00173 
00174 ULXR_API_IMPL(void)
00175 MultiThreadRpcServer::addMethod (MethodAdder::SystemMethodCall_t adr,
00176                                  const CppString &ret_signature,
00177                                  const CppString &name,
00178                                  const CppString &signature,
00179                                  const CppString &help)
00180 {
00181   dispatcher.addMethod(adr, ret_signature, name, signature, help);
00182 }
00183 
00184 
00185 ULXR_API_IMPL(void)
00186 MultiThreadRpcServer::addMethod (MethodAdder::StaticMethodCall_t adr,
00187                                  const Signature &ret_signature,
00188                                  const CppString &name,
00189                                  const Signature &signature,
00190                                  const CppString &help)
00191 {
00192   dispatcher.addMethod(adr, ret_signature, name, signature, help);
00193 }
00194 
00195 
00196 ULXR_API_IMPL(void)
00197 MultiThreadRpcServer::addMethod (MethodAdder::DynamicMethodCall_t wrapper,
00198                                  const Signature &ret_signature,
00199                                  const CppString &name,
00200                                  const Signature &signature,
00201                                  const CppString &help)
00202 {
00203   dispatcher.addMethod(wrapper, ret_signature, name, signature, help);
00204 }
00205 
00206 
00207 ULXR_API_IMPL(void)
00208 MultiThreadRpcServer::addMethod (MethodAdder::SystemMethodCall_t adr,
00209                                  const Signature &ret_signature,
00210                                  const CppString &name,
00211                                  const Signature &signature,
00212                                  const CppString &help)
00213 {
00214   dispatcher.addMethod(adr, ret_signature, name, signature, help);
00215 }
00216 
00217 
00218 ULXR_API_IMPL(void)
00219 MultiThreadRpcServer::removeMethod(const CppString &name)
00220 {
00221   dispatcher.removeMethod(name);
00222 }
00223 
00224 
00225 ULXR_API_IMPL(void) MultiThreadRpcServer::preProcessCall(MethodCall &/*call*/)
00226 {
00227 }
00228 
00229 
00230 ULXR_API_IMPL(void) MultiThreadRpcServer::preProcessResponse(MethodResponse &/*resp*/)
00231 {
00232 }
00233 
00234 
00235 ULXR_API_IMPL(void *) MultiThreadRpcServer::serverLoop(Protocol *protocol, ThreadData *td)
00236 {
00237   ULXR_TRACE(ULXR_PCHAR("entered new server thread"));
00238   Dispatcher waiter(protocol, wbxml_mode);
00239   while (td->shouldRun())
00240   {
00241     try
00242     {
00243       ULXR_TRACE(ULXR_PCHAR("waitForCall()"));
00244       MethodCall call = waiter.waitForCall();
00245       preProcessCall(call);
00246 
00247       td->incInvoked();
00248       ULXR_TRACE(ULXR_PCHAR("server thread ")
00249                  << std::hex << (void*) td->getHandle() << std::dec
00250                  << ULXR_PCHAR(" received new call"));
00251 
00252       MethodResponse resp = dispatcher.dispatchCall(call);
00253       preProcessResponse(resp);
00254 
00255       if (!protocol->isTransmitOnly())
00256       {
00257         ULXR_TRACE(ULXR_PCHAR("NOT TransmitOnly"));
00258         protocol->sendRpcResponse(resp, wbxml_mode);
00259       }
00260       else
00261       {
00262         ULXR_TRACE(ULXR_PCHAR("IS TransmitOnly"));
00263       }
00264 
00265       if (!protocol->isPersistent())
00266         protocol->close();
00267     }
00268 
00269     catch(ConnectionException &ex)
00270     {
00271       forwardThreadedError(ex);
00272 
00273       if (protocol->isOpen())
00274       {
00275         try
00276         {
00277           MethodResponse resp(ex.getStatusCode(), ex.why() );
00278           if (!protocol->isTransmitOnly())
00279             protocol->sendRpcResponse(resp, wbxml_mode);
00280         }
00281         catch(...)
00282         {
00283           // nothing
00284         }
00285         protocol->close();
00286       }
00287     }
00288 
00289     catch(Exception& ex)
00290     {
00291       forwardThreadedError(ex);
00292 
00293       if (protocol->isOpen())
00294       {
00295         try
00296         {
00297           MethodResponse resp(1, ex.why() );
00298           if (!protocol->isTransmitOnly())
00299             protocol->sendRpcResponse(resp, wbxml_mode);
00300         }
00301         catch(...)
00302         {
00303           // nothing
00304         }
00305         protocol->close();
00306       }
00307     }
00308 
00309     catch(std::exception& ex)
00310     {
00311       forwardThreadedError(Exception(ApplicationError, ULXR_GET_STRING(ex.what())));
00312 
00313       if (protocol->isOpen())
00314       {
00315         try
00316         {
00317           MethodResponse resp(1, ULXR_GET_STRING(ex.what()) );
00318           if (!protocol->isTransmitOnly())
00319             protocol->sendRpcResponse(resp, wbxml_mode);
00320         }
00321         catch(...)
00322         {
00323           // nothing
00324         }
00325         protocol->close();
00326       }
00327     }
00328 
00329     catch(...)
00330     {
00331       RuntimeException ex (SystemError, ULXR_PCHAR("Unknown error occured"));
00332       forwardThreadedError(ex);
00333 
00334       if (protocol->isOpen())
00335       {
00336         try
00337         {
00338           MethodResponse resp(1, ex.why() );
00339           if (!protocol->isTransmitOnly())
00340             protocol->sendRpcResponse(resp, wbxml_mode);
00341         }
00342         catch(...)
00343         {
00344           // nothing
00345         }
00346         protocol->close();
00347       }
00348     }
00349   }
00350 
00351   ULXR_TRACE(ULXR_PCHAR("Leaving server thread ")
00352               << std::hex << (void*) td->getHandle() << std::dec);
00353   return 0;
00354 }
00355 
00356 
00357 
00358 #ifdef ULXR_SHOW_TRACE
00359 ULXR_API_IMPL(void) MultiThreadRpcServer::forwardThreadedError(const Exception &ex) const
00360 #else
00361 ULXR_API_IMPL(void) MultiThreadRpcServer::forwardThreadedError(const Exception &) const
00362 #endif
00363 {
00364    ULXR_TRACE(ULXR_CHAR("Threaded error occured: ") << ex.why());
00365 }
00366 
00367 
00368 ULXR_API_IMPL(void *) MultiThreadRpcServer::startThread(ThreadData *td)
00369 {
00370   ULXR_TRACE(ULXR_PCHAR("startThread ")
00371              << std::hex << (void*) td
00372              << std::dec);
00373   return (void*) (td->getServer())->serverLoop(td->getProtocol(), td);
00374 }
00375 
00376 
00377 ULXR_API_IMPL(unsigned) MultiThreadRpcServer::dispatchAsync()
00378 {
00379   ULXR_TRACE(ULXR_PCHAR("dispatchAsync()"));
00380   unsigned num_started = 0;
00381 
00382   for (unsigned i = 0; i < threads.size(); ++i)
00383   {
00384     ThreadData::handle_t tdh;
00385 #ifdef __unix__
00386     typedef void* (*pthread_sig)(void*);
00387     int result = pthread_create(&tdh, 0, (pthread_sig)startThread, threads[i]);
00388     if (result == 0)
00389       ++num_started;
00390 #elif defined(__WIN32__)
00391     unsigned tid;
00392     typedef unsigned int (__stdcall *thread_sig)(void*);
00393     tdh = (HANDLE)_beginthreadex(0, 16*1024, (thread_sig)startThread,
00394                                  threads[i], CREATE_SUSPENDED,
00395                                  &tid );
00396     int resume = ResumeThread(tdh);
00397     if (tdh >= 0 && resume >= 0)
00398       ++num_started;
00399 #else
00400 #error unsupported platform here
00401 #endif
00402     threads[i]->setHandle(tdh);
00403   }
00404   ULXR_TRACE(ULXR_PCHAR("leaving dispatchAsync()"));
00405   return num_started;
00406 }
00407 
00408 
00409 ULXR_API_IMPL(unsigned) MultiThreadRpcServer::numThreads() const
00410 {
00411   return threads.size();
00412 }
00413 
00414 
00415 ULXR_API_IMPL(void) MultiThreadRpcServer::terminateAllThreads(unsigned /*time*/)
00416 {
00417   ULXR_TRACE(ULXR_PCHAR("Request to terminate all threads."));
00418   for (unsigned i1 = 0; i1 < threads.size(); ++i1)
00419     threads[i1]->setTerminate();
00420 }
00421 
00422 
00423 ULXR_API_IMPL(void) MultiThreadRpcServer::shutdownAllThreads(unsigned /*time*/)
00424 {
00425   ULXR_TRACE(ULXR_PCHAR("Request to shutdown all threads."));
00426   for (unsigned i1 = 0; i1 < threads.size(); ++i1)
00427   {
00428     threads[i1]->setTerminate();
00429     try
00430     {
00431 #ifdef __WIN32__
00432       threads[i1]->getProtocol()->shutdown(SD_BOTH);
00433 #else
00434       threads[i1]->getProtocol()->shutdown(SHUT_RDWR);
00435 #endif
00436     }
00437     catch(...)  // ignore expected errors
00438     {
00439     }
00440   }
00441 }
00442 
00443 
00444 ULXR_API_IMPL(void) MultiThreadRpcServer::waitAsync(bool term, bool stat)
00445 {
00446    ULXR_TRACE(ULXR_PCHAR("waitAsync"));
00447 
00448    if (threads.size() == 0)
00449      return;
00450 
00451    if (term)
00452      terminateAllThreads(1000);
00453 
00454    ULXR_TRACE(ULXR_PCHAR("waitAsync: join"));
00455    for (unsigned i = 0; i < threads.size(); ++i)
00456    {
00457      ULXR_TRACE(ULXR_PCHAR(" join " << i));
00458 #ifdef __unix__
00459      void *status;
00460      pthread_join(threads[i]->getHandle(), &status);
00461 #elif defined(__WIN32__)
00462      WaitForSingleObject(threads[i]->getHandle(), INFINITE);
00463      CloseHandle(threads[i]->getHandle());
00464 #else
00465 #error unsupported platform here
00466 #endif
00467      // maybe check (*status != 0) here
00468    }
00469    if (stat)
00470      printStatistics();
00471 
00472    releaseThreads();
00473 }
00474 
00475 
00476 ULXR_API_IMPL(void) MultiThreadRpcServer::releaseThreads()
00477 {
00478    ULXR_TRACE(ULXR_PCHAR("releaseThreads()"));
00479    for (unsigned i = 0; i < threads.size(); ++i)
00480    {
00481      delete threads[i]->getProtocol();
00482      delete threads[i];
00483    }
00484 
00485    threads.clear();
00486 }
00487 
00488 
00489 ULXR_API_IMPL(void) MultiThreadRpcServer::printStatistics() const
00490 {
00491    for (unsigned i = 0; i < threads.size(); ++i)
00492      ULXR_COUT << ULXR_PCHAR("Thread ")
00493                << std::dec << i
00494                << ULXR_PCHAR(" invoked ")
00495                << threads[i]->numInvoked()
00496                << ULXR_PCHAR(" times.\n");
00497 }
00498 
00499 
00501 
00502 
00503 ULXR_API_IMPL0 MultiThreadRpcServer::ThreadData::ThreadData (MultiThreadRpcServer *serv, Protocol *prot)
00504 {
00505   run = true;
00506   handle = 0;
00507   ctrInvoked = 0;
00508   protocol = prot;
00509   server = serv;
00510 }
00511 
00512 
00513 ULXR_API_IMPL(bool) MultiThreadRpcServer::ThreadData::shouldRun() const
00514 {
00515   return run;
00516 }
00517 
00518 
00519 ULXR_API_IMPL(void) MultiThreadRpcServer::ThreadData::setTerminate()
00520 {
00521   ULXR_TRACE(ULXR_PCHAR("Request to terminate a single thread."));
00522 
00523   run = false;
00524 }
00525 
00526 
00527 MultiThreadRpcServer::ThreadData::handle_t
00528  ULXR_API_IMPL0 MultiThreadRpcServer::ThreadData::getHandle() const
00529 {
00530   return handle;
00531 }
00532 
00533 
00534 ULXR_API_IMPL(Protocol *) MultiThreadRpcServer::ThreadData::getProtocol() const
00535 {
00536   return protocol;
00537 }
00538 
00539 
00540 ULXR_API_IMPL(MultiThreadRpcServer *) MultiThreadRpcServer::ThreadData::getServer() const
00541 {
00542   return server;
00543 }
00544 
00545 
00546 ULXR_API_IMPL(void) MultiThreadRpcServer::ThreadData::setHandle(handle_t hd)
00547 {
00548   handle = hd;
00549 }
00550 
00551 
00552 ULXR_API_IMPL(void) MultiThreadRpcServer::ThreadData::incInvoked()
00553 {
00554   ctrInvoked++;
00555 }
00556 
00557 
00558 ULXR_API_IMPL(unsigned) MultiThreadRpcServer::ThreadData::numInvoked() const
00559 {
00560   return ctrInvoked;
00561 }
00562 
00563 
00564 }  // namespace ulxr
00565 
00566 
00567 #endif // ULXR_MULTITHREADED
00568 

Generated on Sun Aug 19 20:08:57 2007 for ulxmlrpcpp by  doxygen 1.5.1