iSpike  2.1
Spike conversion library for robotics
D:/Home/Programs/iSpike/src/YarpConnection.cpp
Go to the documentation of this file.
00001 //iSpike includes
00002 #include <iSpike/YarpConnection.hpp>
00003 #include <iSpike/YarpPortDetails.hpp>
00004 #include <iSpike/ISpikeException.hpp>
00005 #include <iSpike/Log/Log.hpp>
00006 using namespace ispike;
00007 
00008 //Other includes
00009 #include <iostream>
00010 #include <sstream>
00011 #include <iterator>
00012 #include <string>
00013 #include <map>
00014 #include <utility>
00015 #include <boost/array.hpp>
00016 #include <boost/asio.hpp>
00017 #include <boost/regex.hpp>
00018 #include <boost/lexical_cast.hpp>
00019 #include <boost/mpl/vector.hpp>
00020 using namespace std;
00021 using boost::asio::ip::tcp;
00022 
00024 YarpConnection::YarpConnection(string ip, unsigned port){
00025         //boost::asio::io_service io_service;
00026         LOG(LOG_DEBUG) << "creating a socket";
00027         this->connectionSocket = new tcp::socket(this->io_service);
00028         LOG(LOG_DEBUG) << "connecting to port";
00029         int result = connect_to_port(ip, port);
00030         LOG(LOG_DEBUG) << "connected";
00031         if (!result) {
00032                 boost::system::error_code error = boost::asio::error::host_not_found;
00033                 throw boost::system::system_error(error);
00034         }
00035 
00036         LOG(LOG_DEBUG) << "writing commands";
00037         write_text("CONNECT foo\n");
00038         read_until("\n");
00039         write_text("d\n");
00040         write_text("list\n");
00041 
00042         boost::system::error_code read_error;
00043         LOG(LOG_DEBUG) << "reading reply";
00044         string response_string = read_text();
00045         LOG(LOG_DEBUG) << "disconnecting";
00046         disconnect();
00047         LOG(LOG_DEBUG) << "disconnected";
00048         boost::cmatch matches;
00049         boost::regex new_line("\n");
00050         list<string> lines;
00051         boost::regex_split(back_inserter(lines), response_string, new_line);
00052         boost::regex expression("registration name (.+?) ip (.+?) port (.+?) type (.+?)");
00053         portMap.clear();
00054         while(lines.size())  {
00055                 string current_string = *(lines.begin());
00056                 lines.pop_front();
00057 
00058                 if (boost::regex_match(current_string.c_str(), matches, expression)) {
00059                         string portName(matches[1].first, matches[1].second);
00060                         string ip(matches[2].first, matches[2].second);
00061                         string portStr(matches[3].first, matches[3].second);
00062                         string type(matches[4].first, matches[4].second);
00063                         unsigned port = boost::lexical_cast<unsigned>(portStr.c_str());
00064                         YarpPortDetails details(ip, port, type);
00065                         portMap[portName] = details;
00066                 }
00067         }
00068 
00069         LOG(LOG_DEBUG) << "YarpConnection: Received the following Yarp portmap:";
00070         for (map<string, YarpPortDetails>::iterator it=portMap.begin() ; it != portMap.end(); ++it )   {
00071                 LOG(LOG_DEBUG) << (*it).first << " => " << (*it).second.getIp() << ":" << (*it).second.getPort();
00072         }
00073 }
00074 
00075 
00077 YarpConnection::~YarpConnection(){
00078 }
00079 
00080 
00081 /*------------------------------------------------------------------*/
00082 /*---------               PUBLIC METHODS                     -------*/
00083 /*------------------------------------------------------------------*/
00084 
00088 int YarpConnection::write_text(string message){
00089         boost::system::error_code connection_error;
00090         boost::asio::write( *this->connectionSocket, boost::asio::buffer(message),
00091                                                 boost::asio::transfer_all(), connection_error);
00092         if (connection_error) throw boost::system::system_error(connection_error);
00093         return(true);
00094 }
00095 
00096 
00097 string YarpConnection::getSocketString(){
00098         unsigned int timeoutCtr = 0;
00099         unsigned int timeout = 10000;
00100         while(read_char() != 'o' && timeoutCtr < timeout){
00101                 timeoutCtr++;
00102         }
00103         if(timeoutCtr >= timeout){
00104                 throw ISpikeException("Timeout exceeded in getSocketString");
00105         }
00106         timeoutCtr = 0;
00107         ostringstream strStream (ostringstream::out);
00108         unsigned char newChar = this->read_char();
00109         while(newChar != 'd' && timeoutCtr != timeout)  {
00110                 strStream << newChar;
00111                 newChar = this->read_char();
00112                 ++timeoutCtr;
00113         }
00114         if(timeoutCtr >= timeout){
00115                 throw ISpikeException("Timeout exceeded in getSocketString");
00116         }
00117         return strStream.str();
00118 }
00119 
00120 
00124 string YarpConnection::read_until(string until){
00125         boost::asio::streambuf response;
00126         boost::asio::read_until(*this->connectionSocket, response, until);
00127         istream response_stream(&response);
00128         string response_string((istreambuf_iterator<char>(response_stream)), istreambuf_iterator<char>());
00129         return(response_string);
00130 }
00131 
00132 
00137 int YarpConnection::connect_to_port(string ip, unsigned port){
00138         boost::asio::io_service io_service;
00139         boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip.c_str()), port);
00140         boost::system::error_code error = boost::asio::error::host_not_found;
00141 
00142         this->connectionSocket->close();
00143         this->connectionSocket->connect(endpoint, error);
00144 
00145         if (error || !this->connectionSocket->is_open()){
00146                 LOG(LOG_DEBUG) << "Exception thrown connecting to port: "<<error.message();
00147                 throw ISpikeException(error.message());
00148         }
00149         return true;
00150 }
00151 
00152 
00156 void YarpConnection::prepare_to_read_text(){
00157         write_text("CONNECT foo\n");
00158         boost::asio::streambuf response;
00159         boost::asio::read_until(*this->connectionSocket, response, "\n");
00160         write_text("r\n");
00161 }
00162 
00163 
00167 void YarpConnection::prepare_to_read_binary(){
00168         // Send header to select connection type.
00169         // this header is for fast_tcp, so we don't have to deal with flow control
00170         unsigned char hdr[8] = {'Y','A',0x64, 0x1E, 0, 0,'R','P'};
00171         write_binary(hdr,8);
00172 
00173         // Send name of our port - there is none, so send a name that
00174         // does not start with "/"
00175         unsigned char port[8] = {4,0,0,0,'m','i','n',0};
00176         write_binary(port,8);
00177         hdr[7] = '\0';
00178 
00179         // Check for acknowledgement
00180         read_binary(hdr,8);
00181         if (hdr[7]!='P') {
00182                 throw ISpikeException("Cannot make connection handshake");
00183         }
00184 
00185         // Send header for payload (a command to reverse connection)
00186         unsigned char load_hdr[8] = {'Y','A',10, 0, 0, 0,'R','P'};
00187         write_binary(load_hdr,8);
00188         unsigned char load_hdr2[10] = { 1, 1, 255, 255, 255, 255, 255, 255, 255, 255 };
00189         write_binary(load_hdr2,10);
00190         unsigned char load_len[4] = { 8, 0, 0, 0 };
00191         write_binary(load_len,4);
00192         unsigned char reply_len[4] = { 0, 0, 0, 0 };
00193         write_binary(reply_len,4);
00194 
00195         // send data - this is a command to reverse the connection for reading
00196         unsigned char command_reverse[8] = { 0, 0, 0, 0, '~', 'r', 0, 1 };
00197         write_binary(command_reverse,8);
00198 }
00199 
00200 
00203 bool YarpConnection::read_image(Bitmap& bitmap){
00204         int result = read_data_header();
00205         if (result>=0) {
00206                 unsigned char header[4*15];
00207                 int image_len = result - sizeof(header);
00208                 result = read_binary((unsigned char*)header,sizeof(header));
00209                 if (result<0) {
00210                         throw ISpikeException("Failed to read image header");
00211                 }
00212 
00213                 //Extract the width, height and depth of the image
00214                 int depth = read_int((unsigned char*)(header+4*8)); // header.depth
00215                 int width = read_int((unsigned char*)(header+4*11)); // header.width
00216                 int height = read_int((unsigned char*)(header+4*12)); // header.height
00217 
00218                 //Create new bitmap contents if width, height and depth do not match
00219                 if (bitmap.getWidth() != width || bitmap.getHeight() != height || bitmap.getDepth() != depth) {
00220                         bitmap.reset(width, height, depth);
00221                 }
00222 
00223                 //Read into bitmap
00224                 unsigned char* contents = bitmap.getContents();
00225                 read_binary(contents, bitmap.size());
00226                 return true;
00227         }
00228         return false;
00229 }
00230 
00231 
00232 /*------------------------------------------------------------------*/
00233 /*---------              PRIVATE METHODS                     -------*/
00234 /*------------------------------------------------------------------*/
00235 
00237 void YarpConnection::disconnect(){
00238         write_text("q\n");
00239         this->connectionSocket->close();
00240 }
00241 
00242 
00246 int YarpConnection::read_binary(unsigned char* buffer, int length){
00247         return boost::asio::read(*this->connectionSocket, boost::asio::buffer(buffer, length));
00248 }
00249 
00250 
00251 unsigned char YarpConnection::read_char(){
00252         vector<unsigned char> buf(1);
00253         try     {
00254                 size_t len = read(*this->connectionSocket, boost::asio::buffer(buf));
00255                 assert(len == 1);
00256                 //cout << buf[0];
00257                 return buf[0];
00258                 // process the 4 bytes in buf
00259         }
00260         catch (...)     {
00261                 throw new ISpikeException("Error in read_char!" );
00262         }
00263 }
00264 
00265 
00267 int YarpConnection::read_data_header(){
00268         int i;
00269         unsigned char load_hdr_ref[8] = {'Y','A',10, 0, 0, 0,'R','P'};
00270         unsigned char load_hdr[8] = {0,0,0,0,0,0,0,0};
00271         read_binary(load_hdr,8);
00272         for (i=0; i<8; i++) {
00273                 if (load_hdr[i]!=load_hdr_ref[i]) {
00274                         throw ISpikeException("Unexpected data received");
00275                 }
00276         }
00277         unsigned char load_hdr2[10];
00278         load_hdr2[1] = 0;
00279         read_binary(load_hdr2,10);
00280         if (load_hdr2[1]!=1) {
00281                 throw ISpikeException("Corrupt data received");
00282         }
00283         int blocks = load_hdr2[0];
00284         int len = 0;
00285         unsigned char load_len[4];
00286         unsigned int port_message_len = 0;
00287         for (i=0; i<blocks; i++) {
00288                 read_binary((unsigned char *)load_len,4);
00289                 len += load_len[0] + (load_len[1]<<8) + (load_len[2]<<16) +
00290                                 (load_len[3]<<24);
00291                 if (i==0) {
00292                         port_message_len = len;
00293                 }
00294         }
00295         read_binary((unsigned char *)load_len,4);
00296         for (i=0; i<4; i++) {
00297                 if (load_len[i]!=0)
00298                         throw ISpikeException("Unexpected lengths received");
00299         }
00300 
00301         // extract the port header part
00302         unsigned char command_header[8];
00303         command_header[4] = '\0';
00304         read_binary(command_header,8);
00305         len -= 8;
00306         port_message_len -= 8;
00307         if (command_header[4]!='~') {
00308                 throw ISpikeException("Unexpected port command received");
00309         }
00310         unsigned char cmd[256] = "?";
00311         cmd[0] = command_header[5];
00312         if (cmd[0]=='\0') {
00313                 if (port_message_len>sizeof(cmd)) {
00314                         throw ISpikeException("Port command too big\n");
00315                 }
00316                 read_binary(cmd,port_message_len);
00317                 len -= port_message_len;
00318         }
00319         if (cmd[0]!='d') {
00320                 throw ISpikeException("Unexpected port command!");
00321         }
00322 
00323         return len;
00324 }
00325 
00326 
00328 int YarpConnection::read_int(unsigned char* buf){
00329         unsigned char *ubuf = (unsigned char *)buf;
00330         // this could be optimized away on little-endian machines!
00331         return ubuf[0] + (ubuf[1]<<8) + (ubuf[2]<<16) + (ubuf[3]<<24);
00332 }
00333 
00334 
00337 string YarpConnection::read_text(){
00338         return read_until("*** end of message");
00339 }
00340 
00341 
00343 int YarpConnection::write_binary(unsigned char* buffer, int length){
00344         boost::system::error_code connection_error;
00345         boost::asio::write( *this->connectionSocket, boost::asio::buffer(buffer, length),
00346                                                 boost::asio::transfer_all(), connection_error);
00347         if (connection_error) throw boost::system::system_error(connection_error);
00348         return(true);
00349 }
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Defines