iSpike
2.1
Spike conversion library for robotics
|
00001 //iSpike includes 00002 #include <iSpike/YarpConnection.hpp> 00003 #include <iSpike/YarpPortDetails.hpp> 00004 #include <iSpike/ISpikeException.hpp> 00005 #include <iSpike/Log/Log.hpp> 00006 using namespace ispike; 00007 00008 //Other includes 00009 #include <iostream> 00010 #include <sstream> 00011 #include <iterator> 00012 #include <string> 00013 #include <map> 00014 #include <utility> 00015 #include <boost/array.hpp> 00016 #include <boost/asio.hpp> 00017 #include <boost/regex.hpp> 00018 #include <boost/lexical_cast.hpp> 00019 #include <boost/mpl/vector.hpp> 00020 using namespace std; 00021 using boost::asio::ip::tcp; 00022 00024 YarpConnection::YarpConnection(string ip, unsigned port){ 00025 //boost::asio::io_service io_service; 00026 LOG(LOG_DEBUG) << "creating a socket"; 00027 this->connectionSocket = new tcp::socket(this->io_service); 00028 LOG(LOG_DEBUG) << "connecting to port"; 00029 int result = connect_to_port(ip, port); 00030 LOG(LOG_DEBUG) << "connected"; 00031 if (!result) { 00032 boost::system::error_code error = boost::asio::error::host_not_found; 00033 throw boost::system::system_error(error); 00034 } 00035 00036 LOG(LOG_DEBUG) << "writing commands"; 00037 write_text("CONNECT foo\n"); 00038 read_until("\n"); 00039 write_text("d\n"); 00040 write_text("list\n"); 00041 00042 boost::system::error_code read_error; 00043 LOG(LOG_DEBUG) << "reading reply"; 00044 string response_string = read_text(); 00045 LOG(LOG_DEBUG) << "disconnecting"; 00046 disconnect(); 00047 LOG(LOG_DEBUG) << "disconnected"; 00048 boost::cmatch matches; 00049 boost::regex new_line("\n"); 00050 list<string> lines; 00051 boost::regex_split(back_inserter(lines), response_string, new_line); 00052 boost::regex expression("registration name (.+?) ip (.+?) port (.+?) type (.+?)"); 00053 portMap.clear(); 00054 while(lines.size()) { 00055 string current_string = *(lines.begin()); 00056 lines.pop_front(); 00057 00058 if (boost::regex_match(current_string.c_str(), matches, expression)) { 00059 string portName(matches[1].first, matches[1].second); 00060 string ip(matches[2].first, matches[2].second); 00061 string portStr(matches[3].first, matches[3].second); 00062 string type(matches[4].first, matches[4].second); 00063 unsigned port = boost::lexical_cast<unsigned>(portStr.c_str()); 00064 YarpPortDetails details(ip, port, type); 00065 portMap[portName] = details; 00066 } 00067 } 00068 00069 LOG(LOG_DEBUG) << "YarpConnection: Received the following Yarp portmap:"; 00070 for (map<string, YarpPortDetails>::iterator it=portMap.begin() ; it != portMap.end(); ++it ) { 00071 LOG(LOG_DEBUG) << (*it).first << " => " << (*it).second.getIp() << ":" << (*it).second.getPort(); 00072 } 00073 } 00074 00075 00077 YarpConnection::~YarpConnection(){ 00078 } 00079 00080 00081 /*------------------------------------------------------------------*/ 00082 /*--------- PUBLIC METHODS -------*/ 00083 /*------------------------------------------------------------------*/ 00084 00088 int YarpConnection::write_text(string message){ 00089 boost::system::error_code connection_error; 00090 boost::asio::write( *this->connectionSocket, boost::asio::buffer(message), 00091 boost::asio::transfer_all(), connection_error); 00092 if (connection_error) throw boost::system::system_error(connection_error); 00093 return(true); 00094 } 00095 00096 00097 string YarpConnection::getSocketString(){ 00098 unsigned int timeoutCtr = 0; 00099 unsigned int timeout = 10000; 00100 while(read_char() != 'o' && timeoutCtr < timeout){ 00101 timeoutCtr++; 00102 } 00103 if(timeoutCtr >= timeout){ 00104 throw ISpikeException("Timeout exceeded in getSocketString"); 00105 } 00106 timeoutCtr = 0; 00107 ostringstream strStream (ostringstream::out); 00108 unsigned char newChar = this->read_char(); 00109 while(newChar != 'd' && timeoutCtr != timeout) { 00110 strStream << newChar; 00111 newChar = this->read_char(); 00112 ++timeoutCtr; 00113 } 00114 if(timeoutCtr >= timeout){ 00115 throw ISpikeException("Timeout exceeded in getSocketString"); 00116 } 00117 return strStream.str(); 00118 } 00119 00120 00124 string YarpConnection::read_until(string until){ 00125 boost::asio::streambuf response; 00126 boost::asio::read_until(*this->connectionSocket, response, until); 00127 istream response_stream(&response); 00128 string response_string((istreambuf_iterator<char>(response_stream)), istreambuf_iterator<char>()); 00129 return(response_string); 00130 } 00131 00132 00137 int YarpConnection::connect_to_port(string ip, unsigned port){ 00138 boost::asio::io_service io_service; 00139 boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip.c_str()), port); 00140 boost::system::error_code error = boost::asio::error::host_not_found; 00141 00142 this->connectionSocket->close(); 00143 this->connectionSocket->connect(endpoint, error); 00144 00145 if (error || !this->connectionSocket->is_open()){ 00146 LOG(LOG_DEBUG) << "Exception thrown connecting to port: "<<error.message(); 00147 throw ISpikeException(error.message()); 00148 } 00149 return true; 00150 } 00151 00152 00156 void YarpConnection::prepare_to_read_text(){ 00157 write_text("CONNECT foo\n"); 00158 boost::asio::streambuf response; 00159 boost::asio::read_until(*this->connectionSocket, response, "\n"); 00160 write_text("r\n"); 00161 } 00162 00163 00167 void YarpConnection::prepare_to_read_binary(){ 00168 // Send header to select connection type. 00169 // this header is for fast_tcp, so we don't have to deal with flow control 00170 unsigned char hdr[8] = {'Y','A',0x64, 0x1E, 0, 0,'R','P'}; 00171 write_binary(hdr,8); 00172 00173 // Send name of our port - there is none, so send a name that 00174 // does not start with "/" 00175 unsigned char port[8] = {4,0,0,0,'m','i','n',0}; 00176 write_binary(port,8); 00177 hdr[7] = '\0'; 00178 00179 // Check for acknowledgement 00180 read_binary(hdr,8); 00181 if (hdr[7]!='P') { 00182 throw ISpikeException("Cannot make connection handshake"); 00183 } 00184 00185 // Send header for payload (a command to reverse connection) 00186 unsigned char load_hdr[8] = {'Y','A',10, 0, 0, 0,'R','P'}; 00187 write_binary(load_hdr,8); 00188 unsigned char load_hdr2[10] = { 1, 1, 255, 255, 255, 255, 255, 255, 255, 255 }; 00189 write_binary(load_hdr2,10); 00190 unsigned char load_len[4] = { 8, 0, 0, 0 }; 00191 write_binary(load_len,4); 00192 unsigned char reply_len[4] = { 0, 0, 0, 0 }; 00193 write_binary(reply_len,4); 00194 00195 // send data - this is a command to reverse the connection for reading 00196 unsigned char command_reverse[8] = { 0, 0, 0, 0, '~', 'r', 0, 1 }; 00197 write_binary(command_reverse,8); 00198 } 00199 00200 00203 bool YarpConnection::read_image(Bitmap& bitmap){ 00204 int result = read_data_header(); 00205 if (result>=0) { 00206 unsigned char header[4*15]; 00207 int image_len = result - sizeof(header); 00208 result = read_binary((unsigned char*)header,sizeof(header)); 00209 if (result<0) { 00210 throw ISpikeException("Failed to read image header"); 00211 } 00212 00213 //Extract the width, height and depth of the image 00214 int depth = read_int((unsigned char*)(header+4*8)); // header.depth 00215 int width = read_int((unsigned char*)(header+4*11)); // header.width 00216 int height = read_int((unsigned char*)(header+4*12)); // header.height 00217 00218 //Create new bitmap contents if width, height and depth do not match 00219 if (bitmap.getWidth() != width || bitmap.getHeight() != height || bitmap.getDepth() != depth) { 00220 bitmap.reset(width, height, depth); 00221 } 00222 00223 //Read into bitmap 00224 unsigned char* contents = bitmap.getContents(); 00225 read_binary(contents, bitmap.size()); 00226 return true; 00227 } 00228 return false; 00229 } 00230 00231 00232 /*------------------------------------------------------------------*/ 00233 /*--------- PRIVATE METHODS -------*/ 00234 /*------------------------------------------------------------------*/ 00235 00237 void YarpConnection::disconnect(){ 00238 write_text("q\n"); 00239 this->connectionSocket->close(); 00240 } 00241 00242 00246 int YarpConnection::read_binary(unsigned char* buffer, int length){ 00247 return boost::asio::read(*this->connectionSocket, boost::asio::buffer(buffer, length)); 00248 } 00249 00250 00251 unsigned char YarpConnection::read_char(){ 00252 vector<unsigned char> buf(1); 00253 try { 00254 size_t len = read(*this->connectionSocket, boost::asio::buffer(buf)); 00255 assert(len == 1); 00256 //cout << buf[0]; 00257 return buf[0]; 00258 // process the 4 bytes in buf 00259 } 00260 catch (...) { 00261 throw new ISpikeException("Error in read_char!" ); 00262 } 00263 } 00264 00265 00267 int YarpConnection::read_data_header(){ 00268 int i; 00269 unsigned char load_hdr_ref[8] = {'Y','A',10, 0, 0, 0,'R','P'}; 00270 unsigned char load_hdr[8] = {0,0,0,0,0,0,0,0}; 00271 read_binary(load_hdr,8); 00272 for (i=0; i<8; i++) { 00273 if (load_hdr[i]!=load_hdr_ref[i]) { 00274 throw ISpikeException("Unexpected data received"); 00275 } 00276 } 00277 unsigned char load_hdr2[10]; 00278 load_hdr2[1] = 0; 00279 read_binary(load_hdr2,10); 00280 if (load_hdr2[1]!=1) { 00281 throw ISpikeException("Corrupt data received"); 00282 } 00283 int blocks = load_hdr2[0]; 00284 int len = 0; 00285 unsigned char load_len[4]; 00286 unsigned int port_message_len = 0; 00287 for (i=0; i<blocks; i++) { 00288 read_binary((unsigned char *)load_len,4); 00289 len += load_len[0] + (load_len[1]<<8) + (load_len[2]<<16) + 00290 (load_len[3]<<24); 00291 if (i==0) { 00292 port_message_len = len; 00293 } 00294 } 00295 read_binary((unsigned char *)load_len,4); 00296 for (i=0; i<4; i++) { 00297 if (load_len[i]!=0) 00298 throw ISpikeException("Unexpected lengths received"); 00299 } 00300 00301 // extract the port header part 00302 unsigned char command_header[8]; 00303 command_header[4] = '\0'; 00304 read_binary(command_header,8); 00305 len -= 8; 00306 port_message_len -= 8; 00307 if (command_header[4]!='~') { 00308 throw ISpikeException("Unexpected port command received"); 00309 } 00310 unsigned char cmd[256] = "?"; 00311 cmd[0] = command_header[5]; 00312 if (cmd[0]=='\0') { 00313 if (port_message_len>sizeof(cmd)) { 00314 throw ISpikeException("Port command too big\n"); 00315 } 00316 read_binary(cmd,port_message_len); 00317 len -= port_message_len; 00318 } 00319 if (cmd[0]!='d') { 00320 throw ISpikeException("Unexpected port command!"); 00321 } 00322 00323 return len; 00324 } 00325 00326 00328 int YarpConnection::read_int(unsigned char* buf){ 00329 unsigned char *ubuf = (unsigned char *)buf; 00330 // this could be optimized away on little-endian machines! 00331 return ubuf[0] + (ubuf[1]<<8) + (ubuf[2]<<16) + (ubuf[3]<<24); 00332 } 00333 00334 00337 string YarpConnection::read_text(){ 00338 return read_until("*** end of message"); 00339 } 00340 00341 00343 int YarpConnection::write_binary(unsigned char* buffer, int length){ 00344 boost::system::error_code connection_error; 00345 boost::asio::write( *this->connectionSocket, boost::asio::buffer(buffer, length), 00346 boost::asio::transfer_all(), connection_error); 00347 if (connection_error) throw boost::system::system_error(connection_error); 00348 return(true); 00349 }