ulxr_requester.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002             ulxr_requester.cpp  -  send rpc request ("rpc-client")
00003                              -------------------
00004     begin                : Sat Mar 23 2002
00005     copyright            : (C) 2002-2007 by Ewald Arnold
00006     email                : ulxmlrpcpp@ewald-arnold.de
00007 
00008     $Id: ulxr_requester.cpp 983 2007-07-12 09:52:14Z ewald-arnold $
00009 
00010  ***************************************************************************/
00011 
00012 /**************************************************************************
00013  *
00014  * This program is free software; you can redistribute it and/or modify
00015  * it under the terms of the GNU Lesser General Public License as
00016  * published by the Free Software Foundation; either version 2 of the License,
00017  * or (at your option) any later version.
00018  *
00019  * This program is distributed in the hope that it will be useful,
00020  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00022  * GNU General Public License for more details.
00023  *
00024  * You should have received a copy of the GNU Lesser General Public License
00025  * along with this program; if not, write to the Free Software
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00027  *
00028  ***************************************************************************/
00029 
00030 //#define ULXR_SHOW_TRACE
00031 //#define ULXR_DEBUG_OUTPUT
00032 //#define ULXR_SHOW_READ
00033 //#define ULXR_SHOW_WRITE
00034 //#define ULXR_SHOW_XML
00035 
00036 #define ULXR_NEED_EXPORTS
00037 #include <ulxmlrpcpp/ulxmlrpcpp.h>  // always first header
00038 
00039 #include <ulxmlrpcpp/ulxr_requester.h>
00040 #include <ulxmlrpcpp/ulxr_except.h>
00041 #include <ulxmlrpcpp/ulxr_protocol.h>
00042 #include <ulxmlrpcpp/ulxr_connection.h>
00043 #include <ulxmlrpcpp/ulxr_responseparse.h>
00044 #include <ulxmlrpcpp/ulxr_responseparse_wb.h>
00045 
00046 #ifdef __WIN32__
00047 #include <windows.h>
00048 #include <process.h>
00049 #endif
00050 
00051 #ifdef ULXR_MULTITHREADED
00052 
00053 #ifdef __unix__
00054 #include <pthread.h>
00055 #endif
00056 
00057 #endif
00058 
00059 #include <cerrno>
00060 #include <memory>
00061 
00062 namespace ulxr {
00063 
00064 
00065 ULXR_API_IMPL0 Requester::Requester(Protocol* prot, bool wbxml)
00066 {
00067   wbxml_mode = wbxml;
00068   protocol = prot;
00069   cntPendingRequests = 0;
00070 }
00071 
00072 
00073 ULXR_API_IMPL0 Requester::~Requester()
00074 {
00075 }
00076 
00077 
00078 void Requester::send_call (const MethodCall &calldata,
00079                            const CppString &rpc_root)
00080 {
00081   ULXR_TRACE(ULXR_PCHAR("send_call ") << calldata.getMethodName());
00082   if (!protocol->isOpen() )
00083     protocol->open();
00084   else
00085     protocol->resetConnection();
00086 
00087 #ifdef ULXR_ENFORCE_NON_PERSISTENT
00088   protocol->setPersistent(false);
00089 #endif
00090 
00091   protocol->sendRpcCall(calldata, rpc_root, wbxml_mode);
00092 }
00093 
00094 
00095 ULXR_API_IMPL(MethodResponse) Requester::waitForResponse()
00096 {
00097   ULXR_TRACE(ULXR_PCHAR("waitForResponse"));
00098   return waitForResponse(protocol, wbxml_mode);
00099 }
00100 
00101 
00102 ULXR_API_IMPL(MethodResponse)
00103 Requester::waitForResponse(Protocol *protocol, bool wbxml_mode)
00104 {
00105   ULXR_TRACE(ULXR_PCHAR("waitForResponse(Protocol, wbxml)"));
00106   char buffer[ULXR_RECV_BUFFER_SIZE];
00107   char *buff_ptr;
00108 
00109   std::auto_ptr<XmlParserBase> parser;
00110   MethodResponseParserBase *rpb = 0;
00111   if (wbxml_mode)
00112   {
00113     ULXR_TRACE(ULXR_PCHAR("waitForResponse in WBXML"));
00114     MethodResponseParserWb *rp = new MethodResponseParserWb();
00115     rpb = rp;
00116 #ifdef _MSC_VER
00117   std::auto_ptr<XmlParserBase> temp(rp);
00118   parser = temp;
00119 #else
00120     parser.reset(rp);
00121 #endif
00122   }
00123   else
00124   {
00125     ULXR_TRACE(ULXR_PCHAR("waitForResponse in XML"));
00126     MethodResponseParser *rp = new MethodResponseParser();
00127     rpb = rp;
00128 #ifdef _MSC_VER
00129     std::auto_ptr<XmlParserBase> temp(rp);
00130     parser = temp;
00131 #else
00132     parser.reset(rp);
00133 #endif
00134   }
00135 
00136   bool done = false;
00137   long readed;
00138   while (!done && protocol->hasBytesToRead()
00139                && ((readed = protocol->readRaw(buffer, sizeof(buffer))) > 0) )
00140   {
00141     buff_ptr = buffer;
00142     while (readed > 0)
00143     {
00144       Protocol::State state = protocol->connectionMachine(buff_ptr, readed);
00145       if (state == Protocol::ConnError)
00146       {
00147         done = true;
00148         throw ConnectionException(TransportError, ulxr_i18n(ULXR_PCHAR("network problem occured")), 400);
00149       }
00150 
00151       else if (state == Protocol::ConnSwitchToBody)
00152       {
00153 #ifdef ULXR_SHOW_READ
00154         Cpp8BitString super_data (buff_ptr, readed);
00155         while ((readed = protocol->readRaw(buffer, sizeof(buffer))) > 0)
00156           super_data.append(buffer, readed);
00157         ULXR_DOUT_READ(ULXR_PCHAR("superdata 3 start:\n"));
00158 
00159         if (wbxml_mode)
00160         {
00161            ULXR_DOUT_READ(binaryDebugOutput(super_data));
00162         }
00163         else
00164         {
00165           ULXR_DOUT_READ(ULXR_GET_STRING(super_data));
00166         }
00167         ULXR_DOUT_READ(ULXR_PCHAR("superdata 3 end:\n") );
00168 #endif
00169         if (!protocol->hasBytesToRead())
00170         {
00171           throw ConnectionException(NotConformingError,
00172                                     ulxr_i18n(ULXR_PCHAR("Content-Length of message not available")), 411);
00173         }
00174 
00175         CppString s;
00176         if (!protocol->responseStatus(s))
00177           throw ConnectionException(TransportError, s, 500);
00178 
00179       }
00180 
00181       else if (state == Protocol::ConnBody)
00182       {
00183         ULXR_DOUT_XML(ULXR_GET_STRING(std::string(buff_ptr, readed)));
00184         if (!parser->parse(buff_ptr, readed, false))
00185         {
00186           throw XmlException(parser->mapToFaultCode(parser->getErrorCode()),
00187                              ulxr_i18n(ULXR_PCHAR("Problem while parsing xml response")),
00188                              parser->getCurrentLineNumber(),
00189                              ULXR_GET_STRING(parser->getErrorString(parser->getErrorCode())));
00190         }
00191         readed = 0;
00192       }
00193     }
00194 
00195     if (!protocol->hasBytesToRead())
00196 //        || parser->isComplete())
00197       done = true;
00198   }
00199 
00200   if (protocol->isOpen() && !protocol->isPersistent() )
00201     protocol->close();
00202 
00203   return rpb->getMethodResponse();
00204 }
00205 
00206 
00207 ULXR_API_IMPL(MethodResponse)
00208   Requester::call (const MethodCall& calldata, const CppString &rpc_root,
00209                    const CppString &user, const CppString &pass)
00210 {
00211    ULXR_TRACE(ULXR_PCHAR("call(..,user, pass)"));
00212    protocol->setMessageAuthentication(user, pass);
00213    send_call (calldata, rpc_root);
00214    return waitForResponse();
00215 }
00216 
00217 
00218 ULXR_API_IMPL(MethodResponse)
00219   Requester::call (const MethodCall& calldata, const CppString &rpc_root)
00220 {
00221    ULXR_TRACE(ULXR_PCHAR("call"));
00222    send_call (calldata, rpc_root);
00223    return waitForResponse();
00224 }
00225 
00226 
00228 //
00229 
00230 
00231 namespace hidden {
00232 
00233 
00234 ULXR_API_IMPL0 Receiver_t::Receiver_t(StaticReceiver_t recv)
00235  : static_recv(recv)
00236  , dynamic_recv(0)
00237 {
00238 }
00239 
00240 
00241 ULXR_API_IMPL0 Receiver_t::Receiver_t(DynamicReceiver_t recv)
00242  : static_recv(0)
00243  , dynamic_recv(recv)
00244 {
00245 }
00246 
00247 
00248 ULXR_API_IMPL(void) Receiver_t::receive(const MethodResponse &resp)
00249 {
00250   if (0 != dynamic_recv)
00251     dynamic_recv->receive(resp);
00252 
00253   else if (0 != static_recv)
00254     static_recv(resp);
00255 }
00256 
00257 
00258 ULXR_API_IMPL(void) Receiver_t::free()
00259 {
00260   if (0 != dynamic_recv)
00261   {
00262     delete dynamic_recv;
00263     dynamic_recv = 0;
00264   }
00265 }
00266 
00267 
00268 }  // namespace hidden
00269 
00270 
00272 //
00273 
00274 
00275 struct DispatcherData
00276 {
00277   DispatcherData(Protocol *p, hidden::Receiver_t r, Requester *req)
00278     : requester(req)
00279     , prot(p)
00280     , recv(r)
00281 
00282     {}
00283 
00284   Requester            *requester;
00285   Protocol             *prot;
00286   hidden::Receiver_t    recv;
00287 };
00288 
00289 
00291 //
00292 
00293 
00294 #ifdef ULXR_MULTITHREADED
00295 
00296 void *dispatchThreaded(DispatcherData *data)
00297 {
00298   data->requester->incPending();
00299   ULXR_TRACE(ULXR_PCHAR("dispatchThreaded"));
00300   try
00301   {
00302     MethodResponse resp = Requester::waitForResponse(data->prot, data->requester->isWbXml());
00303     ULXR_TRACE(ULXR_PCHAR("  recv.receive(resp)"));
00304     data->recv.receive(resp);
00305     data->requester->decPending();
00306     data->recv.free();
00307     delete data->prot;
00308     delete data;
00309     return 0;
00310   }
00311 
00312   catch(const Exception &ex)
00313   {
00314     data->requester->forwardException(ex);
00315     ULXR_TRACE(ULXR_PCHAR("catch(const Exception &ex) in dispatchThreaded"));
00316   }
00317 
00318   catch(const std::exception &ex)
00319   {
00320     data->requester->forwardException(ex);
00321     ULXR_TRACE(ULXR_PCHAR("catch(const std::exception &ex) in dispatchThreaded"));
00322   }
00323 
00324   catch(...)
00325   {
00326     data->requester->forwardException();
00327     ULXR_TRACE(ULXR_PCHAR("catch(...) in dispatchThreaded"));
00328   }
00329 
00330   data->requester->decPending();
00331   delete data->prot;
00332   delete data;
00333   return (void*)1;   // FIXME: more error handling ??
00334 }
00335 
00336 
00337 #endif // ULXR_MULTITHREADED
00338 
00339 
00341 //
00342 
00343 
00344 #ifdef ULXR_MULTITHREADED
00345 
00346 void Requester::startDispatch(const MethodCall &methcall,
00347                               const CppString &rpc_root,
00348                               hidden::Receiver_t recv)
00349 {
00350     ULXR_TRACE(ULXR_PCHAR("startDispatch"));
00351     send_call(methcall, rpc_root);
00352     Protocol *prot = protocol->detach();
00353     DispatcherData *dd = new DispatcherData (prot, recv, this);
00354 
00355 #ifdef __unix__
00356     typedef void* (*pthread_sig)(void*);
00357     pthread_t handle;
00358 
00359     bool created = false;
00360     while (!created)
00361     {
00362       created = (0 == pthread_create(&handle, 0, (pthread_sig)dispatchThreaded, dd));
00363       if (!created)
00364       {
00365         switch(protocol->getConnection()->getLastError())
00366         {
00367           case EAGAIN: // fallthrough
00368           case EINTR:
00369 #ifdef __unix__
00370             errno = 0;
00371 #endif
00372             continue;
00373           break;
00374 
00375           default:
00376           {
00377             delete prot;
00378             throw Exception(SystemError,
00379                             ulxr_i18n(ULXR_PCHAR("Could not create thread which processes rpc response.\n"))
00380                                             + getLastErrorString(prot->getConnection()->getLastError()));
00381           }
00382         }
00383       }
00384     }
00385 
00386     unsigned detached = pthread_detach(handle);
00387     if (detached != 0)
00388       throw Exception(SystemError,
00389                       ulxr_i18n(ULXR_PCHAR("Could not detach thread which processes rpc response")));
00390 
00391 #elif defined(__WIN32__)
00392     typedef unsigned int (__stdcall *thread_sig)(void*);
00393     unsigned handle;
00394 
00395     unsigned ret = _beginthreadex( 0, 16*1024, (thread_sig)dispatchThreaded,
00396                                   dd, CREATE_SUSPENDED, &handle );
00397     int resume = ResumeThread((void*)ret);
00398     if (resume < 0)
00399       throw Exception(SystemError,
00400                       ulxr_i18n(ULXR_PCHAR("Could not resume thread which processes rpc response")));
00401 
00402     bool created = ret != 0;
00403     if (!created)
00404       throw Exception(SystemError,
00405                       ulxr_i18n(ULXR_PCHAR("Could not create thread which processes rpc response")));
00406 #else
00407 #error unsupported platform here
00408 #endif
00409 }
00410 
00411 #endif // ULXR_MULTITHREADED
00412 
00413 
00414 #ifdef ULXR_MULTITHREADED
00415 
00416 ULXR_API_IMPL(void)
00417 Requester::call (const MethodCall& methcall,
00418                  const CppString &rpc_root,
00419                  const CppString &user,
00420                  const CppString &pass,
00421                  hidden::Receiver_t recv)
00422 {
00423    ULXR_TRACE(ULXR_PCHAR("call (.., user, pass, rcv)"));
00424    protocol->setMessageAuthentication(user, pass);
00425    startDispatch(methcall, rpc_root, recv);
00426 }
00427 
00428 
00429 ULXR_API_IMPL(void)
00430 Requester::call (const MethodCall& methcall,
00431                  const CppString &rpc_root,
00432                  hidden::Receiver_t recv)
00433 {
00434    ULXR_TRACE(ULXR_PCHAR("call (.., rcv)"));
00435    startDispatch(methcall, rpc_root, recv);
00436 }
00437 
00438 #endif // ULXR_MULTITHREADED
00439 
00440 ULXR_API_IMPL(void)
00441 Requester::transmit (const MethodCall& calldata, const CppString &rpc_root,
00442                      const CppString &user, const CppString &pass)
00443 {
00444    ULXR_TRACE(ULXR_PCHAR("transmit (.., user, pass)"));
00445    protocol->setMessageAuthentication(user, pass);
00446    protocol->setTransmitOnly();
00447    send_call(calldata, rpc_root);
00448 }
00449 
00450 
00451 ULXR_API_IMPL(void)
00452 Requester::transmit (const MethodCall& calldata, const CppString &rpc_root)
00453 
00454 {
00455    ULXR_TRACE(ULXR_PCHAR("transmit"));
00456    protocol->setTransmitOnly();
00457    send_call(calldata, rpc_root);
00458 }
00459 
00460 
00461 #ifdef ULXR_MULTITHREADED
00462 
00463 ULXR_API_IMPL(unsigned) Requester::numPendingRequests() const
00464 {
00465   ULXR_TRACE(ULXR_PCHAR("numPendingRequests ") << cntPendingRequests);
00466   return cntPendingRequests;
00467 }
00468 
00469 
00470 ULXR_API_IMPL(void) Requester::incPending()
00471 {
00472   ULXR_TRACE(ULXR_PCHAR("incPending ") << cntPendingRequests);
00473   Mutex::Locker lock (pendingMutex);
00474   ++cntPendingRequests;
00475 }
00476 
00477 
00478 ULXR_API_IMPL(void) Requester::decPending()
00479 {
00480   ULXR_TRACE(ULXR_PCHAR("decPending ") << cntPendingRequests);
00481   Mutex::Locker lock (pendingMutex);
00482   if (cntPendingRequests > 0)
00483     --cntPendingRequests;
00484 }
00485 
00486 
00487 ULXR_API_IMPL(void) Requester::forwardException()
00488 {
00489 }
00490 
00491 
00492 ULXR_API_IMPL(void) Requester::forwardException(const std::exception &/*ex*/)
00493 {
00494 }
00495 
00496 
00497 ULXR_API_IMPL(void) Requester::forwardException(const Exception &/*ex*/)
00498 {
00499 }
00500 
00501 
00502 #endif // ULXR_MULTITHREADED
00503 
00504 ULXR_API_IMPL(bool) Requester::isWbXml() const
00505 {
00506   return wbxml_mode;
00507 }
00508 
00509 
00510 namespace hidden {
00511 
00512 ULXR_API_IMPL0 ReceiverWrapperBase::~ReceiverWrapperBase()
00513 {
00514 }
00515 
00516 }
00517 
00518 
00519 }  // namespace ulxr

Generated on Sun Aug 19 20:08:57 2007 for ulxmlrpcpp by  doxygen 1.5.1