00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #define ULXR_NEED_EXPORTS
00032 #include <ulxmlrpcpp/ulxmlrpcpp.h>
00033
00034 #include <ulxmlrpcpp/ulxr_dispatcher.h>
00035 #include <ulxmlrpcpp/ulxr_protocol.h>
00036 #include <ulxmlrpcpp/ulxr_except.h>
00037 #include <ulxmlrpcpp/ulxr_xmlparse.h>
00038 #include <ulxmlrpcpp/ulxr_xmlparse_base.h>
00039 #include <ulxmlrpcpp/ulxr_callparse.h>
00040 #include <ulxmlrpcpp/ulxr_callparse_wb.h>
00041
00042 #include <iostream>
00043
00044 #include <sys/types.h>
00045 #include <sys/wait.h>
00046 #include <sys/types.h>
00047 #include <sys/socket.h>
00048 #include <netdb.h>
00049 #include <errno.h>
00050
00051 #include <ulxmlrpcpp/contrib/mprocess_rpc_server.h>
00052
00053
00054 namespace funtik {
00055
00056
00057 MultiProcessRpcServerError::MultiProcessRpcServerError(const std::string& what_arg):
00058 _what(what_arg)
00059 {
00060 }
00061
00062
00063 MultiProcessRpcServerError::~MultiProcessRpcServerError() throw()
00064 {
00065
00066 }
00067
00068 const char* MultiProcessRpcServerError::what () const throw()
00069 {
00070 return this->_what.c_str();
00071 }
00072
00073
00074
00075 void MultiProcessRpcServer::doChild()
00076 {
00077 ULXR_TRACE(ULXR_PCHAR("doChild"));
00078
00079 close(m_poDispatcher->getProtocol()->getConnection()->getServerHandle());
00080 ulxr::MethodCall call=handleXmlRequest();
00081
00082 ulxr::Protocol *poNowProtocol=m_poDispatcher->getProtocol();
00083 ulxr::MethodResponse resp = m_poDispatcher->dispatchCall(call);
00084 storeFuncResult(call,resp);
00085 if (poNowProtocol!=0 && !poNowProtocol->isTransmitOnly())
00086 poNowProtocol->sendRpcResponse(resp,m_wbxml_mode);
00087 if (poNowProtocol!=0 && !poNowProtocol->isPersistent())
00088 poNowProtocol->close();
00089
00090 exit(0);
00091 }
00092
00093 std::map<pid_t,MultiProcessRpcServer::ProcessContext> MultiProcessRpcServer::m_mapProcesses;
00094
00095
00096 void MultiProcessRpcServer::handleRequest()
00097 {
00098 ULXR_TRACE(ULXR_PCHAR("handleRequest"));
00099
00100 if(m_poDispatcher.get() == 0)
00101 throw MultiProcessRpcServerError("MultiProcessRpcServer::handleRequest: Dispatcher does not initialize");
00102
00103 if(m_poDispatcher->getProtocol() == 0)
00104 throw MultiProcessRpcServerError("MultiProcessRpcServer::handleRequest: Protocol does not initialize");
00105
00106 try{
00107 acceptConnection();
00108
00109 if((m_maxProcesses != 0) && (m_mapProcesses.size() > m_maxProcesses))
00110 throw ulxr::Exception(ulxr::SystemError, ULXR_PCHAR("MultiProcessRpcServer::handleRequest: Max number of process already started."));
00111
00112 blockSigchld();
00113
00114 pid_t ppid=fork();
00115
00116 if(ppid == -1)
00117 throw ulxr::Exception(ulxr::SystemError,ULXR_PCHAR("MultiProcessRpcServer::handleRequest: Can`t create process for handle request."));
00118
00119 if(ppid == 0)
00120 {
00121
00122 unblockSigchld();
00123
00124 try{
00125 doChild();
00126 }
00127 catch(...)
00128 {
00129
00130 exit(1);
00131 }
00132 }
00133
00134
00135 if(ppid > 0)
00136 storeProcessData(ppid);
00137
00138 unblockSigchld();
00139 }
00140 catch(std::exception &ex)
00141 {
00142
00143 unblockSigchld();
00144
00145 if (!m_poDispatcher->getProtocol()->isPersistent())
00146 m_poDispatcher->getProtocol()->close();
00147
00148 sweepProcessData();
00149
00150 throw;
00151 }
00152
00153 if (!m_poDispatcher->getProtocol()->isPersistent())
00154 m_poDispatcher->getProtocol()->close();
00155
00156 sweepProcessData();
00157 }
00158
00159
00160 void MultiProcessRpcServer::acceptConnection()
00161 {
00162 if (!m_poDispatcher->getProtocol()->isOpen())
00163 m_poDispatcher->getProtocol()->accept();
00164 else
00165 m_poDispatcher->getProtocol()->resetConnection();
00166 m_poDispatcher->getProtocol()->setPersistent(false);
00167 }
00168
00169
00170 ulxr::MethodCall MultiProcessRpcServer::handleXmlRequest()
00171 {
00172 char buffer[ULXR_RECV_BUFFER_SIZE];
00173 char *buff_ptr;
00174
00175 std::auto_ptr<ulxr::XmlParserBase> parser;
00176 ulxr::MethodCallParserBase *cpb = 0;
00177 if (m_wbxml_mode)
00178 {
00179 ULXR_TRACE(ULXR_PCHAR("waitForCall in WBXML"));
00180 ulxr::MethodCallParserWb *cp = new ulxr::MethodCallParserWb();
00181 cpb = cp;
00182 parser.reset(cp);
00183 }
00184 else
00185 {
00186 ULXR_TRACE(ULXR_PCHAR("waitForCall in XML"));
00187 ulxr::MethodCallParser *cp = new ulxr::MethodCallParser();
00188 cpb = cp;
00189 parser.reset(cp);
00190 }
00191
00192 bool done = false;
00193 long readed;
00194 while (!done && ((readed = m_poDispatcher->getProtocol()->readRaw(buffer, sizeof(buffer))) > 0) )
00195 {
00196 buff_ptr = buffer;
00197
00198 if (!m_poDispatcher->getProtocol()->hasBytesToRead())
00199
00200 done = true;
00201
00202 while (readed > 0)
00203 {
00204 ulxr::Protocol::State state = m_poDispatcher->getProtocol()->connectionMachine(buff_ptr, readed);
00205
00206 if (state == ulxr::Protocol::ConnError)
00207 throw ulxr::ConnectionException(ulxr::TransportError, ulxr_i18n(ULXR_PCHAR("MultiProcessRpcServer::handleXmlRequest: network problem occured")), 500);
00208
00209 else if (state == ulxr::Protocol::ConnSwitchToBody)
00210 {
00211 if (!m_poDispatcher->getProtocol()->hasBytesToRead())
00212 {
00213 #ifdef ULXR_SHOW_READ
00214 ulxr::Cpp8BitString super_data(buff_ptr, readed);
00215 while ((readed = m_poDispatcher->getProtocol()->getConnection()->read(buffer, sizeof(buffer))) > 0)
00216 super_data.append(buffer, readed);
00217 ULXR_DOUT_READ(ULXR_PCHAR("superdata 1 start:\n")
00218 << ULXR_GET_STRING(super_data)
00219 << ULXR_PCHAR("superdata 1 end:\n" ));
00220 #endif
00221 throw ulxr::ConnectionException(ulxr::NotConformingError,
00222 ulxr_i18n(ULXR_PCHAR("MultiProcessRpcServer::handleXmlRequest: Content-Length of message not available")), 411);
00223 }
00224 }
00225
00226 else if (state == ulxr::Protocol::ConnBody)
00227 {
00228 ULXR_DOUT_XML(ULXR_GET_STRING(std::string(buff_ptr, readed)));
00229 if (!parser->parse(buff_ptr, readed, done))
00230 {
00231 throw ulxr::XmlException(parser->mapToFaultCode(parser->getErrorCode()),
00232 ulxr_i18n(ULXR_PCHAR("MultiProcessRpcServer::handleXmlRequest: Problem while parsing xml request")),
00233 parser->getCurrentLineNumber(),
00234 ULXR_GET_STRING(parser->getErrorString(parser->getErrorCode())));
00235 }
00236 readed = 0;
00237 }
00238 }
00239 }
00240 return cpb->getMethodCall();
00241 }
00242
00243
00244 void MultiProcessRpcServer::serverLoop()
00245 {
00246 ULXR_TRACE(ULXR_PCHAR("Main process start"));
00247
00248 while(m_iState==RUN)
00249 {
00250 handleRequest();
00251 }
00252 terminateAllProcess(true);
00253 }
00254
00255
00256 unsigned MultiProcessRpcServer::getNumProcesses() const
00257 {
00258 return m_mapProcesses.size();
00259 }
00260
00261 void MultiProcessRpcServer::setMaxNumProcesses(unsigned uNumProcesses)
00262 {
00263 m_maxProcesses=uNumProcesses;
00264 }
00265
00266
00267 unsigned MultiProcessRpcServer::getMaxNumProcesses() const
00268 {
00269 return m_maxProcesses;
00270 }
00271
00272
00273 bool MultiProcessRpcServer::terminateAllProcess(bool bForce,long lTimeout)
00274 {
00275 if(getNumProcesses() == 0)
00276 return true;
00277
00278 sweepProcessData();
00279 std::map<pid_t,ProcessContext> mapProcInfo1=getProcInfo();
00280 for(std::map<pid_t,ProcessContext>::iterator it=mapProcInfo1.begin(); it!=mapProcInfo1.end();it++)
00281 {
00282 if(it->second.iState == FINISH)
00283 continue;
00284 if((0 > kill(it->first, SIGTERM)) && (errno!=ESRCH))
00285 continue;
00286 }
00287
00288
00289 if(!waitChildren(lTimeout))
00290 {
00291 if(bForce)
00292 {
00293 sweepProcessData();
00294 std::map<pid_t,ProcessContext> mapProcInfo2=getProcInfo();
00295 for(std::map<pid_t,ProcessContext>::iterator it=mapProcInfo2.begin(); it!=mapProcInfo2.end();it++)
00296 {
00297 if(it->second.iState == FINISH)
00298 continue;
00299 if((0 > kill(it->first, SIGKILL)) && (errno!=ESRCH))
00300 continue;
00301 }
00302
00303 waitChildren();
00304 }
00305 else
00306 return false;
00307 }
00308 return false;
00309 }
00310
00311 void MultiProcessRpcServer::sweepProcessData()
00312 {
00313 for(std::map<pid_t,ProcessContext>::iterator it=m_mapProcesses.begin(); it!=m_mapProcesses.end();)
00314 {
00315 if(it->second.iState == FINISH)
00316 m_mapProcesses.erase(it++);
00317 else
00318 ++it;
00319 }
00320
00321 }
00322
00323 MultiProcessRpcServer::MultiProcessRpcServer(unsigned ,
00324 bool bHandleSigchld,
00325 bool wbxml_mode)
00326 {
00327 m_poDispatcher.reset(new ulxr::Dispatcher());
00328
00329 m_bHandleSigchld=bHandleSigchld;
00330
00331 if(m_bHandleSigchld)
00332 sigchild_register();
00333 m_wbxml_mode=wbxml_mode;
00334 }
00335
00336 MultiProcessRpcServer::MultiProcessRpcServer(ulxr::Protocol* poProtocol,
00337 unsigned ,
00338 bool bHandleSigchld,
00339 bool wbxml_mode)
00340 {
00341 m_poDispatcher.reset(new ulxr::Dispatcher(poProtocol));
00342
00343 m_bHandleSigchld=bHandleSigchld;
00344
00345 if(m_bHandleSigchld)
00346 sigchild_register();
00347 m_wbxml_mode=wbxml_mode;
00348 }
00349
00350 MultiProcessRpcServer::~MultiProcessRpcServer()
00351 {
00352 terminateAllProcess(false);
00353
00354 if(m_bHandleSigchld)
00355 sigchild_unregister();
00356 }
00357
00358 void MultiProcessRpcServer::sigchild_handler(int )
00359 {
00360 pid_t ppid;
00361 while((ppid=waitpid(0,0,WNOHANG))>0)
00362 MultiProcessRpcServer::m_mapProcesses[ppid].iState=FINISH;
00363
00364 }
00365
00366
00367 void MultiProcessRpcServer::sigchild_register()
00368 {
00369 struct sigaction sa;
00370 sigemptyset(&sa.sa_mask);
00371 sa.sa_handler=MultiProcessRpcServer::sigchild_handler;
00372 sa.sa_flags=SA_NOCLDSTOP | SA_RESTART;
00373 sigaction(SIGCHLD,&sa,&this->old_sigchld);
00374 }
00375
00376 void MultiProcessRpcServer::sigchild_unregister()
00377 {
00378 sigaction(SIGCHLD,&this->old_sigchld,0);
00379 }
00380
00381 ulxr::Dispatcher* MultiProcessRpcServer::getDispatcher() const
00382 {
00383 return m_poDispatcher.get();
00384 }
00385
00386 void MultiProcessRpcServer::resetDispatcher(ulxr::Protocol* poProtocol)
00387 {
00388 if(poProtocol != 0)
00389 {
00390 m_poDispatcher.reset();
00391 m_poDispatcher.reset(new ulxr::Dispatcher(poProtocol));
00392 }
00393
00394 else
00395 {
00396 ulxr::Protocol* tmp_poProtocol=m_poDispatcher->getProtocol();
00397 m_poDispatcher.reset();
00398 m_poDispatcher.reset(new ulxr::Dispatcher(tmp_poProtocol));
00399 }
00400
00401 }
00402
00403 void MultiProcessRpcServer::setState(int iState)
00404 {
00405 m_iState=iState;
00406 }
00407
00408
00409 int MultiProcessRpcServer::getState() const
00410 {
00411 return m_iState;
00412 }
00413
00414 bool MultiProcessRpcServer::waitConnection(bool bInterruptBySig)
00415 {
00416 fd_set fds;
00417 FD_ZERO(&fds);
00418 FD_SET(m_poDispatcher->getProtocol()->getConnection()->getServerHandle(), &fds);
00419
00420 int ready;
00421 while((ready = select(m_poDispatcher->getProtocol()->getConnection()->getServerHandle()+1, &fds, 0, 0, 0)) < 0)
00422 {
00423 if(errno == EINTR)
00424 {
00425 if(bInterruptBySig)
00426 return false;
00427
00428 continue;
00429 }
00430 else
00431 throw ulxr::ConnectionException(ulxr::SystemError,
00432 ULXR_PCHAR("MultiProcessRpcServer::waitConnection: Could not perform select() call: ")
00433 + ulxr::getLastErrorString(errno), 500);
00434 }
00435 if(0 == ready)
00436 return false;
00437
00438 return true;
00439 }
00440
00441 void MultiProcessRpcServer::printProcess() const
00442 {
00443
00444 std::cout<<"All: "<<getNumProcesses()<<std::endl;
00445 std::map<pid_t,ProcessContext> mapProcInfo=getProcInfo();
00446 for(std::map<pid_t,ProcessContext>::iterator it=mapProcInfo.begin(); it!=mapProcInfo.end();it++)
00447 std::cout<<"Pid: "<<it->first<<" Start time: "<<it->second.timeStart<<" State: "
00448 <<it->second.iState<<std::endl;
00449
00450 }
00451
00452
00453 void MultiProcessRpcServer::blockSigchld()
00454 {
00455
00456 sigset_t sigcur_set;
00457 while(sigprocmask(0, 0, &sigcur_set)<0)
00458 {
00459 if(EINTR == errno)
00460 continue;
00461
00462 throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::blockSigchld: sigprocmask error - ") +
00463 ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00464 }
00465
00466 if(sigismember(&sigcur_set,SIGCHLD) != 0)
00467 return;
00468
00469 sigset_t sigchild_set;
00470 sigemptyset( &sigchild_set );
00471 sigaddset( &sigchild_set, SIGCHLD);
00472
00473 while(sigprocmask(SIG_BLOCK, &sigchild_set, 0)<0)
00474 {
00475 if(EINTR == errno)
00476 continue;
00477
00478 throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::blockSigchld: sigprocmask error - ") +
00479 ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00480 }
00481
00482 }
00483
00484 void MultiProcessRpcServer::unblockSigchld()
00485 {
00486
00487 sigset_t sigcur_set;
00488 while(sigprocmask(0, 0, &sigcur_set)<0)
00489 {
00490 if(EINTR == errno)
00491 continue;
00492
00493 throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::unblockSigchld: sigprocmask error - ") +
00494 ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00495 }
00496
00497 if(sigismember(&sigcur_set,SIGCHLD) == 0)
00498 return;
00499
00500 sigset_t sigchild_set;
00501
00502 sigemptyset( &sigchild_set );
00503 sigaddset( &sigchild_set, SIGCHLD);
00504
00505
00506 while(sigprocmask(SIG_UNBLOCK, &sigchild_set, 0)<0)
00507 {
00508 if(EINTR == errno)
00509 continue;
00510 throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::unblockSigchld: sigprocmask error - ") +
00511 ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00512 }
00513 }
00514
00515 void MultiProcessRpcServer::storeProcessData(pid_t pid)
00516 {
00517 m_mapProcesses[pid]=(ProcessContext){time(0),RUN};
00518 }
00519
00520 std::map<pid_t,MultiProcessRpcServer::ProcessContext> MultiProcessRpcServer::getProcInfo() const
00521 {
00522 return m_mapProcesses;
00523 }
00524
00525 bool MultiProcessRpcServer::waitChildren(long lTimeout) const
00526 {
00527 lTimeout*=1000;
00528 int wait_flag=lTimeout>0?WNOHANG:0;
00529 long delta=lTimeout>0?lTimeout/4:0;
00530 while(true)
00531 {
00532 int ret,status;
00533 ret=wait3(&status,wait_flag,0);
00534
00535 if(ret>0)
00536 continue;
00537
00538 if(ret == -1)
00539 {
00540 if(errno==EINTR)
00541 continue;
00542 if(errno==ECHILD)
00543 return true;
00544
00545 throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::waitChildren: wait3 error - ") +
00546 ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00547 }
00548
00549 if(ret==0 && lTimeout>0)
00550 {
00551 timeval timeout={0,delta};
00552 int sel_ret= select(0,0,0,0,&timeout);
00553 if(sel_ret==-1)
00554 if(errno==EINTR)
00555 continue;
00556 else
00557 throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::waitChildren: select error - ") +
00558 ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00559
00560 if(sel_ret==0)
00561 {
00562 lTimeout-=delta;
00563 continue;
00564 }
00565 }
00566
00567
00568 return false;
00569 }
00570 return false;
00571 }
00572
00573 void MultiProcessRpcServer::storeFuncResult(const ulxr::MethodCall& callMethod,const ulxr::MethodResponse& respMethod) const
00574 {
00575
00576 std::cout<<"Peer name: ";
00577 struct sockaddr name;
00578 socklen_t namelen=sizeof(name);
00579
00580 if(!getpeername(m_poDispatcher->getProtocol()->getConnection()->getServerHandle(),&name,&namelen)==0)
00581 std::cout << ulxr::getLatin1(ulxr::getLastErrorString(errno))<<"; ";
00582 else
00583 {
00584 #ifdef HAVE_GETNAMEINFO
00585 char hbuf[NI_MAXHOST],
00586 sbuf[NI_MAXSERV];
00587
00588
00589 if (getnameinfo(&name, namelen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),NI_NUMERICSERV)!=0)
00590 std::cout<<"unknow; ";
00591 else
00592 std::cout<<hbuf<<":"<<sbuf<<"; ";
00593 #endif
00594 }
00595 std::cout<<"Call method: "<<ulxr::getLatin1(callMethod.getMethodName())<<"; ";
00596 std::cout<<"Method response: "<<(respMethod.isOK()?"ok":"bad")<<std::endl;
00597 }
00598
00599
00600 }