00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #define ULXR_NEED_EXPORTS
00037 #include <ulxmlrpcpp/ulxmlrpcpp.h>
00038
00039 #ifdef __unix__
00040 #include <sys/socket.h>
00041 #endif
00042
00043 #ifdef ULXR_MULTITHREADED
00044
00045 #ifdef __WIN32__
00046 #include <winsock2.h>
00047 #endif
00048
00049 #include <ulxmlrpcpp/ulxr_mtrpc_server.h>
00050
00051 #include <ulxmlrpcpp/ulxr_dispatcher.h>
00052 #include <ulxmlrpcpp/ulxr_protocol.h>
00053 #include <ulxmlrpcpp/ulxr_except.h>
00054
00055 #ifdef __unix__
00056 #include <csignal>
00057 #endif
00058
00059 #include <iostream>
00060
00061 namespace ulxr {
00062
00063
00064
00065
00066 class ULXR_API_DECL0 MultiThreadRpcServer::ThreadData
00067 {
00068 public:
00069
00070 #ifdef __unix__
00071 typedef pthread_t handle_t;
00072 #elif defined(__WIN32__)
00073 typedef HANDLE handle_t;
00074 #else
00075 #error unsupported platform here
00076 #endif
00077
00078
00079
00080
00081
00082 ThreadData (MultiThreadRpcServer *server, Protocol *prot);
00083
00084
00085
00086
00087 bool shouldRun() const;
00088
00089
00090
00091 void setTerminate();
00092
00093
00094
00095
00096 handle_t getHandle() const;
00097
00098
00099
00100
00101 void setHandle(handle_t hd);
00102
00103
00104
00105
00106 Protocol *getProtocol() const;
00107
00108
00109
00110
00111 MultiThreadRpcServer *getServer() const;
00112
00113
00114
00115 void incInvoked();
00116
00117
00118
00119
00120 unsigned numInvoked() const;
00121
00122 private:
00123
00124 bool run;
00125 handle_t handle;
00126 unsigned ctrInvoked;
00127 Protocol *protocol;
00128 MultiThreadRpcServer *server;
00129 };
00130
00131
00132 ULXR_API_IMPL0 MultiThreadRpcServer::MultiThreadRpcServer(Protocol *prot, unsigned num_threads, bool wbxml)
00133 {
00134 wbxml_mode = wbxml;
00135 for (unsigned i = 0; i < num_threads; ++i)
00136 #ifdef _MSC_VER
00137 threads.push_back(new ThreadData(this, (Protocol*)(prot->detach())));
00138 #else
00139 threads.push_back(new ThreadData(this, dynamic_cast<Protocol*>(prot->detach())));
00140 #endif
00141 }
00142
00143
00144 ULXR_API_IMPL0 MultiThreadRpcServer::~MultiThreadRpcServer()
00145 {
00146 waitAsync(true);
00147
00148 releaseThreads();
00149 }
00150
00151
00152 ULXR_API_IMPL(void)
00153 MultiThreadRpcServer::addMethod (MethodAdder::StaticMethodCall_t adr,
00154 const CppString &ret_signature,
00155 const CppString &name,
00156 const CppString &signature,
00157 const CppString &help)
00158 {
00159 dispatcher.addMethod(adr, ret_signature, name, signature, help);
00160 }
00161
00162
00163 ULXR_API_IMPL(void)
00164 MultiThreadRpcServer::addMethod (MethodAdder::DynamicMethodCall_t wrapper,
00165 const CppString &ret_signature,
00166 const CppString &name,
00167 const CppString &signature,
00168 const CppString &help)
00169 {
00170 dispatcher.addMethod(wrapper, ret_signature, name, signature, help);
00171 }
00172
00173
00174 ULXR_API_IMPL(void)
00175 MultiThreadRpcServer::addMethod (MethodAdder::SystemMethodCall_t adr,
00176 const CppString &ret_signature,
00177 const CppString &name,
00178 const CppString &signature,
00179 const CppString &help)
00180 {
00181 dispatcher.addMethod(adr, ret_signature, name, signature, help);
00182 }
00183
00184
00185 ULXR_API_IMPL(void)
00186 MultiThreadRpcServer::addMethod (MethodAdder::StaticMethodCall_t adr,
00187 const Signature &ret_signature,
00188 const CppString &name,
00189 const Signature &signature,
00190 const CppString &help)
00191 {
00192 dispatcher.addMethod(adr, ret_signature, name, signature, help);
00193 }
00194
00195
00196 ULXR_API_IMPL(void)
00197 MultiThreadRpcServer::addMethod (MethodAdder::DynamicMethodCall_t wrapper,
00198 const Signature &ret_signature,
00199 const CppString &name,
00200 const Signature &signature,
00201 const CppString &help)
00202 {
00203 dispatcher.addMethod(wrapper, ret_signature, name, signature, help);
00204 }
00205
00206
00207 ULXR_API_IMPL(void)
00208 MultiThreadRpcServer::addMethod (MethodAdder::SystemMethodCall_t adr,
00209 const Signature &ret_signature,
00210 const CppString &name,
00211 const Signature &signature,
00212 const CppString &help)
00213 {
00214 dispatcher.addMethod(adr, ret_signature, name, signature, help);
00215 }
00216
00217
00218 ULXR_API_IMPL(void)
00219 MultiThreadRpcServer::removeMethod(const CppString &name)
00220 {
00221 dispatcher.removeMethod(name);
00222 }
00223
00224
00225 ULXR_API_IMPL(void) MultiThreadRpcServer::preProcessCall(MethodCall &)
00226 {
00227 }
00228
00229
00230 ULXR_API_IMPL(void) MultiThreadRpcServer::preProcessResponse(MethodResponse &)
00231 {
00232 }
00233
00234
00235 ULXR_API_IMPL(void *) MultiThreadRpcServer::serverLoop(Protocol *protocol, ThreadData *td)
00236 {
00237 ULXR_TRACE(ULXR_PCHAR("entered new server thread"));
00238 Dispatcher waiter(protocol, wbxml_mode);
00239 while (td->shouldRun())
00240 {
00241 try
00242 {
00243 ULXR_TRACE(ULXR_PCHAR("waitForCall()"));
00244 MethodCall call = waiter.waitForCall();
00245 preProcessCall(call);
00246
00247 td->incInvoked();
00248 ULXR_TRACE(ULXR_PCHAR("server thread ")
00249 << std::hex << (void*) td->getHandle() << std::dec
00250 << ULXR_PCHAR(" received new call"));
00251
00252 MethodResponse resp = dispatcher.dispatchCall(call);
00253 preProcessResponse(resp);
00254
00255 if (!protocol->isTransmitOnly())
00256 {
00257 ULXR_TRACE(ULXR_PCHAR("NOT TransmitOnly"));
00258 protocol->sendRpcResponse(resp, wbxml_mode);
00259 }
00260 else
00261 {
00262 ULXR_TRACE(ULXR_PCHAR("IS TransmitOnly"));
00263 }
00264
00265 if (!protocol->isPersistent())
00266 protocol->close();
00267 }
00268
00269 catch(ConnectionException &ex)
00270 {
00271 forwardThreadedError(ex);
00272
00273 if (protocol->isOpen())
00274 {
00275 try
00276 {
00277 MethodResponse resp(ex.getStatusCode(), ex.why() );
00278 if (!protocol->isTransmitOnly())
00279 protocol->sendRpcResponse(resp, wbxml_mode);
00280 }
00281 catch(...)
00282 {
00283
00284 }
00285 protocol->close();
00286 }
00287 }
00288
00289 catch(Exception& ex)
00290 {
00291 forwardThreadedError(ex);
00292
00293 if (protocol->isOpen())
00294 {
00295 try
00296 {
00297 MethodResponse resp(1, ex.why() );
00298 if (!protocol->isTransmitOnly())
00299 protocol->sendRpcResponse(resp, wbxml_mode);
00300 }
00301 catch(...)
00302 {
00303
00304 }
00305 protocol->close();
00306 }
00307 }
00308
00309 catch(std::exception& ex)
00310 {
00311 forwardThreadedError(Exception(ApplicationError, ULXR_GET_STRING(ex.what())));
00312
00313 if (protocol->isOpen())
00314 {
00315 try
00316 {
00317 MethodResponse resp(1, ULXR_GET_STRING(ex.what()) );
00318 if (!protocol->isTransmitOnly())
00319 protocol->sendRpcResponse(resp, wbxml_mode);
00320 }
00321 catch(...)
00322 {
00323
00324 }
00325 protocol->close();
00326 }
00327 }
00328
00329 catch(...)
00330 {
00331 RuntimeException ex (SystemError, ULXR_PCHAR("Unknown error occured"));
00332 forwardThreadedError(ex);
00333
00334 if (protocol->isOpen())
00335 {
00336 try
00337 {
00338 MethodResponse resp(1, ex.why() );
00339 if (!protocol->isTransmitOnly())
00340 protocol->sendRpcResponse(resp, wbxml_mode);
00341 }
00342 catch(...)
00343 {
00344
00345 }
00346 protocol->close();
00347 }
00348 }
00349 }
00350
00351 ULXR_TRACE(ULXR_PCHAR("Leaving server thread ")
00352 << std::hex << (void*) td->getHandle() << std::dec);
00353 return 0;
00354 }
00355
00356
00357
00358 #ifdef ULXR_SHOW_TRACE
00359 ULXR_API_IMPL(void) MultiThreadRpcServer::forwardThreadedError(const Exception &ex) const
00360 #else
00361 ULXR_API_IMPL(void) MultiThreadRpcServer::forwardThreadedError(const Exception &) const
00362 #endif
00363 {
00364 ULXR_TRACE(ULXR_CHAR("Threaded error occured: ") << ex.why());
00365 }
00366
00367
00368 ULXR_API_IMPL(void *) MultiThreadRpcServer::startThread(ThreadData *td)
00369 {
00370 ULXR_TRACE(ULXR_PCHAR("startThread ")
00371 << std::hex << (void*) td
00372 << std::dec);
00373 return (void*) (td->getServer())->serverLoop(td->getProtocol(), td);
00374 }
00375
00376
00377 ULXR_API_IMPL(unsigned) MultiThreadRpcServer::dispatchAsync()
00378 {
00379 ULXR_TRACE(ULXR_PCHAR("dispatchAsync()"));
00380 unsigned num_started = 0;
00381
00382 for (unsigned i = 0; i < threads.size(); ++i)
00383 {
00384 ThreadData::handle_t tdh;
00385 #ifdef __unix__
00386 typedef void* (*pthread_sig)(void*);
00387 int result = pthread_create(&tdh, 0, (pthread_sig)startThread, threads[i]);
00388 if (result == 0)
00389 ++num_started;
00390 #elif defined(__WIN32__)
00391 unsigned tid;
00392 typedef unsigned int (__stdcall *thread_sig)(void*);
00393 tdh = (HANDLE)_beginthreadex(0, 16*1024, (thread_sig)startThread,
00394 threads[i], CREATE_SUSPENDED,
00395 &tid );
00396 int resume = ResumeThread(tdh);
00397 if (tdh >= 0 && resume >= 0)
00398 ++num_started;
00399 #else
00400 #error unsupported platform here
00401 #endif
00402 threads[i]->setHandle(tdh);
00403 }
00404 ULXR_TRACE(ULXR_PCHAR("leaving dispatchAsync()"));
00405 return num_started;
00406 }
00407
00408
00409 ULXR_API_IMPL(unsigned) MultiThreadRpcServer::numThreads() const
00410 {
00411 return threads.size();
00412 }
00413
00414
00415 ULXR_API_IMPL(void) MultiThreadRpcServer::terminateAllThreads(unsigned )
00416 {
00417 ULXR_TRACE(ULXR_PCHAR("Request to terminate all threads."));
00418 for (unsigned i1 = 0; i1 < threads.size(); ++i1)
00419 threads[i1]->setTerminate();
00420 }
00421
00422
00423 ULXR_API_IMPL(void) MultiThreadRpcServer::shutdownAllThreads(unsigned )
00424 {
00425 ULXR_TRACE(ULXR_PCHAR("Request to shutdown all threads."));
00426 for (unsigned i1 = 0; i1 < threads.size(); ++i1)
00427 {
00428 threads[i1]->setTerminate();
00429 try
00430 {
00431 #ifdef __WIN32__
00432 threads[i1]->getProtocol()->shutdown(SD_BOTH);
00433 #else
00434 threads[i1]->getProtocol()->shutdown(SHUT_RDWR);
00435 #endif
00436 }
00437 catch(...)
00438 {
00439 }
00440 }
00441 }
00442
00443
00444 ULXR_API_IMPL(void) MultiThreadRpcServer::waitAsync(bool term, bool stat)
00445 {
00446 ULXR_TRACE(ULXR_PCHAR("waitAsync"));
00447
00448 if (threads.size() == 0)
00449 return;
00450
00451 if (term)
00452 terminateAllThreads(1000);
00453
00454 ULXR_TRACE(ULXR_PCHAR("waitAsync: join"));
00455 for (unsigned i = 0; i < threads.size(); ++i)
00456 {
00457 ULXR_TRACE(ULXR_PCHAR(" join " << i));
00458 #ifdef __unix__
00459 void *status;
00460 pthread_join(threads[i]->getHandle(), &status);
00461 #elif defined(__WIN32__)
00462 WaitForSingleObject(threads[i]->getHandle(), INFINITE);
00463 CloseHandle(threads[i]->getHandle());
00464 #else
00465 #error unsupported platform here
00466 #endif
00467
00468 }
00469 if (stat)
00470 printStatistics();
00471
00472 releaseThreads();
00473 }
00474
00475
00476 ULXR_API_IMPL(void) MultiThreadRpcServer::releaseThreads()
00477 {
00478 ULXR_TRACE(ULXR_PCHAR("releaseThreads()"));
00479 for (unsigned i = 0; i < threads.size(); ++i)
00480 {
00481 delete threads[i]->getProtocol();
00482 delete threads[i];
00483 }
00484
00485 threads.clear();
00486 }
00487
00488
00489 ULXR_API_IMPL(void) MultiThreadRpcServer::printStatistics() const
00490 {
00491 for (unsigned i = 0; i < threads.size(); ++i)
00492 ULXR_COUT << ULXR_PCHAR("Thread ")
00493 << std::dec << i
00494 << ULXR_PCHAR(" invoked ")
00495 << threads[i]->numInvoked()
00496 << ULXR_PCHAR(" times.\n");
00497 }
00498
00499
00501
00502
00503 ULXR_API_IMPL0 MultiThreadRpcServer::ThreadData::ThreadData (MultiThreadRpcServer *serv, Protocol *prot)
00504 {
00505 run = true;
00506 handle = 0;
00507 ctrInvoked = 0;
00508 protocol = prot;
00509 server = serv;
00510 }
00511
00512
00513 ULXR_API_IMPL(bool) MultiThreadRpcServer::ThreadData::shouldRun() const
00514 {
00515 return run;
00516 }
00517
00518
00519 ULXR_API_IMPL(void) MultiThreadRpcServer::ThreadData::setTerminate()
00520 {
00521 ULXR_TRACE(ULXR_PCHAR("Request to terminate a single thread."));
00522
00523 run = false;
00524 }
00525
00526
00527 MultiThreadRpcServer::ThreadData::handle_t
00528 ULXR_API_IMPL0 MultiThreadRpcServer::ThreadData::getHandle() const
00529 {
00530 return handle;
00531 }
00532
00533
00534 ULXR_API_IMPL(Protocol *) MultiThreadRpcServer::ThreadData::getProtocol() const
00535 {
00536 return protocol;
00537 }
00538
00539
00540 ULXR_API_IMPL(MultiThreadRpcServer *) MultiThreadRpcServer::ThreadData::getServer() const
00541 {
00542 return server;
00543 }
00544
00545
00546 ULXR_API_IMPL(void) MultiThreadRpcServer::ThreadData::setHandle(handle_t hd)
00547 {
00548 handle = hd;
00549 }
00550
00551
00552 ULXR_API_IMPL(void) MultiThreadRpcServer::ThreadData::incInvoked()
00553 {
00554 ctrInvoked++;
00555 }
00556
00557
00558 ULXR_API_IMPL(unsigned) MultiThreadRpcServer::ThreadData::numInvoked() const
00559 {
00560 return ctrInvoked;
00561 }
00562
00563
00564 }
00565
00566
00567 #endif // ULXR_MULTITHREADED
00568