00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include "connector.hpp"
#include "helper.hpp"
00023
00024
00025 using namespace agentxcpp;
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
/**
 * \brief Completion state of an asynchronous operation.
 *
 * The helpers in this file initialise a flag of this type to in_progress
 * before starting an operation; the completion handler then changes it to
 * success or fail.
 */
enum status_t
{
    in_progress,  // 0: the operation has not completed yet
    success,      // 1: the operation completed without error
    fail          // 2: the operation completed with an error
};
00044
00045
00046
00047
00048
00049
00050 static void callback(const boost::system::error_code& result,
00051 status_t* retval)
00052 {
00053 if( result.value() == 0 )
00054 {
00055
00056 *retval = success;
00057 }
00058 else
00059 {
00060
00061 *retval = fail;
00062 }
00063 }
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091 template<typename AsyncReadStream,
00092 typename MutableBufferSequence>
00093 static void read_with_timeout(AsyncReadStream& s,
00094 const MutableBufferSequence& buffers,
00095 unsigned int timeout)
00096 {
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 static status_t timer_result;
00110 static status_t read_result;
00111
00112
00113 boost::asio::deadline_timer timer(s.get_io_service());
00114 timer_result = in_progress;
00115 try
00116 {
00117
00118 timer.expires_from_now( boost::posix_time::milliseconds(timeout) );
00119 }
00120 catch(boost::system::system_error)
00121 {
00122 throw( network_error() );
00123 }
00124
00125 timer.async_wait( boost::bind(callback,
00126 boost::asio::placeholders::error,
00127 &timer_result) );
00128
00129
00130 read_result = in_progress;
00131
00132 async_read(s,
00133 buffers,
00134 boost::bind(callback,
00135 boost::asio::placeholders::error,
00136 &read_result));
00137
00138
00139 try
00140 {
00141 do
00142 {
00143
00144 s.get_io_service().run_one();
00145 }
00146 while(read_result == in_progress && timer_result == in_progress);
00147 }
00148 catch(boost::system::system_error)
00149 {
00150 try
00151 {
00152
00153 timer.cancel();
00154
00155
00156 }
00157 catch(boost::system::system_error)
00158 {
00159
00160
00161
00162
00163
00164 }
00165 throw( network_error() );
00166 }
00167
00168
00169 switch(read_result)
00170 {
00171 case success:
00172
00173 try
00174 {
00175
00176 timer.cancel();
00177 }
00178 catch(boost::system::system_error)
00179 {
00180
00181
00182
00183
00184
00185 }
00186 return;
00187
00188 case fail:
00189
00190 try
00191 {
00192
00193 timer.cancel();
00194 }
00195 catch(boost::system::system_error)
00196 {
00197
00198
00199
00200
00201
00202 }
00203 throw( network_error() );
00204
00205 case in_progress:
00206
00207
00208 switch(timer_result)
00209 {
00210 case success:
00211
00212
00213
00214
00215 throw( timeout_error() );
00216
00217 case fail:
00218
00219
00220
00221
00222
00223
00224 throw( network_error() );
00225
00226 case in_progress:
00227
00228 break;
00229 }
00230 }
00231
00232 }
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257 template<typename ConstBufferSequence>
00258 static void send_with_timeout(boost::asio::local::stream_protocol::socket& s,
00259 const ConstBufferSequence& buffers,
00260 unsigned timeout)
00261 {
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274 static status_t timer_result;
00275 static status_t send_result;
00276
00277
00278
00279
00280
00281 boost::asio::deadline_timer timer(s.get_io_service());
00282 timer_result = in_progress;
00283 try
00284 {
00285
00286 timer.expires_from_now( boost::posix_time::milliseconds(timeout) );
00287 }
00288 catch(boost::system::system_error)
00289 {
00290 throw( network_error() );
00291 }
00292
00293 timer.async_wait( boost::bind(callback,
00294 boost::asio::placeholders::error,
00295 &timer_result) );
00296
00297
00298 send_result = in_progress;
00299
00300 s.async_send(buffers,
00301 boost::bind(callback,
00302 boost::asio::placeholders::error,
00303 &send_result));
00304
00305
00306 try
00307 {
00308 do
00309 {
00310
00311 s.get_io_service().run_one();
00312 }
00313 while(send_result == in_progress && timer_result == in_progress);
00314 }
00315 catch(boost::system::system_error)
00316 {
00317 try
00318 {
00319
00320 timer.cancel();
00321
00322
00323 }
00324 catch(boost::system::system_error)
00325 {
00326
00327
00328
00329
00330
00331 }
00332 throw( network_error() );
00333 }
00334
00335
00336 switch(send_result)
00337 {
00338 case success:
00339
00340 try
00341 {
00342
00343 timer.cancel();
00344 }
00345 catch(boost::system::system_error)
00346 {
00347
00348
00349
00350
00351
00352 }
00353 return;
00354
00355 case fail:
00356
00357 try
00358 {
00359
00360 timer.cancel();
00361 }
00362 catch(boost::system::system_error)
00363 {
00364
00365
00366
00367
00368
00369 }
00370 throw( network_error() );
00371
00372 case in_progress:
00373
00374
00375 switch(timer_result)
00376 {
00377 case success:
00378
00379
00380
00381
00382
00383 throw( timeout_error() );
00384
00385 case fail:
00386
00387
00388
00389
00390
00391
00392 throw( network_error() );
00393
00394 case in_progress:
00395
00396 break;
00397 }
00398 }
00399 }
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410 connector::connector(boost::shared_ptr<boost::asio::io_service> io_service,
00411 const std::string& unix_domain_socket,
00412 unsigned timeout) :
00413 timeout(timeout),
00414 io_service(io_service),
00415 socket(0),
00416 endpoint(unix_domain_socket.c_str()),
00417 handler(0)
00418 {
00419 }
00420
00421 void connector::connect()
00422 {
00423
00424
00425 if( this->socket )
00426 {
00427 return;
00428 }
00429
00430
00431 using boost::asio::local::stream_protocol;
00432 this->socket = new stream_protocol::socket(*io_service);
00433 try
00434 {
00435 this->socket->connect(endpoint);
00436 }
00437 catch(boost::system::system_error)
00438 {
00439
00440 delete this->socket;
00441 this->socket = 0;
00442 throw(disconnected());
00443 }
00444
00445
00446
00447 async_read(*this->socket,
00448 boost::asio::buffer(this->header_buf, 20),
00449 boost::bind(&connector::receive_callback,
00450 this,
00451 boost::asio::placeholders::error));
00452 }
00453
00454 void connector::disconnect()
00455 {
00456
00457 if( this->socket == 0 )
00458 {
00459 return;
00460 }
00461
00462
00463 try
00464 {
00465
00466
00467
00468
00469 socket->shutdown(
00470 boost::asio::local::stream_protocol::socket::shutdown_both);
00471 }
00472 catch(boost::system::system_error)
00473 {
00474
00475 }
00476
00477
00478 try
00479 {
00480
00481 socket->close();
00482 }
00483 catch(boost::system::system_error)
00484 {
00485
00486 }
00487
00488
00489 delete this->socket;
00490 this->socket = 0;
00491 }
00492
00493
00494
00495
00496 void connector::register_handler( pdu_handler *handler )
00497 {
00498 this->handler = handler;
00499 }
00500
00501
00502 void connector::receive_callback(const boost::system::error_code& result)
00503 {
00504
00505 if( result.value() != 0 )
00506 {
00507
00508
00509
00510 if( result.value() == boost::asio::error::operation_aborted )
00511 {
00512
00513 }
00514 else
00515 {
00516
00517
00518 this->disconnect();
00519 }
00520
00521
00522 return;
00523 }
00524
00525
00526 data_t buf;
00527 buf.append(this->header_buf, 20);
00528
00529
00530 bool big_endian = ( this->header_buf[2] & (1<<4) ) ? true : false;
00531
00532
00533 uint32_t payload_length;
00534 data_t::const_iterator pos = buf.begin() + 16;
00535 payload_length = read32(pos, big_endian);
00536 if( payload_length % 4 != 0 )
00537 {
00538
00539
00540
00541 this->disconnect();
00542 }
00543
00544
00545 byte_t* payload = new byte_t[payload_length];
00546 try
00547 {
00548 read_with_timeout(*this->socket,
00549 boost::asio::buffer(payload, payload_length),
00550 this->timeout);
00551 }
00552 catch(...)
00553 {
00554
00555
00556 this->disconnect();
00557 delete[] payload;
00558 return;
00559 }
00560 buf.append(payload, payload_length);
00561 delete[] payload;
00562
00563
00564 shared_ptr<PDU> pdu;
00565 try
00566 {
00567 pdu = PDU::parse_pdu(buf);
00568 }
00569 catch(version_error)
00570 {
00571
00572
00573 }
00574 catch(parse_error)
00575 {
00576
00577 this->disconnect();
00578 }
00579
00580
00581 shared_ptr<ResponsePDU> response;
00582 response = boost::dynamic_pointer_cast<ResponsePDU>(pdu);
00583 if(response.get() != 0)
00584 {
00585
00586 std::map< uint32_t, boost::shared_ptr<ResponsePDU> >::iterator i;
00587 i = this->responses.find( response->get_packetID() );
00588 if(i != this->responses.end())
00589 {
00590
00591 i->second = response;
00592 }
00593 else
00594 {
00595
00596
00597 }
00598 }
00599 else
00600 {
00601
00602
00603 if( this->handler )
00604 {
00605
00606
00607 try
00608 {
00609 this->handler->handle_pdu(pdu);
00610 }
00611 catch(...)
00612 {
00613
00614 }
00615 }
00616 }
00617
00618
00619
00620 async_read(*this->socket,
00621 boost::asio::buffer(this->header_buf, 20),
00622 boost::bind(&connector::receive_callback,
00623 this,
00624 boost::asio::placeholders::error));
00625 }
00626
00627
00628
00629 void connector::send(const PDU& pdu)
00630 {
00631 if(this->socket == 0)
00632 {
00633
00634 throw disconnected();
00635 }
00636
00637 try
00638 {
00639
00640 data_t buf = pdu.serialize();
00641 send_with_timeout(*this->socket,
00642 boost::asio::buffer(buf.data(), buf.size()),
00643 this->timeout);
00644 }
00645 catch(network_error)
00646 {
00647
00648 this->disconnect();
00649 throw disconnected();
00650 }
00651 catch(timeout_error)
00652 {
00653
00654 this->disconnect();
00655
00656
00657 throw;
00658 }
00659 }
00660
00661
00662
00663
00664
00665 static void callback_for_response(const boost::system::error_code& result,
00666 status_t* retval)
00667 {
00668 if( result.value() == 0 )
00669 {
00670
00671 *retval = success;
00672 }
00673 else
00674 {
00675
00676 *retval = fail;
00677 }
00678 }
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696 boost::shared_ptr<ResponsePDU>
00697 connector::wait_for_response(uint32_t packetID)
00698 {
00699 if(this->socket == 0)
00700 {
00701
00702 throw disconnected();
00703 }
00704
00705
00706
00707 responses[packetID] = boost::shared_ptr<ResponsePDU>();
00708
00709
00710 boost::asio::deadline_timer timer(*(this->io_service));
00711 status_t timer_result = in_progress;
00712 timer.expires_from_now( boost::posix_time::seconds(this->timeout) );
00713 timer.async_wait( boost::bind(callback_for_response,
00714 boost::asio::placeholders::error,
00715 &timer_result) );
00716
00717
00718 try
00719 {
00720 while( this->responses[packetID].get() == 0
00721 && timer_result == in_progress)
00722 {
00723 this->io_service->run_one();
00724 }
00725 }
00726 catch(boost::system::system_error)
00727 {
00728
00729
00730 this->disconnect();
00731 throw disconnected();
00732 }
00733
00734
00735 if( this->responses[packetID].get() != 0 )
00736 {
00737
00738
00739
00740
00741 timer.cancel();
00742 shared_ptr<ResponsePDU> retval = this->responses[packetID];
00743 this->responses.erase( packetID );
00744 return retval;
00745 }
00746 else
00747 {
00748
00749 throw(timeout_error());
00750 }
00751 }