mprocess_rpc_server.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002           mprocess_rpc_client.cpp.cpp  -  multi process rpc server
00003                              -------------------
00004     begin                : Sun May 29 2005
00005     copyright            : (C) 2005 Dmitry Nizovtsev <funt@alarit.com>
00006                                     Olexander Shtepa <isk@alarit.com>
00007 
00008     $Id: mprocess_rpc_server.cpp 983 2007-07-12 09:52:14Z ewald-arnold $
00009 
00010  ***************************************************************************/
00011 
00012 /**************************************************************************
00013  *
00014  * This program is free software; you can redistribute it and/or modify
00015  * it under the terms of the GNU Lesser General Public License as
00016  * published by the Free Software Foundation; either version 2 of the License,
00017  * or (at your option) any later version.
00018  *
00019  * This program is distributed in the hope that it will be useful,
00020  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00021  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00022  * GNU General Public License for more details.
00023  *
00024  * You should have received a copy of the GNU Lesser General Public License
00025  * along with this program; if not, write to the Free Software
00026  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00027  *
00028  ***************************************************************************/
00029 
00030 
00031 #define ULXR_NEED_EXPORTS
00032 #include <ulxmlrpcpp/ulxmlrpcpp.h>  // always first header
00033 
00034 #include <ulxmlrpcpp/ulxr_dispatcher.h>
00035 #include <ulxmlrpcpp/ulxr_protocol.h>
00036 #include <ulxmlrpcpp/ulxr_except.h>
00037 #include <ulxmlrpcpp/ulxr_xmlparse.h>
00038 #include <ulxmlrpcpp/ulxr_xmlparse_base.h>
00039 #include <ulxmlrpcpp/ulxr_callparse.h>
00040 #include <ulxmlrpcpp/ulxr_callparse_wb.h>
00041 
00042 #include <iostream>
00043 
00044 #include <sys/types.h>
00045 #include <sys/wait.h>
00046 #include <sys/types.h>
00047 #include <sys/socket.h>
00048 #include <netdb.h>
00049 #include <errno.h>
00050 
00051 #include <ulxmlrpcpp/contrib/mprocess_rpc_server.h>
00052 
00053 
00054 namespace funtik {
00055 
00056 
00057 MultiProcessRpcServerError::MultiProcessRpcServerError(const std::string& what_arg):
00058   _what(what_arg)
00059 {
00060 }
00061 
00062 
00063 MultiProcessRpcServerError::~MultiProcessRpcServerError() throw()
00064 {
00065 
00066 }
00067 
00068 const char*  MultiProcessRpcServerError::what () const throw()
00069 {
00070   return this->_what.c_str();
00071 }
00072 
00073 
00074 
00075 void MultiProcessRpcServer::doChild()
00076 {
00077   ULXR_TRACE(ULXR_PCHAR("doChild"));
00078 
00079   close(m_poDispatcher->getProtocol()->getConnection()->getServerHandle());
00080   ulxr::MethodCall call=handleXmlRequest();
00081 
00082     ulxr::Protocol *poNowProtocol=m_poDispatcher->getProtocol();
00083     ulxr::MethodResponse resp = m_poDispatcher->dispatchCall(call);
00084   storeFuncResult(call,resp);
00085     if (poNowProtocol!=0 && !poNowProtocol->isTransmitOnly())
00086     poNowProtocol->sendRpcResponse(resp,m_wbxml_mode);
00087   if (poNowProtocol!=0 && !poNowProtocol->isPersistent())
00088       poNowProtocol->close();
00089 //finish him!
00090   exit(0);
00091 }
00092 
00093 std::map<pid_t,MultiProcessRpcServer::ProcessContext> MultiProcessRpcServer::m_mapProcesses;
00094 
00095 
00096 void  MultiProcessRpcServer::handleRequest()
00097 {
00098   ULXR_TRACE(ULXR_PCHAR("handleRequest"));
00099 
00100   if(m_poDispatcher.get() == 0)
00101     throw MultiProcessRpcServerError("MultiProcessRpcServer::handleRequest: Dispatcher does not initialize");
00102 
00103   if(m_poDispatcher->getProtocol() == 0)
00104     throw MultiProcessRpcServerError("MultiProcessRpcServer::handleRequest: Protocol does not initialize");
00105 
00106   try{
00107     acceptConnection();
00108 
00109       if((m_maxProcesses != 0) && (m_mapProcesses.size() > m_maxProcesses))
00110         throw ulxr::Exception(ulxr::SystemError, ULXR_PCHAR("MultiProcessRpcServer::handleRequest: Max number of process already started."));
00111 
00112     blockSigchld();
00113 
00114     pid_t ppid=fork();
00115 
00116     if(ppid == -1)
00117           throw ulxr::Exception(ulxr::SystemError,ULXR_PCHAR("MultiProcessRpcServer::handleRequest: Can`t create process for handle request."));
00118 
00119     if(ppid == 0)
00120     {
00121 //close server socket
00122       unblockSigchld();
00123 
00124       try{
00125         doChild();
00126       }
00127       catch(...)
00128       {
00129 //finish him if error!!!
00130         exit(1);
00131       }
00132     }
00133 
00134 //store Process info
00135     if(ppid > 0)
00136       storeProcessData(ppid);
00137 
00138     unblockSigchld();
00139   }
00140   catch(std::exception &ex)
00141   {
00142 //cleaning
00143     unblockSigchld();
00144 
00145     if (!m_poDispatcher->getProtocol()->isPersistent())
00146       m_poDispatcher->getProtocol()->close();
00147 
00148     sweepProcessData();
00149 
00150     throw;
00151   }
00152 //Close client connection
00153   if (!m_poDispatcher->getProtocol()->isPersistent())
00154     m_poDispatcher->getProtocol()->close();
00155 
00156   sweepProcessData();
00157 }
00158 
00159 
00160 void MultiProcessRpcServer::acceptConnection()
00161 {
00162   if (!m_poDispatcher->getProtocol()->isOpen())
00163     m_poDispatcher->getProtocol()->accept();
00164   else
00165     m_poDispatcher->getProtocol()->resetConnection();
00166   m_poDispatcher->getProtocol()->setPersistent(false);
00167 }
00168 
00169 
00170 ulxr::MethodCall MultiProcessRpcServer::handleXmlRequest()
00171 {
00172   char buffer[ULXR_RECV_BUFFER_SIZE];
00173   char *buff_ptr;
00174 
00175   std::auto_ptr<ulxr::XmlParserBase> parser;
00176   ulxr::MethodCallParserBase *cpb = 0;
00177   if (m_wbxml_mode)
00178   {
00179     ULXR_TRACE(ULXR_PCHAR("waitForCall in WBXML"));
00180     ulxr::MethodCallParserWb *cp = new ulxr::MethodCallParserWb();
00181     cpb = cp;
00182     parser.reset(cp);
00183   }
00184   else
00185   {
00186     ULXR_TRACE(ULXR_PCHAR("waitForCall in XML"));
00187     ulxr::MethodCallParser *cp = new ulxr::MethodCallParser();
00188     cpb = cp;
00189     parser.reset(cp);
00190   }
00191 
00192   bool done = false;
00193   long readed;
00194   while (!done && ((readed = m_poDispatcher->getProtocol()->readRaw(buffer, sizeof(buffer))) > 0) )
00195   {
00196     buff_ptr = buffer;
00197 
00198     if (!m_poDispatcher->getProtocol()->hasBytesToRead())
00199 //      || parser->isComplete())
00200       done = true;
00201 
00202     while (readed > 0)
00203     {
00204       ulxr::Protocol::State state = m_poDispatcher->getProtocol()->connectionMachine(buff_ptr, readed);
00205 
00206       if (state == ulxr::Protocol::ConnError)
00207         throw ulxr::ConnectionException(ulxr::TransportError, ulxr_i18n(ULXR_PCHAR("MultiProcessRpcServer::handleXmlRequest: network problem occured")), 500);
00208 
00209       else if (state == ulxr::Protocol::ConnSwitchToBody)
00210       {
00211         if (!m_poDispatcher->getProtocol()->hasBytesToRead())
00212         {
00213 #ifdef ULXR_SHOW_READ
00214           ulxr::Cpp8BitString super_data(buff_ptr, readed);
00215           while ((readed = m_poDispatcher->getProtocol()->getConnection()->read(buffer, sizeof(buffer))) > 0)
00216           super_data.append(buffer, readed);
00217           ULXR_DOUT_READ(ULXR_PCHAR("superdata 1 start:\n")
00218           << ULXR_GET_STRING(super_data)
00219           << ULXR_PCHAR("superdata 1 end:\n" ));
00220 #endif
00221           throw ulxr::ConnectionException(ulxr::NotConformingError,
00222             ulxr_i18n(ULXR_PCHAR("MultiProcessRpcServer::handleXmlRequest: Content-Length of message not available")), 411);
00223         }
00224       }
00225 
00226       else if (state == ulxr::Protocol::ConnBody)
00227       {
00228         ULXR_DOUT_XML(ULXR_GET_STRING(std::string(buff_ptr, readed)));
00229         if (!parser->parse(buff_ptr, readed, done))
00230         {
00231           throw ulxr::XmlException(parser->mapToFaultCode(parser->getErrorCode()),
00232               ulxr_i18n(ULXR_PCHAR("MultiProcessRpcServer::handleXmlRequest: Problem while parsing xml request")),
00233               parser->getCurrentLineNumber(),
00234               ULXR_GET_STRING(parser->getErrorString(parser->getErrorCode())));
00235         }
00236         readed = 0;
00237       }
00238     }
00239   }
00240   return cpb->getMethodCall();
00241 }
00242 
00243 
00244 void  MultiProcessRpcServer::serverLoop()
00245 {
00246   ULXR_TRACE(ULXR_PCHAR("Main process start"));
00247 
00248   while(m_iState==RUN)
00249   {
00250     handleRequest();
00251   }//while
00252   terminateAllProcess(true);
00253 }
00254 
00255 
00256 unsigned  MultiProcessRpcServer::getNumProcesses() const
00257 {
00258   return m_mapProcesses.size();
00259 }
00260 
00261 void  MultiProcessRpcServer::setMaxNumProcesses(unsigned uNumProcesses)
00262 {
00263   m_maxProcesses=uNumProcesses;
00264 }
00265 
00266 
00267 unsigned  MultiProcessRpcServer::getMaxNumProcesses() const
00268 {
00269   return m_maxProcesses;
00270 }
00271 
00272 
00273 bool MultiProcessRpcServer::terminateAllProcess(bool bForce,long lTimeout)
00274 {
00275   if(getNumProcesses() == 0)
00276     return true;
00277 
00278   sweepProcessData();
00279   std::map<pid_t,ProcessContext> mapProcInfo1=getProcInfo();
00280   for(std::map<pid_t,ProcessContext>::iterator it=mapProcInfo1.begin(); it!=mapProcInfo1.end();it++)
00281   {
00282     if(it->second.iState == FINISH)
00283       continue;
00284     if((0 > kill(it->first, SIGTERM)) && (errno!=ESRCH))
00285         continue;
00286   }
00287 
00288 
00289   if(!waitChildren(lTimeout))
00290   {
00291       if(bForce)
00292       {
00293       sweepProcessData();
00294       std::map<pid_t,ProcessContext> mapProcInfo2=getProcInfo();
00295       for(std::map<pid_t,ProcessContext>::iterator it=mapProcInfo2.begin(); it!=mapProcInfo2.end();it++)
00296       {
00297         if(it->second.iState == FINISH)
00298           continue;
00299         if((0 > kill(it->first, SIGKILL)) && (errno!=ESRCH))
00300             continue;
00301       }
00302 
00303       waitChildren();
00304       }
00305       else
00306       return false;
00307   }
00308   return false;
00309 }
00310 
00311 void MultiProcessRpcServer::sweepProcessData()
00312 {
00313   for(std::map<pid_t,ProcessContext>::iterator it=m_mapProcesses.begin(); it!=m_mapProcesses.end();)
00314   {
00315     if(it->second.iState == FINISH)
00316       m_mapProcesses.erase(it++);
00317     else
00318       ++it;
00319   }
00320 
00321 }
00322 
00323 MultiProcessRpcServer::MultiProcessRpcServer(unsigned /*uNumProcess*/,
00324                                              bool bHandleSigchld,
00325                                              bool wbxml_mode)
00326 {
00327   m_poDispatcher.reset(new ulxr::Dispatcher());
00328 
00329   m_bHandleSigchld=bHandleSigchld;
00330 
00331   if(m_bHandleSigchld)
00332     sigchild_register();
00333   m_wbxml_mode=wbxml_mode;
00334 }
00335 
00336 MultiProcessRpcServer::MultiProcessRpcServer(ulxr::Protocol* poProtocol,
00337                                              unsigned /*uNumProcess*/,
00338                                              bool bHandleSigchld,
00339                                              bool wbxml_mode)
00340 {
00341   m_poDispatcher.reset(new ulxr::Dispatcher(poProtocol));
00342 
00343   m_bHandleSigchld=bHandleSigchld; // ea added
00344 
00345   if(m_bHandleSigchld)
00346     sigchild_register();
00347   m_wbxml_mode=wbxml_mode;
00348 }
00349 
00350 MultiProcessRpcServer::~MultiProcessRpcServer()
00351 {
00352   terminateAllProcess(false);
00353 
00354   if(m_bHandleSigchld)
00355     sigchild_unregister();
00356 }
00357 
00358 void MultiProcessRpcServer::sigchild_handler(int /*signal*/)
00359 {
00360   pid_t ppid;
00361   while((ppid=waitpid(0,0,WNOHANG))>0)
00362         MultiProcessRpcServer::m_mapProcesses[ppid].iState=FINISH;
00363 
00364 }
00365 
00366 
00367 void  MultiProcessRpcServer::sigchild_register()
00368 {
00369   struct sigaction sa;
00370   sigemptyset(&sa.sa_mask);
00371   sa.sa_handler=MultiProcessRpcServer::sigchild_handler;
00372   sa.sa_flags=SA_NOCLDSTOP | SA_RESTART;
00373   sigaction(SIGCHLD,&sa,&this->old_sigchld);
00374 }
00375 
00376 void  MultiProcessRpcServer::sigchild_unregister()
00377 {
00378   sigaction(SIGCHLD,&this->old_sigchld,0);
00379 }
00380 
00381 ulxr::Dispatcher* MultiProcessRpcServer::getDispatcher() const
00382 {
00383   return m_poDispatcher.get();
00384 }
00385 
00386 void MultiProcessRpcServer::resetDispatcher(ulxr::Protocol* poProtocol)
00387 {
00388   if(poProtocol != 0)
00389   {
00390     m_poDispatcher.reset();
00391     m_poDispatcher.reset(new ulxr::Dispatcher(poProtocol));
00392   }
00393 
00394   else
00395   {
00396     ulxr::Protocol* tmp_poProtocol=m_poDispatcher->getProtocol();
00397     m_poDispatcher.reset();
00398     m_poDispatcher.reset(new ulxr::Dispatcher(tmp_poProtocol));
00399   }
00400 
00401 }
00402 
00403 void  MultiProcessRpcServer::setState(int iState)
00404 {
00405   m_iState=iState;
00406 }
00407 
00408 
00409 int  MultiProcessRpcServer::getState() const
00410 {
00411   return m_iState;
00412 }
00413 
00414 bool MultiProcessRpcServer::waitConnection(bool bInterruptBySig)
00415 {
00416     fd_set fds;
00417     FD_ZERO(&fds);
00418     FD_SET(m_poDispatcher->getProtocol()->getConnection()->getServerHandle(), &fds);
00419 
00420     int ready;
00421     while((ready = select(m_poDispatcher->getProtocol()->getConnection()->getServerHandle()+1, &fds, 0, 0, 0)) < 0)
00422     {
00423   if(errno == EINTR)
00424     {
00425       if(bInterruptBySig)
00426         return false;
00427 //if was received signal then continue select
00428               continue;
00429     }
00430   else
00431           throw ulxr::ConnectionException(ulxr::SystemError,
00432                 ULXR_PCHAR("MultiProcessRpcServer::waitConnection: Could not perform select() call: ")
00433                  + ulxr::getLastErrorString(errno), 500);
00434     }
00435     if(0 == ready)
00436     return false;
00437 
00438     return true;
00439 }
00440 
00441 void MultiProcessRpcServer::printProcess() const
00442 {
00443 
00444   std::cout<<"All: "<<getNumProcesses()<<std::endl;
00445   std::map<pid_t,ProcessContext> mapProcInfo=getProcInfo();
00446   for(std::map<pid_t,ProcessContext>::iterator it=mapProcInfo.begin(); it!=mapProcInfo.end();it++)
00447     std::cout<<"Pid: "<<it->first<<" Start time: "<<it->second.timeStart<<" State: "
00448       <<it->second.iState<<std::endl;
00449 
00450 }
00451 
00452 
00453 void MultiProcessRpcServer::blockSigchld()
00454 {
00455 //check if sigchld already blocked
00456   sigset_t sigcur_set;
00457   while(sigprocmask(0, 0, &sigcur_set)<0)
00458   {
00459     if(EINTR == errno)
00460       continue;
00461 
00462      throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::blockSigchld: sigprocmask error - ") +
00463                       ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00464   }
00465 
00466   if(sigismember(&sigcur_set,SIGCHLD) != 0)
00467     return;
00468 
00469   sigset_t sigchild_set;
00470   sigemptyset( &sigchild_set );
00471   sigaddset( &sigchild_set, SIGCHLD);
00472 
00473   while(sigprocmask(SIG_BLOCK, &sigchild_set, 0)<0)
00474   {
00475     if(EINTR == errno)
00476       continue;
00477 
00478      throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::blockSigchld: sigprocmask error - ") +
00479                       ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00480   }
00481 
00482 }
00483 
00484 void MultiProcessRpcServer::unblockSigchld()
00485 {
00486 //check if sigchld already unblocked
00487   sigset_t sigcur_set;
00488   while(sigprocmask(0, 0, &sigcur_set)<0)
00489   {
00490     if(EINTR == errno)
00491       continue;
00492 
00493      throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::unblockSigchld: sigprocmask error - ") +
00494                       ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00495   }
00496 
00497   if(sigismember(&sigcur_set,SIGCHLD) == 0)
00498     return;
00499 
00500   sigset_t sigchild_set;
00501 
00502   sigemptyset( &sigchild_set );
00503   sigaddset( &sigchild_set, SIGCHLD);
00504 
00505 
00506   while(sigprocmask(SIG_UNBLOCK, &sigchild_set, 0)<0)
00507   {
00508     if(EINTR == errno)
00509       continue;
00510      throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::unblockSigchld: sigprocmask error - ") +
00511                       ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00512   }
00513 }
00514 
00515 void MultiProcessRpcServer::storeProcessData(pid_t pid)
00516 {
00517   m_mapProcesses[pid]=(ProcessContext){time(0),RUN};
00518 }
00519 
00520 std::map<pid_t,MultiProcessRpcServer::ProcessContext> MultiProcessRpcServer::getProcInfo() const
00521 {
00522   return m_mapProcesses;
00523 }
00524 
00525 bool MultiProcessRpcServer::waitChildren(long lTimeout) const
00526 {
00527     lTimeout*=1000;
00528     int wait_flag=lTimeout>0?WNOHANG:0;
00529     long delta=lTimeout>0?lTimeout/4:0;
00530     while(true)
00531     {
00532         int ret,status;
00533     ret=wait3(&status,wait_flag,0);
00534 
00535     if(ret>0)
00536         continue;
00537 
00538     if(ret == -1)
00539     {
00540         if(errno==EINTR)
00541         continue;
00542         if(errno==ECHILD)
00543         return true;
00544 
00545         throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::waitChildren: wait3 error - ") +
00546                         ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00547     }
00548 
00549     if(ret==0 && lTimeout>0)
00550     {
00551         timeval timeout={0,delta};
00552         int sel_ret= select(0,0,0,0,&timeout);
00553         if(sel_ret==-1)
00554       if(errno==EINTR)
00555             continue;
00556       else
00557           throw MultiProcessRpcServerError(std::string("MultiProcessRpcServer::waitChildren: select error - ") +
00558                       ulxr::getLatin1(ulxr::getLastErrorString(errno)));
00559 
00560         if(sel_ret==0)
00561         {
00562         lTimeout-=delta;
00563         continue;
00564         }
00565     }
00566 
00567 
00568     return false;
00569     }
00570     return false;
00571 }
00572 
00573 void MultiProcessRpcServer::storeFuncResult(const ulxr::MethodCall& callMethod,const ulxr::MethodResponse& respMethod) const
00574 {
00575 
00576   std::cout<<"Peer name: ";
00577   struct sockaddr name;
00578   socklen_t namelen=sizeof(name);
00579 
00580   if(!getpeername(m_poDispatcher->getProtocol()->getConnection()->getServerHandle(),&name,&namelen)==0)
00581     std::cout << ulxr::getLatin1(ulxr::getLastErrorString(errno))<<"; ";
00582   else
00583   {
00584 #ifdef HAVE_GETNAMEINFO
00585     char hbuf[NI_MAXHOST],
00586          sbuf[NI_MAXSERV];
00587 
00588 //    if (getnameinfo(&name, name.sa_len, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),NI_NUMERICSERV)!=0)
00589     if (getnameinfo(&name, namelen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),NI_NUMERICSERV)!=0)
00590       std::cout<<"unknow; ";
00591     else
00592       std::cout<<hbuf<<":"<<sbuf<<"; ";
00593 #endif
00594   }
00595   std::cout<<"Call method: "<<ulxr::getLatin1(callMethod.getMethodName())<<"; ";
00596   std::cout<<"Method response: "<<(respMethod.isOK()?"ok":"bad")<<std::endl;
00597 }
00598 
00599 
00600 } // namespace funtik

Generated on Sun Aug 19 20:08:57 2007 for ulxmlrpcpp by  doxygen 1.5.1