Reading messages from a serial port with Boost Asio -
i use boost asio read variable length messages serial port. read , wait long enough sure line idle, not want block completely.
the following code have far, , in process of testing it:
long readdata(void *_pdata, unsigned long _usize, size_t millis) { size_t n = 0; // n return message size. if (millis > 0) // millis acceptable idle time, 0 invalid in case. { size_t ubytestransferred = 0; boost::asio::deadline_timer timeout(m_ioservice); readcallback readcallback(ubytestransferred, timeout); waitcallback waitcallback(m_port); while (_usize - (unsigned long)n > 0) { // setup asynchronous read timeout m_ioservice.reset(); m_port.async_read_some(boost::asio::buffer((char*)_pdata + n, _usize - (unsigned long)n), readcallback); timeout.expires_from_now(boost::posix_time::milliseconds(millis)); timeout.async_wait(waitcallback); // block until asynchronous callbacks finished m_ioservice.run(); // continue if bytes received, stop otherwise if (ubytestransferred > 0) { n += ubytestransferred; m_ubytesreceived += ubytestransferred; } else { break; } } } return n; }
i know if correct way of doing (that is, reading until line idle) boost asio?
here callback handlers:
struct readcallback { readcallback(std::size_t &_ubytestransferred, boost::asio::deadline_timer &_timeout) :m_ubytestransferred(_ubytestransferred), m_timeout(_timeout) {} void operator()(const boost::system::error_code &_error, std::size_t _ubytestransferred) { m_ubytestransferred = _ubytestransferred; if (!_error && (_ubytestransferred > 0) ) { m_timeout.cancel(); } } std::size_t &m_ubytestransferred; boost::asio::deadline_timer &m_timeout; private: readcallback(); readcallback &operator=(const readcallback&); }; struct waitcallback { waitcallback(boost::asio::serial_port &_port) :m_port(_port) {} void operator()(const boost::system::error_code &_error) { if (!_error) { m_port.cancel(); } } boost::asio::serial_port &m_port; private: waitcallback(); waitcallback &operator=(const waitcallback&); };
when "do not want block completely", suggests might better off creating separate thread handle socket i/o. way main thread can available other processing. using 2 threads in client. 1 thread processes messages received server , other thread handles sending messages server. client talks multiple servers. here code:
#include "stdafx.h" #include "sslsocket.h" boost::shared_ptr< boost::asio::io_service > sslsocket::ioservice; bool sslsocket::lobbysocketopen = false; sslsocket* sslsocket::pssllobby = 0; int sslsocket::staticinit = 0; callback sslsocket::callbackfunction; buffermanagement sslsocket::bufmang; volatile bool sslsocket::reqalive = true; logger sslsocket::log; handle sslsocket::hevent; bool sslsocket::displayinhex; concurrentmsgqueue sslsocket::sendmsgq; bool sslsocket::rcvthreadcreated = 0; buffermanagement* message::pbufmang; bool sslsocket::shuttingdown = false; std::vector<sslsocket *> socketlist; sslsocket::sslsocket(const bool logtofile, const bool logtoconsole, const bool displayinhex, const loglevel leveloflog, const string& logfilename, const int bufmanglen) : psocket(0) { // sslsocket constructor. // if static members have not been intialized yet, initialize them. lockcode = new lock(); if (!staticinit) { socketlist.push_back(this); displayinhex = displayinhex; bufmang.init(bufmanglen); message::setbufmang(&bufmang); // constructor enables logging according vars passed in. log.init(logtofile, logtoconsole, leveloflog, logfilename); staticinit = 1; hevent = createevent(null, false, false, null); // define asio io service object. // ioservice = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service); boost::shared_ptr<boost::asio::io_service> ioserv(new boost::asio::io_service); ioservice = ioserv; pssllobby = this; } } sslsocket::~sslsocket(void) { if (psocket) delete psocket; if (--staticinit == 0) closehandle(hevent); } void sslsocket::connect(sslsocket* pssls, const string& serverpath, string& port) { // connects server. // serverpath - specifies path server. can either ip address or url. // port - port server listening on. // try { lockcode->acquire(); // single thread code. // locking codelock(socketlock); // single thread code. // if user has tried connect before, make sure clean before trying again. if (psocket) { delete psocket; psocket = 0; } // if serverpath url, resolve address. if ((serverpath[0] < '0') || (serverpath[0] > '9')) // assumes first char of server path not number when resolving ip addr. { // create resolver , query objects resolve host name in serverpath ip address. boost::asio::ip::tcp::resolver resolver(*ioservice); boost::asio::ip::tcp::resolver::query query(serverpath, port); boost::asio::ip::tcp::resolver::iterator endpointiterator = resolver.resolve(query); // set ssl context. boost::asio::ssl::context ctx(*ioservice, boost::asio::ssl::context::tlsv1_client); // specify not verify server certificiate right now. ctx.set_verify_mode(boost::asio::ssl::context::verify_none); // init socket object used communicate server. psocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*ioservice, ctx); // // thread on now, user interface thread. create thread handle incoming socket work messages. // 1 thread created handle socket i/o reading , thread created handle writing. if (!rcvthreadcreated) { workerthreads.create_thread(boost::bind(&sslsocket::rcvworkerthread, this)); rcvthreadcreated = true; workerthreads.create_thread(boost::bind(&sslsocket::sendworkerthread, this)); } // try connect server. note - add timeout logic @ point. boost::asio::async_connect(psocket->lowest_layer(), endpointiterator, boost::bind(&sslsocket::handleconnect, this, boost::asio::placeholders::error)); } else { // serverpath ip address, try connect using that. // // create endpoint specified ip address. const boost::asio::ip::address ip(boost::asio::ip::address::from_string(serverpath)); int iport = atoi(port.c_str()); const boost::asio::ip::tcp::endpoint ep(ip, iport); // set ssl context. boost::asio::ssl::context ctx(*ioservice, boost::asio::ssl::context::tlsv1_client); // specify not verify server certificiate right now. ctx.set_verify_mode(boost::asio::ssl::context::verify_none); // init socket object used communicate server. psocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*ioservice, ctx); // // try connect server. note - add timeout logic @ point. //psocket->core_.engine_.do_connect(void*, int); // psocket->next_layer_.async_connect(ep, &sslsocket::handleconnect) // psocket->next_layer().async_connect(ep, &sslsocket::handleconnect); boost::system::error_code ec; psocket->next_layer().connect(ep, ec); if (ec) { // log error. worker thread should exit gracefully after this. stringstream ss; ss << "sslsocket::connect: connect failed " << sclientip << " : " << uiclientport << ". error: " << ec.message() + ".\n"; log.logstring(ss.str(), logerror); } handleconnect(ec); // boost::asio::async_connect(psocket->lowest_layer(), ep, // boost::bind(&sslsocket::handleconnect, this, boost::asio::placeholders::error)); } } catch (std::exception& e) { stringstream ss; ss << "sslsocket::connect: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } lockcode->release(); } void sslsocket::sendtoserver(const int bytesinmsg, byte* pbuf) { // method creates msg object , saves in sendmsgq object. // sends number of bytes specified bytesinmsg in pbuf server. // message* pmsg = message::getmsg(this, bytesinmsg, pbuf); sendmsgq.push(pmsg); // signal send worker thread wake , send msg server. setevent(hevent); } void sslsocket::sendworkerthread(sslsocket* pssls) { // thread method gets called process messages sent server. // // since has static method, call method on class handle server requests. pssls->processsendrequests(); } void sslsocket::processsendrequests() { // method handles sending msgs server. // std::stringstream ss; dword waitresult; log.logstring("sslsocket::processsendrequests: worker thread " + logger::numbertostring(boost::this_thread::get_id()) + " started.\n", loginfo); // loop until user quits, or error of sort thrown. try { { // if there 1 or more msgs need sent server, send them out. if (sendmsgq.count() > 0) { message* pmsg = sendmsgq.front(); sslsocket* pssl = pmsg->pssl; sendmsgq.pop(); const byte* pbuf = pmsg->pbuf; const int bytesinmsg = pmsg->bytesinmsg; boost::system::error_code error; lockcode->acquire(); // single thread code. // locking codelock(socketlock); // single thread code. try { boost::asio::async_write(*pssl->psocket, boost::asio::buffer(pbuf, bytesinmsg), boost::bind(&sslsocket::handlewrite, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } catch (std::exception& e) { stringstream ss; ss << "sslsocket::processsendrequests: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); // stop(); } ss.str(std::string()); ss << "sslsocket::processsendrequests: # bytes sent = " << bytesinmsg << "\n"; log.logstring(ss.str(), logdebug2); log.logbuf(pbuf, bytesinmsg, displayinhex, logdebug3); lockcode->release(); } else { // nothing send, go wait state. waitresult = waitforsingleobject(hevent, infinite); if (waitresult != 0l) { log.logstring("sslsocket::processsendrequests: waitforsingleobject event error. code = " + logger::numbertostring(getlasterror()) + ". \n", logerror); } } } while (reqalive); log.logstring("sslsocket::processsendrequests: worker thread " + logger::numbertostring(boost::this_thread::get_id()) + " done.\n", loginfo); } catch (std::exception& e) { stringstream ss; ss << "sslsocket::processsendrequests: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::handlewrite(const boost::system::error_code& error, size_t bytestransferred) { // method called after msg has been written out socket. nothing since reading handled handleread method. // std::stringstream ss; try { if (error) { ss << "sslsocket::handlewrite: failed - " << error.message() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handlehandshake: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::rcvworkerthread(sslsocket* pssls) { // method gets called when receive thread created class. // thread method focuses on processing messages received server. // // since has static method, call method on class handle server requests. pssls->initasynchio(); } void sslsocket::initasynchio() { // method responsible initiating asynch i/o. boost::system::error_code err; string s; stringstream ss; // try { ss << "sslsocket::initasynchio: worker thread - " << logger::numbertostring(boost::this_thread::get_id()) << " started.\n"; log.logstring(ss.str(), loginfo); // enable handlers asynch i/o. thread hang here until stop method has been called or error occurs. // add work object thread dedicated handling asynch i/o. boost::asio::io_service::work work(*ioservice); ioservice->run(); log.logstring("sslsocket::initasynchio: receive worker thread done.\n", loginfo); } catch (std::exception& e) { stringstream ss; ss << "sslsocket::initasynchio: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::handleconnect(const boost::system::error_code& error) { // method called asynchronously when server has responded connect request. std::stringstream ss; try { if (!error) { psocket->async_handshake(boost::asio::ssl::stream_base::client, boost::bind(&sslsocket::handlehandshake, this, boost::asio::placeholders::error)); ss << "sslsocket::handleconnect: worker thread " << logger::numbertostring(boost::this_thread::get_id()) << ".\n"; log.logstring(ss.str(), loginfo); } else { // log error. worker thread should exit gracefully after this. ss << "sslsocket::handleconnect: connect failed " << sclientip << " : " << uiclientport << ". error: " << error.message() + ".\n"; log.logstring(ss.str(), logerror); stop(); } } catch (std::exception& e) { stringstream ss; ss << "sslsocket::initasynchio: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::handlehandshake(const boost::system::error_code& error) { // method called asynchronously when server has responded handshake request. std::stringstream ss; try { if (!error) { // try send first message server expecting. msg tells server want connect. // first 4 bytes specifies msg length after first 4 bytes. next 2 bytes specifies msg type. // next 4 bytes specifies source code. next 13 bytes specifies msg "attackpoker". // next 2 bytes specifies locale length. last 2 bytes specifies locale - en english. // unsigned char msg[27] = {0x17, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x41, 0x74, 0x74, 0x61, 0x63, 0x6b, 0x50, 0x6f, 0x6b, 0x65, 0x72, 0x02, 0x00, 0x65, 0x6e}; boost::system::error_code err; // if (pssllobby == this) lobbysocketopen = true; sclientip = psocket->lowest_layer().remote_endpoint().address().to_string(); uiclientport = psocket->lowest_layer().remote_endpoint().port(); reqalive = true; // boost::asio::async_write(*psocket, boost::asio::buffer(msg), boost::bind(&sslsocket::handlefirstwrite, this, // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); int count = boost::asio::write(*psocket, boost::asio::buffer(msg), boost::asio::transfer_exactly(27), err); if (err) { ss << "sslsocket::handlehandshake: write failed - " << error.message() << ".\n"; log.logstring(ss.str(), loginfo); } handlefirstwrite(err, count); // boost::asio::async_write(psocket, boost::asio::buffer(msg, 27), boost::bind(&sslsocket::handlewrite, this, // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); ss.str(""); ss << "sslsocket::handlehandshake: worker thread " << boost::this_thread::get_id() << ".\n"; } else { ss << "sslsocket::handlehandshake: failed - " << error.message() << ".\n"; ioservice->stop(); } log.logstring(ss.str(), loginfo); } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handlehandshake: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::handlefirstwrite(const boost::system::error_code& error, size_t bytestransferred) { // method called after msg has been written out socket. std::stringstream ss; try { if (!error) { // boost::asio::async_read(psocket, boost::asio::buffer(reply_, bytestransferred), boost::bind(&sslsocket::handle_read, // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); // boost::asio::async_read(psocket, boost::asio::buffer(reply_, 84), boost::bind(&sslsocket::handle_read, // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); // locking codelock(readlock); // single thread code. // signal other threads msgs ready sent , received. // boost::asio::async_read(psocket, boost::asio::buffer(prepbuf), boost::asio::transfer_exactly(4), boost::bind(&sslsocket::handleread, // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); // // notify ui connected. create 6 byte msg this. pdatabuf = bufmang.getptr(6); byte* p = pdatabuf; // create msg type 500 *p = 244; *++p = 1; callbackfunction(this, 2, (void*)pdatabuf); // 1st 4 bytes of next msg, length of msg. pdatabuf = bufmang.getptr(msglenbytes); // int i1=1,i2=2,i3=3,i4=4,i5=5,i6=6,i7=7,i8=8,i9=9; // (boost::bind(&nine_arguments,_9,_2,_1,_6,_3,_8,_4,_5,_7)) // (i1,i2,i3,i4,i5,i6,i7,i8,i9); // boost::asio::read(*psocket, boost::asio::buffer(preqbuf, msglenbytes), boost::asio::transfer_exactly(msglenbytes), err); // boost::asio::async_read(psocket, boost::asio::buffer(preqbuf, msglenbytes), boost::bind(&sslsocket::handleread, _1,_2,_3)) // (this, preqbuf, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred); // boost::asio::async_read(*psocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(bytecount), boost::bind(&client::handle_read, // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); // boost::asio::async_write(*psocket, boost::asio::buffer(pdatabuf, msglenbytes), boost::bind(&sslsocket::handlewrite, this, // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); try { lockcode->acquire(); // single thread code. // locking codelock(socketlock); // single thread code. boost::asio::async_read(*psocket, boost::asio::buffer(pdatabuf, msglenbytes), boost::bind(&sslsocket::handleread, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handlefirstwrite: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } lockcode->release(); } else { ss << "sslsocket::handlefirstwrite: failed - " << error.message() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handlefirstwrite: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::handleread(const boost::system::error_code& error, size_t bytestransferred) { // method called process incomming message. // std::stringstream ss; int bytecount; try { // ss << "sslsocket::handleread: worker thread " << boost::this_thread::get_id() << ".\n"; // log.logstring(ss.str(), loginfo); // set exit thread if user done. if (!reqalive) { // ioservice->stop(); return; } if (!error) { // number of bytes in message. if (bytestransferred == 4) { bytecount = bytestoint(pdatabuf); } else { // call c# callback method handle message. ss << "sslsocket::handleread: worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytestransferred << ".\n"; log.logstring(ss.str(), logdebug2); if (bytestransferred > 0) { log.logbuf(pdatabuf, (int)bytestransferred, true, logdebug3); log.logstring("sslsocket::handleread: sending msg c# client.\n\n", logdebug2); callbackfunction(this, bytestransferred, (void*)pdatabuf); } else { // # of bytes transferred = 0. don't anything. bytestransferred = 0; // debugging. } // prepare read in next message length. bytecount = msglenbytes; } pdatabuf = bufmang.getptr(bytecount); boost::system::error_code err; // boost::asio::async_read(psocket, boost::asio::buffer(pdatabuf, bytecount), boost::bind(&sslsocket::handleread, // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); lockcode->acquire(); // single thread code. // locking codelock(socketlock); // single thread code. try { boost::asio::async_read(*psocket, boost::asio::buffer(pdatabuf, bytecount), boost::bind(&sslsocket::handleread, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); // boost::asio::read(psocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(bytecount), err); } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handleread: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); // stop(); } lockcode->release(); } else { log.logstring("sslsocket::handleread failed: " + error.message() + "\n", logerror); stop(); } } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handleread: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } } void sslsocket::stop() { // method calls shutdown method on socket in order stop reads or writes might going on. if not done, exception thrown // when comes time delete object. // boost::system::error_code ec; try { // method can called handler well. once shuttingdown flag set, don't go throught same code again. if (shuttingdown) return; lockcode->acquire(); // single thread code. if (!shuttingdown) { shuttingdown = true; psocket->next_layer().cancel(); psocket->shutdown(ec); if (ec) { stringstream ss; ss << "sslsocket::stop: socket shutdown error - " << ec.message() << ".\n"; // log.logstring(ss.str(), logerror); // - not error. } else { psocket->next_layer().close(); } delete psocket; psocket = 0; reqalive = false; setevent(hevent); ioservice->stop(); lobbysocketopen = false; workerthreads.join_all(); } lockcode->release(); delete lockcode; lockcode = 0; } catch (std::exception& e) { stringstream ss; ss << "sslsocket::handleread: threw error - " << e.what() << ".\n"; log.logstring(ss.str(), logerror); stop(); } }