AgentXcpp  Revision:0.1.1
Internals Documentation
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Friends Pages
connector.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2011-2012 Tanjeff-Nicolai Moos <tanjeff@cccmz.de>
3  *
4  * This file is part of the agentXcpp library.
5  *
6  * AgentXcpp is free software: you can redistribute it and/or modify
7  * it under the terms of the AgentXcpp library license, version 1, which
8  * consists of the GNU General Public License and some additional
9  * permissions.
10  *
11  * AgentXcpp is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * See the AgentXcpp library license in the LICENSE file of this package
17  * for more details.
18  */
19 
20 #include <boost/bind.hpp>
21 #include "connector.hpp"
22 #include "util.hpp"
23 #include "timeout_timer.hpp"
24 
25 
26 using namespace agentxcpp;
27 
28 
29 /*
30  ********************************************
31  Local helper function: read_with_timeout()
32  ********************************************
33  */
34 
35 
36 
37 /**
38  * \brief Helper type for *_with_timeout() functions.
39  */
40 enum status_t {
43  fail = 2
44 };
45 
46 
47 
48 /**
49  * \brief Helper function for *_with_timeout() functions.
50  */
51 static void callback(const boost::system::error_code& result,
52  status_t* retval)
53 {
54  if( result.value() == 0 )
55  {
56  // success
57  *retval = success_old;
58  }
59  else
60  {
61  // error
62  *retval = fail;
63  }
64 }
65 
66 
67 
68 
69 /**
70  * \brief Like boost::asio::read(), but with timeout
71  *
72  * Calls boost::asio::read(), but provides a timeout in addition.
73  *
74  * This function calls s.get_io_service.run_one().
75  *
76  * \param s The Stream to read from
77  *
78  * \param buffers The buffers to read into
79  *
80  * \param timeout The desired timeout in milliseconds
81  *
82  * \exception timeout_exception If the timeout expires before the read
83  * operation completes. Some bytes may have been
84  * read.
85  *
86  * \exception network_error If reading failed. Some data may have been read
87  * already or will be read later. Subsequent calls to
88  * this function will lead to undefined results.
89  *
90  * \return How many bytes were read
91  */
92 template<typename AsyncReadStream,
93  typename MutableBufferSequence>
94 static void read_with_timeout(AsyncReadStream& s,
95  const MutableBufferSequence& buffers,
96  unsigned timeout)
97 {
98  //
99  // What this function does:
100  //
101  // 1) start timeout timer
102  // 2) start read
103  // 3) wait until timer or read completes
104  // 4) process result
105  //
106 
107  // 1) Start timeout timer
108  timeout_timer timer(s.get_io_service());
109  timer.expires_from_now( boost::posix_time::milliseconds(timeout) );
110 
111  // 2) Start read
112 
113  // The read_result variable is static because in some circumstances the
114  // callback (which manipulates it) might be called after this function
115  // returned. We avoid a segfault this way.
116  static status_t read_result = in_progress_old;
117  // doesn't throw in boost 1.45.0:
118  async_read(s,
119  buffers,
120  boost::bind(callback,
121  boost::asio::placeholders::error,
122  &read_result));
123 
124  // 3) process asio events until read succeeds or timeout expires
125  try
126  {
127  do
128  {
129  // throws system_error in boost 1.45.0:
130  s.get_io_service().run_one();
131  }
132  while(read_result == in_progress_old &&
133  timer.get_status() == timeout_timer::running);
134  }
135  catch(boost::system::system_error)
136  {
137  timer.cancel();
138 
139  // TODO: How to cancel the async_read operation?
140  throw( network_error() );
141  }
142 
143  // 4) Check read result
144  switch(read_result)
145  {
146  case success_old:
147  // Read succeeded: OK
148  timer.cancel();
149  return;
150 
151  case fail:
152  // read failed: cancel timer, throw exception
153  timer.cancel();
154  throw( network_error() );
155 
156  case in_progress_old:
157 
158  // Look at timer_result:
159  switch(timer.get_status())
160  {
162  // timer fired while reading
163 
164  // TODO: how to cancel the async read operation?
165 
166  throw( timeout_error() );
167 
168  default:
169  // Timer broke or reported an insane status
170  // -> fail with a network error
171 
172  // TODO: how to cancel the async read operation?
173 
174  throw( network_error() );
175  }
176  }
177 
178 }
179 
180 
181 
182 /**
183  * \brief Send some data with timeout
184  *
185  * \note This function calls s.get_io_service.run_one().
186  *
187  * \param s The Stream to send to to
188  *
189  * \param buffers The data to send
190  *
191  * \param timeout The desired timeout in milliseconds
192  *
193  * \exception timeout_error If the timeout expires before the send
194  * operation completes. Some data may have been sent
195  * already. Subsequent calls to this function will
196  * lead to undefined results.
197  *
198  * \exception network_error If sending failed. Some data may have been sent
199  * already or may still be in the send queue.
200  * Subsequent calls to this function will lead to
201  * undefined results.
202  */
203 template<typename ConstBufferSequence>
204 static void send_with_timeout(boost::asio::local::stream_protocol::socket& s,
205  const ConstBufferSequence& buffers,
206  unsigned timeout)
207 {
208  //
209  // What this function does:
210  //
211  // 1) start timeout timer
212  // 2) start send
213  // 3) wait until timer or send completes
214  // 4) process result
215  //
216 
217  // 1) Start timeout timer
218 
219  timeout_timer timer(s.get_io_service());
220  timer.expires_from_now( boost::posix_time::milliseconds(timeout) );
221 
222  // 2) Start send
223 
224  // The send_result variable is static because in some circumstances the
225  // callback (which manipulates it) might be called after this function
226  // returned. We avoid a segfault this way.
227  static status_t send_result = in_progress_old;
228 
229  // doesn't throw in boost 1.45.0:
230  s.async_send(buffers,
231  boost::bind(callback,
232  boost::asio::placeholders::error,
233  &send_result));
234 
235  // 3) process asio events until send succeeds or timeout expires
236 
237  try
238  {
239  do
240  {
241  // throws system_error in boost 1.45.0:
242  s.get_io_service().run_one();
243  }
244  while(send_result == in_progress_old &&
245  timer.get_status() == timeout_timer::running);
246  }
247  catch(boost::system::system_error)
248  {
249  timer.cancel();
250 
251  // TODO: How to cancel the async_send operation?
252  throw( network_error() );
253  }
254 
255  // 4) Check result
256 
257  switch(send_result)
258  {
259  case success_old:
260  // Send succeeded:
261  timer.cancel();
262  return;
263 
264  case fail:
265  // send failed: cancel timer, throw exception
266  timer.cancel();
267  throw( network_error() );
268 
269  case in_progress_old:
270  // Sending still in progress, look at timer_result:
271  switch(timer.get_status())
272  {
274  // timer fired while reading
275 
276  // TODO: how to cancel the async send operation?
277 
278  // throw exception
279  throw( timeout_error() );
280 
281  default:
282  // Timer broke or reported an insane status
283  // -> fail with a network error
284 
285  // TODO: how to cancel the async send operation?
286 
287  throw( network_error() );
288  }
289  }
290 }
291 
292 
293 
294 /*
295  ********************************************
296  Implentation of class connector
297  ********************************************
298  */
299 
300 
301 
302 
303 
304 connector::connector(boost::asio::io_service* io_service,
305  const std::string& unix_domain_socket,
306  unsigned timeout) :
307  timeout(timeout),
308  io_service(io_service),
309  socket(0),
310  endpoint(unix_domain_socket.c_str()),
311  handler(0)
312 {
313 }
314 
316 {
317  // If currently connected: do nothing
318  // (is_open() doesn't throw in boost 1.45.0)
319  if( this->socket )
320  {
321  return;
322  }
323 
324  // Connect to endpoint
325  using boost::asio::local::stream_protocol;
327  try
328  {
329  this->socket->connect(endpoint);
330  }
331  catch(boost::system::system_error)
332  {
333  // Could not connect
334  delete this->socket;
335  this->socket = 0;
336  throw(disconnected());
337  }
338 
339  // Set up socket for next read access
340  // (async_read doesn't throw in boost 1.45.0)
341  async_read(*this->socket,
342  boost::asio::buffer(this->header_buf, 20),
343  boost::bind(&connector::receive_callback,
344  this,
345  boost::asio::placeholders::error));
346 }
347 
349 {
350  // If already disconnected: do nothing
351  if( this->socket == 0 )
352  {
353  return;
354  }
355 
356  // Shutdown socket
357  try
358  {
359 
360  // Cancel all read and write operations. Called on the recommendation
361  // of socket.close() documentation.
362  // (throws system_error in boost 1.45.0)
363  socket->shutdown(
364  boost::asio::local::stream_protocol::socket::shutdown_both);
365  }
366  catch(boost::system::system_error)
367  {
368  // ignore errors
369  }
370 
371  // Close socket
372  try
373  {
374  // (throws system_error in boost 1.45.0)
375  socket->close();
376  }
377  catch(boost::system::system_error)
378  {
379  // ignore errors
380  }
381 
382  // Finally destroy socket
383  delete this->socket;
384  this->socket = 0;
385 }
386 
387 
388 
389 
391 {
392  this->handler = handler;
393 }
394 
395 
396 void connector::receive_callback(const boost::system::error_code& result)
397 {
398  // Check for network errors
399  if( result.value() != 0 )
400  {
401  // result will probably be boost::asio::error::operation_aborted if the
402  // socket is explicitly closed. See boost 1.45.0 docs for
403  // basic_stream_socket::close().
404  if( result.value() == boost::asio::error::operation_aborted )
405  {
406  // Socket was closed. Nothing to do here.
407  }
408  else
409  {
410  // async read operation failed
411  // -> disconnect
412  this->disconnect();
413  }
414 
415  // Nothing left to do
416  return;
417  }
418 
419  // Copy header into PDU buffer
420  binary buf;
421  buf.append(this->header_buf, 20);
422 
423  // read endianess flag
424  bool big_endian = ( this->header_buf[2] & (1<<4) ) ? true : false;
425 
426  // read payload length
427  uint32_t payload_length;
428  binary::const_iterator pos = buf.begin() + 16;
429  payload_length = read32(pos, big_endian);
430  if( payload_length % 4 != 0 )
431  {
432  // payload length must be a multiple of 4!
433  // See RFC 2741, 6.1. "AgentX PDU Header"
434  // -> close socket
435  this->disconnect();
436 
437  // Report parse error
438  if( this->handler )
439  {
440  try
441  {
442  this->handler->handle_pdu(shared_ptr<PDU>(), -1);
443  }
444  catch(...)
445  {
446  // discard exceptions from user handler
447  }
448  }
449  }
450 
451  // Read the payload (TODO: can we avoid the new() operator?)
452  uint8_t* payload = new uint8_t[payload_length];
453  try
454  {
455  read_with_timeout(*this->socket,
456  boost::asio::buffer(payload, payload_length),
457  this->timeout);
458  }
459  catch(...)
460  {
461  // Some error occurred, e.g. timeout
462  // -> disconnect
463  this->disconnect();
464  delete[] payload;
465  return;
466  }
467  buf.append(payload, payload_length);
468  delete[] payload;
469 
470  // Parse PDU
471  shared_ptr<PDU> pdu;
472  try
473  {
474  pdu = PDU::parse_pdu(buf);
475  }
476  catch(version_error)
477  {
478  // Report version error
479  if( this->handler )
480  {
481  try
482  {
483  this->handler->handle_pdu(shared_ptr<PDU>(), -2);
484  }
485  catch(...)
486  {
487  // discard exceptions from user handler
488  }
489  }
490  }
491  catch(parse_error)
492  {
493  // disconnect
494  this->disconnect();
495 
496  // Report parse error
497  if( this->handler )
498  {
499  try
500  {
501  this->handler->handle_pdu(shared_ptr<PDU>(), -1);
502  }
503  catch(...)
504  {
505  // discard exceptions from user handler
506  }
507  }
508  }
509 
510  // Special case: ResponsePDU's
511  shared_ptr<ResponsePDU> response;
512  response = boost::dynamic_pointer_cast<ResponsePDU>(pdu);
513  if(response.get() != 0)
514  {
515  // Was a response
516  std::map< uint32_t, boost::shared_ptr<ResponsePDU> >::iterator i;
517  i = this->responses.find( response->get_packetID() );
518  if(i != this->responses.end())
519  {
520  // Someone is waiting for this response
521  i->second = response;
522  }
523  else
524  {
525  // Nobody was waiting for the response
526  // -> ignore it
527  }
528  }
529  else
530  {
531  // Was not a Response
532  // -> call handler if available
533  if( this->handler )
534  {
535 
536  // Call the handler
537  try
538  {
539  this->handler->handle_pdu(pdu, 0);
540  }
541  catch(...)
542  {
543  // discard exceptions from user handler
544  }
545  }
546  }
547 
548  // Set up socket for next read access
549  // (async_read doesn't throw in boost 1.45.0)
550  async_read(*this->socket,
551  boost::asio::buffer(this->header_buf, 20),
552  boost::bind(&connector::receive_callback,
553  this,
554  boost::asio::placeholders::error));
555 }
556 
557 
558 
559 void connector::send(const PDU& pdu)
560 {
561  if(this->socket == 0)
562  {
563  // We are currently not conected
564  throw disconnected();
565  }
566 
567  try
568  {
569  // throws timeout_error and network_error:
570  binary buf = pdu.serialize();
571  send_with_timeout(*this->socket,
572  boost::asio::buffer(buf.data(), buf.size()),
573  this->timeout);
574  }
575  catch(network_error)
576  {
577  // network errors are fatal: disconnect & throw
578  this->disconnect();
579  throw disconnected();
580  }
581  catch(timeout_error)
582  {
583  // No subsequent calls to send_with_timeout() possible -> disconnect.
584  this->disconnect();
585 
586  // forward timeout_error to caller
587  throw;
588  }
589 }
590 
591 
592 
593 boost::shared_ptr<ResponsePDU>
595 {
596  if(this->socket == 0)
597  {
598  // We are currently not conected
599  throw disconnected();
600  }
601 
602  // Indicate that we are waiting for a specific response:
603  // We add a null pointer to the map
604  responses[packetID] = boost::shared_ptr<ResponsePDU>();
605 
606  // Start timeout timer
607  timeout_timer timer(*(this->io_service));
608  timer.expires_from_now( boost::posix_time::seconds(this->timeout) );
609 
610  // process asio events until ResponsePDU arrives or timeout expires
611  try
612  {
613  while( this->responses[packetID].get() == 0 // no response was stored
614  && timer.get_status() == timeout_timer::running) // no timeout yet
615  {
616  this->io_service->run_one();
617  }
618  }
619  catch(boost::system::system_error)
620  {
621  // run_one() failed
622  // -> disconnect and fail
623  this->disconnect();
624  throw disconnected();
625  }
626 
627  // Check the result
628  if( this->responses[packetID].get() != 0 )
629  {
630  // ResponsePDU arrived:
631  // 1. Cancel timer
632  // 2. Erase response from map
633  // 3. Return response
634  timer.cancel(); // does nothing if timer broke
635  shared_ptr<ResponsePDU> retval = this->responses[packetID];
636  this->responses.erase( packetID );
637  return retval;
638  }
639  else
640  {
641  switch(timer.get_status())
642  {
644  // Timer expired before ResponsePDU arrived
645  throw(timeout_error());
646  default:
647  // Timer broke or reported an insane status
648  // We go to disconnected state to indicate an error
649  this->disconnect();
650  throw disconnected();
651  }
652  }
653 }