AgentXcpp  Revision:4ac4848
Internals Documentation
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends
/home/tanjeff/projekte/agentxcpp/src/connector.cpp
Go to the documentation of this file.
00001 /*
00002  * Copyright 2011 Tanjeff-Nicolai Moos <tanjeff@cccmz.de>
00003  *
00004  * This file is part of the agentXcpp library.
00005  *
00006  * AgentXcpp is free software: you can redistribute it and/or modify
00007  * it under the terms of the AgentXcpp library license, version 1, which 
00008  * consists of the GNU General Public License and some additional 
00009  * permissions.
00010  *
00011  * AgentXcpp is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * See the AgentXcpp library license in the LICENSE file of this package 
00017  * for more details.
00018  */
00019 
00020 #include <boost/bind.hpp>
00021 #include "connector.hpp"
00022 #include "helper.hpp"
00023 
00024 
00025 using namespace agentxcpp;
00026 
00027 
00028 /*
00029  ********************************************
00030   Local helper function: read_with_timeout()
00031  ********************************************
00032  */
00033 
00034 
00035 
00036 /**
00037  * \brief Helper type for *_with_timeout() functions.
00038  */
00039 enum status_t {
00040     in_progress = 0,
00041     success     = 1,
00042     fail        = 2
00043 };
00044 
00045 
00046 
00047 /**
00048  * \brief Helper function for *_with_timeout() functions.
00049  */
00050 static void callback(const boost::system::error_code& result,
00051                      status_t* retval)
00052 {
00053     if( result.value() == 0 )
00054     {
00055         // success
00056         *retval = success;
00057     }
00058     else
00059     {
00060         // error
00061         *retval = fail;
00062     }
00063 }
00064 
00065 
00066 
00067 
00068 /**
00069  * \brief Like boost::asio::read(), but with timeout
00070  *
00071  * Calls boost::asio::read(), but provides a timeout in addition.
00072  *
00073  * This function calls s.get_io_service.run_one().
00074  *
00075  * \param s The Stream to read from
00076  *
00077  * \param buffers The buffers to read into
00078  *
00079  * \param timeout The desired timeout in milliseconds
00080  *
00081  * \exception timeout_exception If the timeout expires before the read
00082  *                              operation completes. Some bytes may have been 
00083  *                              read.
00084  *
00085  * \exception network_error If reading failed. Some data may have been read
00086  *                          already or will be read later. Subsequent calls to 
00087  *                          this function will lead to undefined results.
00088  *
00089  * \return How many bytes were read
00090  */
00091 template<typename AsyncReadStream,
00092          typename MutableBufferSequence>
00093 static void read_with_timeout(AsyncReadStream& s,
00094                               const MutableBufferSequence& buffers,
00095                               unsigned int timeout)
00096 {
00097     //
00098     // What this function does:
00099     //
00100     // 1) start timeout timer
00101     // 2) start read
00102     // 3) wait until timer or read completes
00103     // 4) process result
00104     //
00105 
00106     // The timer_result and read_result variables are static because in some 
00107     // circumstances the callback (which manipulates them) might be called 
00108     // after this function returned. We avoid a segfault this way.
00109     static status_t timer_result;
00110     static status_t read_result;
00111     
00112     // 1) Start timeout timer
00113     boost::asio::deadline_timer timer(s.get_io_service());
00114     timer_result = in_progress;
00115     try
00116     {
00117         // throws system_error in boost 1.45.0
00118         timer.expires_from_now( boost::posix_time::milliseconds(timeout) );
00119     }
00120     catch(boost::system::system_error)
00121     {
00122         throw( network_error() );
00123     }
00124     // doesn't throw in boost 1.45.0:
00125     timer.async_wait( boost::bind(callback,
00126                                   boost::asio::placeholders::error,
00127                                   &timer_result) );
00128 
00129     // 2) Start read
00130     read_result = in_progress;
00131     // doesn't throw in boost 1.45.0:
00132     async_read(s,
00133                buffers,
00134                boost::bind(callback,
00135                            boost::asio::placeholders::error,
00136                            &read_result));
00137 
00138     // 3) process asio events until read succeeds or timeout expires
00139     try
00140     {
00141         do
00142         {
00143             // throws system_error in boost 1.45.0:
00144             s.get_io_service().run_one();
00145         }
00146         while(read_result == in_progress && timer_result == in_progress);
00147     }
00148     catch(boost::system::system_error)
00149     {
00150         try
00151         {
00152             // throws system_error in boost 1.45.0
00153             timer.cancel();
00154 
00155             // TODO: How to cancel the async_read operation?
00156         }
00157         catch(boost::system::system_error)
00158         {
00159             // Is the timer uncancelled now? Will it possibly fire our 
00160             // callback? On the other hand, leaving this function will 
00161             // destroy the deadline_timer object anyway.
00162 
00163             // -> ignore
00164         }
00165         throw( network_error() );
00166     }
00167 
00168     // 4) Check read result
00169     switch(read_result)
00170     {
00171         case success:
00172             // Read succeeded: OK
00173             try
00174             {
00175                 // throws system_error in bost 1.45.0:
00176                 timer.cancel();
00177             }
00178             catch(boost::system::system_error)
00179             {
00180                 // Is the timer uncancelled now? Will it possibly fire our 
00181                 // callback? On the other hand, leaving this function will 
00182                 // destroy the deadline_timer object anyway.
00183 
00184                 // -> ignore
00185             }
00186             return;
00187 
00188         case fail:
00189             // read failed: cancel timer, throw exception
00190             try
00191             {
00192                 // throws system_error in bost 1.45.0:
00193                 timer.cancel();
00194             }
00195             catch(boost::system::system_error)
00196             {
00197                 // Is the timer uncancelled now? Will it possibly fire our 
00198                 // callback? On the other hand, leaving this function will 
00199                 // destroy the deadline_timer object anyway.
00200 
00201                 // -> ignore
00202             }
00203             throw( network_error() );
00204 
00205         case in_progress:
00206 
00207             // Look at timer_result:
00208             switch(timer_result)
00209             {
00210                 case success:
00211                     // timer fired while reading
00212                     
00213                     // TODO: how to cancel the async read operation?
00214                     
00215                     throw( timeout_error() );
00216 
00217                 case fail:
00218                     // timer failed while reading
00219                     // what now?
00220                     // I think we should fail with a network error
00221                     
00222                     // TODO: how to cancel the async read operation?
00223                     
00224                     throw( network_error() );
00225 
00226                 case in_progress:
00227                     // It didn't happen -> ignore
00228                     break;
00229             }
00230     }
00231 
00232 }
00233 
00234 
00235 
00236 /**
00237  * \brief Send some data with timeout
00238  *
00239  * \note This function calls s.get_io_service.run_one().
00240  *
00241  * \param s The Stream to send to to
00242  *
00243  * \param buffers The data to send
00244  *
00245  * \param timeout The desired timeout in milliseconds
00246  *
00247  * \exception timeout_error If the timeout expires before the send
00248  *                          operation completes. Some data may have been sent 
00249  *                          already. Subsequent calls to this function will 
00250  *                          lead to undefined results.
00251  *
00252  * \exception network_error If sending failed. Some data may have been sent
00253  *                          already or may still be in the send queue.  
00254  *                          Subsequent calls to this function will lead to 
00255  *                          undefined results.
00256  */
00257 template<typename ConstBufferSequence>
00258 static void send_with_timeout(boost::asio::local::stream_protocol::socket& s,
00259                               const ConstBufferSequence& buffers,
00260                               unsigned timeout)
00261 {
00262     //
00263     // What this function does:
00264     //
00265     // 1) start timeout timer
00266     // 2) start send
00267     // 3) wait until timer or send completes
00268     // 4) process result
00269     //
00270     
00271     // The timer_result and send_result variables are static because in some 
00272     // circumstances the callback (which manipulates them) might be called 
00273     // after this function returned. We avoid a segfault this way.
00274     static status_t timer_result;
00275     static status_t send_result;
00276 
00277 
00278     // 1) Start timeout timer
00279     
00280     // doesn't throw in boost 1.45.0:
00281     boost::asio::deadline_timer timer(s.get_io_service());
00282     timer_result = in_progress;
00283     try
00284     {
00285         // throws system_error in boost 1.45.0
00286         timer.expires_from_now( boost::posix_time::milliseconds(timeout) );
00287     }
00288     catch(boost::system::system_error)
00289     {
00290         throw( network_error() );
00291     }
00292     // doesn't throw in boost 1.45.0:
00293     timer.async_wait( boost::bind(callback,
00294                                   boost::asio::placeholders::error,
00295                                   &timer_result) );
00296 
00297     // 2) Start send
00298     send_result = in_progress;
00299     // doesn't throw in boost 1.45.0:
00300     s.async_send(buffers,
00301                  boost::bind(callback,
00302                              boost::asio::placeholders::error,
00303                              &send_result));
00304 
00305     // 3) process asio events until send succeeds or timeout expires
00306     try
00307     {
00308         do
00309         {
00310             // throws system_error in boost 1.45.0:
00311             s.get_io_service().run_one();
00312         }
00313         while(send_result == in_progress && timer_result == in_progress);
00314     }
00315     catch(boost::system::system_error)
00316     {
00317         try
00318         {
00319             // throws system_error in boost 1.45.0
00320             timer.cancel();
00321 
00322             // TODO: How to cancel the async_send operation?
00323         }
00324         catch(boost::system::system_error)
00325         {
00326             // Is the timer uncancelled now? Will it possibly fire our 
00327             // callback? On the other hand, leaving this function will 
00328             // destroy the deadline_timer object anyway.
00329 
00330             // -> ignore
00331         }
00332         throw( network_error() );
00333     }
00334 
00335     // 4) Check result
00336     switch(send_result)
00337     {
00338         case success:
00339             // Send succeeded:
00340             try
00341             {
00342                 // throws system_error in bost 1.45.0:
00343                 timer.cancel();
00344             }
00345             catch(boost::system::system_error)
00346             {
00347                 // Is the timer uncancelled now? Will it possibly fire our 
00348                 // callback? On the other hand, leaving this function will 
00349                 // destroy the deadline_timer object anyway.
00350 
00351                 // -> ignore
00352             }
00353             return;
00354 
00355         case fail:
00356             // send failed: cancel timer, throw exception
00357             try
00358             {
00359                 // throws system_error in bost 1.45.0:
00360                 timer.cancel();
00361             }
00362             catch(boost::system::system_error)
00363             {
00364                 // Is the timer uncancelled now? Will it possibly fire our 
00365                 // callback? On the other hand, leaving this function will 
00366                 // destroy the deadline_timer object anyway.
00367 
00368                 // -> ignore
00369             }
00370             throw( network_error() );
00371 
00372         case in_progress:
00373 
00374             // Sending still in progress, look at timer_result:
00375             switch(timer_result)
00376             {
00377                 case success:
00378                     // timer fired while reading
00379                     
00380                     // TODO: how to cancel the async send operation?
00381 
00382                     // throw exception
00383                     throw( timeout_error() );
00384 
00385                 case fail:
00386                     // timer failed while sending
00387                     // what now?
00388                     // I think we should fail with a network error
00389                     
00390                     // TODO: how to cancel the async send operation?
00391                     
00392                     throw( network_error() );
00393 
00394                 case in_progress:
00395                     // It didn't happen -> ignore
00396                     break;
00397             }
00398     }
00399 }
00400 
00401 
00402 
00403 /*
00404  ********************************************
00405   Implentation of class connector
00406  ********************************************
00407  */
00408             
00409 
00410 connector::connector(boost::shared_ptr<boost::asio::io_service> io_service,
00411                        const std::string& unix_domain_socket,
00412                        unsigned timeout) :
00413     timeout(timeout),
00414     io_service(io_service),
00415     socket(0),
00416     endpoint(unix_domain_socket.c_str()),
00417     handler(0)
00418 {
00419 }
00420 
00421 void connector::connect()
00422 {
00423     // If currently connected: do nothing
00424     // (is_open() doesn't throw in boost 1.45.0)
00425     if( this->socket )
00426     {
00427         return;
00428     }
00429 
00430     // Connect to endpoint
00431     using boost::asio::local::stream_protocol;
00432     this->socket = new stream_protocol::socket(*io_service);
00433     try
00434     {
00435         this->socket->connect(endpoint);
00436     }
00437     catch(boost::system::system_error)
00438     {
00439         // Could not connect
00440         delete this->socket;
00441         this->socket = 0;
00442         throw(disconnected());
00443     }
00444     
00445     // Set up socket for next read access
00446     // (async_read doesn't throw in boost 1.45.0)
00447     async_read(*this->socket,
00448                boost::asio::buffer(this->header_buf, 20),
00449                boost::bind(&connector::receive_callback,
00450                            this,
00451                            boost::asio::placeholders::error));
00452 }
00453 
00454 void connector::disconnect()
00455 {
00456     // If already disconnected: do nothing
00457     if( this->socket == 0 )
00458     {
00459         return;
00460     }
00461     
00462     // Shutdown socket
00463     try
00464     {
00465 
00466         // Cancel all read and write operations. Called on the recommendation 
00467         // of socket.close() documentation.
00468         // (throws system_error in boost 1.45.0)
00469         socket->shutdown(
00470                 boost::asio::local::stream_protocol::socket::shutdown_both);
00471     }
00472     catch(boost::system::system_error)
00473     {
00474         // ignore errors
00475     }
00476 
00477     // Close socket
00478     try
00479     {
00480         // (throws system_error in boost 1.45.0)
00481         socket->close();
00482     }
00483     catch(boost::system::system_error)
00484     {
00485         // ignore errors
00486     }
00487     
00488     // Finally destroy socket
00489     delete this->socket;
00490     this->socket = 0;
00491 }
00492 
00493 
00494 
00495 
00496 void connector::register_handler( pdu_handler *handler )
00497 {
00498     this->handler = handler;
00499 }
00500 
00501 
00502 void connector::receive_callback(const boost::system::error_code& result)
00503 {
00504     // Check for network errors
00505     if( result.value() != 0 )
00506     {
00507         // result will probably be boost::asio::error::operation_aborted if the 
00508         // socket is explicitly closed. See boost 1.45.0 docs for 
00509         // basic_stream_socket::close().
00510         if( result.value() == boost::asio::error::operation_aborted )
00511         {
00512             // Socket was closed. Nothing to do here.
00513         }
00514         else
00515         {
00516             // async read operation failed
00517             // -> disconnect
00518             this->disconnect();
00519         }
00520 
00521         // Nothing left to do
00522         return;
00523     }
00524 
00525     // Copy header into PDU buffer
00526     data_t buf;
00527     buf.append(this->header_buf, 20);
00528 
00529     // read endianess flag
00530     bool big_endian = ( this->header_buf[2] & (1<<4) ) ? true : false;
00531 
00532     // read payload length
00533     uint32_t payload_length;
00534     data_t::const_iterator pos = buf.begin() + 16;
00535     payload_length = read32(pos, big_endian);
00536     if( payload_length % 4 != 0 )
00537     {
00538         // payload length must be a multiple of 4!
00539         // See RFC 2741, 6.1. "AgentX PDU Header"
00540         // -> close socket
00541         this->disconnect();
00542     }
00543 
00544     // Read the payload (TODO: can we avoid the new() operator?)
00545     byte_t* payload = new byte_t[payload_length];
00546     try
00547     {
00548         read_with_timeout(*this->socket,
00549                           boost::asio::buffer(payload, payload_length),
00550                           this->timeout);
00551     }
00552     catch(...)
00553     {
00554         // Some error occurred, e.g. timeout
00555         // -> disconnect
00556         this->disconnect();
00557         delete[] payload;
00558         return;
00559     }
00560     buf.append(payload, payload_length);
00561     delete[] payload;
00562         
00563     // Parse PDU
00564     shared_ptr<PDU> pdu;
00565     try
00566     {
00567         pdu = PDU::parse_pdu(buf);
00568     }
00569     catch(version_error)
00570     {
00571         // We cannot handle this PDU.
00572         // -> ignore
00573     }
00574     catch(parse_error)
00575     {
00576         // disconnect
00577         this->disconnect();
00578     }
00579 
00580     // Special case: ResponsePDU's
00581     shared_ptr<ResponsePDU> response;
00582     response = boost::dynamic_pointer_cast<ResponsePDU>(pdu);
00583     if(response.get() != 0)
00584     {
00585         // Was a response
00586         std::map< uint32_t, boost::shared_ptr<ResponsePDU> >::iterator i;
00587         i = this->responses.find( response->get_packetID() );
00588         if(i != this->responses.end())
00589         {
00590             // Someone is waiting for this response
00591             i->second = response;
00592         }
00593         else
00594         {
00595             // Nobody was waiting for the response
00596             // -> ignore it
00597         }
00598     }
00599     else
00600     {
00601         // Was not a Response
00602         // -> call handler if available
00603         if( this->handler )
00604         {
00605 
00606             // Call the handler
00607             try
00608             {
00609                 this->handler->handle_pdu(pdu);
00610             }
00611             catch(...)
00612             {
00613                 // discard exceptions from user handler
00614             }
00615         }
00616     }
00617 
00618     // Set up socket for next read access
00619     // (async_read doesn't throw in boost 1.45.0)
00620     async_read(*this->socket,
00621                boost::asio::buffer(this->header_buf, 20),
00622                boost::bind(&connector::receive_callback,
00623                            this,
00624                            boost::asio::placeholders::error));
00625 }
00626 
00627 
00628 
00629 void connector::send(const PDU& pdu)
00630 {
00631     if(this->socket == 0)
00632     {
00633         // We are currently not conected
00634         throw disconnected();
00635     }
00636 
00637     try
00638     {
00639         // throws timeout_error and network_error:
00640         data_t buf = pdu.serialize();
00641         send_with_timeout(*this->socket,
00642                           boost::asio::buffer(buf.data(), buf.size()),
00643                           this->timeout);
00644     }
00645     catch(network_error)
00646     {
00647         // network errors are fatal: disconnect & throw
00648         this->disconnect();
00649         throw disconnected();
00650     }
00651     catch(timeout_error)
00652     {
00653         // No subsequent calls to send_with_timeout() possible -> disconnect.
00654         this->disconnect();
00655         
00656         // forward timeout_error to caller
00657         throw;
00658     }
00659 }
00660 
00661 
00662 
00663 // Helper function for master_proxy::wait_for_response(). See there for an 
00664 // explanation.
00665 static void callback_for_response(const boost::system::error_code& result,
00666                                  status_t* retval)
00667 {
00668     if( result.value() == 0 )
00669     {
00670         // success
00671         *retval = success;
00672     }
00673     else
00674     {
00675         // error
00676         *retval = fail;
00677     }
00678 }
00679 /*
00680  * Timeout handling:
00681  * 
00682  * The timeout handling is realized using a boost::asio::deadline_timer 
00683  * object, which is set up to call the callback_for_response() function when 
00684  * it expires (or when it fails). The variable 'timer_result' is intially set 
00685  * to 'in_progress'.  The callback will set it to 'success' or 'fail' when the 
00686  * timer expires respectively fails.
00687  *
00688  * Then the io_service.run_one() function is invoked repeatedly until either 
00689  * the timer expired or the desired ResponsePDU arrived. Note that 
00690  * io_service.run_one() may service other asynchronous operations first, e.g.  
00691  * a get request.
00692  *
00693  * Finally either the received ResponsePDU is returned or a timeout exception 
00694  * is thrown.
00695  */
00696 boost::shared_ptr<ResponsePDU>
00697 connector::wait_for_response(uint32_t packetID)
00698 {
00699     if(this->socket == 0)
00700     {
00701         // We are currently not conected
00702         throw disconnected();
00703     }
00704 
00705     // Indicate that we are waiting for a specific response:
00706     // We add a null pointer to the map
00707     responses[packetID] = boost::shared_ptr<ResponsePDU>();
00708 
00709     // Start timeout timer
00710     boost::asio::deadline_timer timer(*(this->io_service));
00711     status_t timer_result = in_progress; // callback stores result here
00712     timer.expires_from_now( boost::posix_time::seconds(this->timeout) );
00713     timer.async_wait( boost::bind(callback_for_response,
00714                                   boost::asio::placeholders::error,
00715                                   &timer_result) );
00716 
00717     // process asio events until ResponsePDU arrives or timeout expires
00718     try
00719     {
00720         while(   this->responses[packetID].get() == 0 // no response was stored
00721                  && timer_result == in_progress)
00722         {
00723             this->io_service->run_one();
00724         }
00725     }
00726     catch(boost::system::system_error)
00727     {
00728         // run_one() failed
00729         // -> disconnect and fail
00730         this->disconnect();
00731         throw disconnected();
00732     }
00733 
00734     // Check the result
00735     if( this->responses[packetID].get() != 0 )
00736     {
00737         // ResponsePDU arrived:
00738         // 1. Cancel timer
00739         // 2. Erase response from map
00740         // 3. Return response
00741         timer.cancel();
00742         shared_ptr<ResponsePDU> retval = this->responses[packetID];
00743         this->responses.erase( packetID );
00744         return retval;
00745     }
00746     else
00747     {
00748         // Timer expired or failed before ResponsePDU arrived
00749         throw(timeout_error());
00750     }
00751 }