00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include <sys/socket.h>
00029 #include <netinet/in.h>
00030 #include <sys/types.h>
00031 #include <errno.h>
00032
00033 #include "TcpReporting.hpp"
00034 #include "rtt/NonPeriodicActivity.hpp"
00035 #include "rtt/Logger.hpp"
00036 #include "rtt/os/Mutex.hpp"
00037 #include "socket.hpp"
00038 #include "socketmarshaller.hpp"
00039
00040 using RTT::Logger;
00041 using RTT::OS::Mutex;
00042
00043 #include "ocl/ComponentLoader.hpp"
00044 ORO_LIST_COMPONENT_TYPE(OCL::TcpReporting);
00045
00046 namespace OCL
00047 {
00052 class ListenThread
00053 : public RTT::NonPeriodicActivity
00054 {
00055 private:
00056 bool inBreak;
00057 static ListenThread* _instance;
00058 RTT::SocketMarshaller* _marshaller;
00059 unsigned short _port;
00060 bool _accepting;
00061 int _sock;
00062
00063 bool listen()
00064 {
00065 _sock = ::socket(PF_INET, SOCK_STREAM, 0);
00066 if( _sock < 0 )
00067 {
00068 Logger::log() << Logger::Error << "Socket creation failed." << Logger::endl;
00069 return false;
00070 }
00071
00072 struct sockaddr_in localsocket;
00073 struct sockaddr remote;
00074 int adrlen = sizeof(remote);
00075
00076 localsocket.sin_family = AF_INET;
00077 localsocket.sin_port = htons(_port);
00078 localsocket.sin_addr.s_addr = INADDR_ANY;
00079 if( ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) ) < 0 )
00080 {
00081
00082
00083
00084
00085 #define TRY_OTHER_PORTS
00086
00087 #ifdef TRY_OTHER_PORTS
00088 int i = 1;
00089 int r = -1;
00090 while( errno == EADDRINUSE && i < 5 && r < 0 )
00091 {
00092 localsocket.sin_port = htons(_port + i);
00093 r = ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) );
00094 i++;
00095 }
00096 if( r >= 0 )
00097 {
00098 Logger::log() << Logger::Info << "Port occupied, use port " << (_port+i-1) << " instead." << Logger::endl;
00099 } else {
00100 #endif
00101 if( errno == EADDRINUSE )
00102 {
00103 Logger::log() << Logger::Error << "Binding of port failed: address already in use." << Logger::endl;
00104 } else {
00105 Logger::log() << Logger::Error << "Binding of port failed with errno " << errno << Logger::endl;
00106 }
00107 ::close(_sock);
00108 return false;
00109 #ifdef TRY_OTHER_PORTS
00110 }
00111 #endif
00112 }
00113
00114 if( ::listen(_sock, 2) < 0 )
00115 {
00116 Logger::log() << Logger::Info << "Cannot listen on socket" << Logger::endl;
00117 ::close(_sock);
00118 return true;
00119 }
00120 while(_accepting)
00121 {
00122 int socket = ::accept( _sock, &remote,
00123 reinterpret_cast<socklen_t*>(&adrlen) );
00124 if( socket == -1 )
00125 {
00126 return false;
00127 }
00128 if( _accepting )
00129 {
00130 Logger::log() << Logger::Info << "Incoming connection" << Logger::endl;
00131 _marshaller->addConnection( new Orocos::TCP::Socket(socket) );
00132 }
00133 }
00134 return true;
00135 }
00136
00137 ListenThread( RTT::SocketMarshaller* marshaller, unsigned short port )
00138 : NonPeriodicActivity(10), _marshaller(marshaller)
00139 {
00140 inBreak = false;
00141 removeInstance();
00142 _accepting = true;
00143 _port = port;
00144 Logger::log() << Logger::Info << "Starting server on port " << port << Logger::endl;
00145 this->NonPeriodicActivity::start();
00146 }
00147
00148
00149 void removeInstance()
00150 {
00151 if( _instance )
00152 {
00153 delete _instance;
00154 }
00155 }
00156
00157 public:
00158 ~ListenThread()
00159 {
00160 _accepting = false;
00161 }
00162
00163 virtual void loop()
00164 {
00165 if( !inBreak )
00166 {
00167 if( !listen() )
00168 {
00169 Logger::log() << Logger::Error << "Could not listen on port " << _port << Logger::endl;
00170 } else {
00171 Logger::log() << Logger::Info << "Shutting down server" << Logger::endl;
00172 }
00173 }
00174 }
00175
00176 virtual bool breakLoop()
00177 {
00178 inBreak = true;
00179 _accepting = false;
00180 ::close( _sock );
00181
00182 int sock = ::socket(PF_INET, SOCK_STREAM, 0);
00183 if( sock > 0 )
00184 {
00185 struct sockaddr_in socket;
00186 socket.sin_family = AF_INET;
00187 socket.sin_port = htons(_port);
00188 socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
00189 ::connect( sock, (struct sockaddr*)&socket, sizeof(socket) );
00190 ::close( sock );
00191 }
00192 return true;
00193 }
00194
00195 static void createInstance( RTT::SocketMarshaller* marshaller, unsigned short port = 3142 )
00196 {
00197
00198
00199
00200 ListenThread::_instance = new ListenThread( marshaller, port );
00201
00202 }
00203
00204 static void destroyInstance()
00205 {
00206 ListenThread::_instance->breakLoop();
00207 }
00208 };
00209 ListenThread* ListenThread::_instance = 0;
00210 }
00211
00212 namespace OCL
00213 {
00214 TcpReporting::TcpReporting(std::string fr_name )
00215 : ReportingComponent( fr_name ),
00216 port_prop("port","port to listen/send to",3142)
00217 {
00218 _finishing = false;
00219 this->properties()->addProperty(&port_prop);
00220 }
00221
00222 TcpReporting::~TcpReporting()
00223 {
00224 }
00225
00226 const RTT::PropertyBag* TcpReporting::getReport()
00227 {
00228 makeReport();
00229 return &report;
00230 }
00231
00232 bool TcpReporting::configureHook(){
00233 port=port_prop.value();
00234 return true;
00235 }
00236
00237 bool TcpReporting::startHook()
00238 {
00239 RTT::Logger::In in("TcpReporting::startup");
00240 fbody = new RTT::SocketMarshaller(this);
00241 this->addMarshaller( 0, fbody );
00242 ListenThread::createInstance( fbody, port );
00243 return ReportingComponent::startHook();
00244 }
00245
00246 void TcpReporting::stopHook()
00247 {
00248 _finishing = true;
00249 ListenThread::destroyInstance();
00250 fbody->shutdown();
00251 ReportingComponent::stopHook();
00252 this->removeMarshallers();
00253 }
00254 }