00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include <vector>
00029 #include <rtt/Logger.hpp>
00030 #include <rtt/os/Mutex.hpp>
00031 #include <rtt/Property.hpp>
00032 #include <rtt/PropertyIntrospection.hpp>
00033 #include "socket.hpp"
00034 #include "socketmarshaller.hpp"
00035 #include "datasender.hpp"
00036 #include "command.hpp"
00037 #include "TcpReporting.hpp"
00038 #include <rtt/TemplateTypeInfo.hpp>
00039
00040 namespace OCL
00041 {
00042 namespace TCP
00043 {
00044 Datasender::Datasender(RTT::SocketMarshaller* _marshaller, Orocos::TCP::Socket* _os):
00045 NonPeriodicActivity(10), os( _os ), marshaller(_marshaller)
00046 {
00047 limit = 0;
00048 curframe = 0;
00049 reporter = marshaller->getReporter();
00050 silenced = true;
00051 interpreter = new TcpReportingInterpreter(this);
00052 }
00053
00054 Datasender::~Datasender()
00055 {
00056 subscriptions.clear();
00057 delete interpreter;
00058 delete os;
00059 }
00060
00061 void Datasender::loop()
00062 {
00063 *os << "100 Orocos 1.0 TcpReporting Server 1.0" << std::endl;
00064 while( os->isValid() )
00065 {
00066 interpreter->process();
00067 }
00068 Logger::log() << Logger::Info << "Connection closed!" << Logger::endl;
00069 }
00070
00071 bool Datasender::breakloop()
00072 {
00073 os->close();
00074 return true;
00075 }
00076
00077 RTT::SocketMarshaller* Datasender::getMarshaller() const
00078 {
00079 return marshaller;
00080 }
00081
00082 Socket& Datasender::getSocket() const
00083 {
00084 return *os;
00085 }
00086
00087 bool Datasender::isValid() const
00088 {
00089 return os && os->isValid();
00090 }
00091
00092 bool Datasender::addSubscription(const std::string name )
00093 {
00094 lock.lock();
00095 log(Debug)<<"Datasender::addSubscription: "<<name<<endlog();
00096
00097 if(reporter->getReport()->find(name)!=NULL){
00098
00099 std::vector<std::string>::const_iterator pos =
00100 find(subscriptions.begin(),subscriptions.end(),name);
00101 if(pos!=subscriptions.end()){
00102 Logger::In("DataSender");
00103 log(Info)<<"Already subscribed to "<<name<<endlog();
00104 lock.unlock();
00105 return false;
00106 }else{
00107 Logger::In("DataSender");
00108 log(Info)<<"Adding subscription for "<<name<<endlog();
00109 subscriptions.push_back(name);
00110 lock.unlock();
00111 return true;
00112 }
00113 }else{
00114 Logger::In("DataSender");
00115 log(Error)<<name<<" is not available for reporting"<<endlog();
00116 lock.unlock();
00117 return false;
00118 }
00119 }
00120
00121 void Datasender::remove()
00122 {
00123 getMarshaller()->removeConnection( this );
00124 }
00125
00126 bool Datasender::removeSubscription( const std::string& name )
00127 {
00128 lock.lock();
00129
00130 std::vector<std::string>::iterator pos =
00131 find(subscriptions.begin(),subscriptions.end(),name);
00132 if(pos!=subscriptions.end()){
00133 Logger::In("DataSender");
00134 log(Info)<<"Removing subscription for "<<name<<endlog();
00135 subscriptions.erase(pos);
00136 lock.unlock();
00137 return true;
00138 }else{
00139 Logger::In("DataSenser");
00140 log(Error)<<"No subscription found for "<<name<<endlog();
00141 lock.unlock();
00142 return false;
00143 }
00144 }
00145
00146 void Datasender::listSubscriptions()
00147 {
00148 for(std::vector<std::string>::const_iterator elem=subscriptions.begin();
00149 elem!=subscriptions.end();elem++)
00150 *os<<"305 "<< *elem<<std::endl;
00151 *os << "306 End of list" << std::endl;
00152 }
00153
00154 void Datasender::writeOut(PropertyBase* v)
00155 {
00156 *os<<"202 "<<v->getName()<<"\n";
00157 Property<PropertyBag>* bag = dynamic_cast< Property<PropertyBag>* >( v );
00158 if ( bag )
00159 this->writeOut( bag->value() );
00160 else {
00161 *os<<"205 " <<v->getDataSource()<<"\n";
00162 }
00163
00164 }
00165
00166 void Datasender::writeOut(const PropertyBag &v)
00167 {
00168 for (
00169 PropertyBag::const_iterator i = v.getProperties().begin();
00170 i != v.getProperties().end();
00171 i++ )
00172 {
00173 this->writeOut( *i );
00174 }
00175
00176 }
00177
00178
00179 void Datasender::checkbag(const PropertyBag &v)
00180 {
00181 log(Debug)<<"Let's check the subscriptions"<<endlog();
00182 for(std::vector<std::string>::iterator elem = subscriptions.begin();
00183 elem!=subscriptions.end();elem++){
00184 PropertyBase* prop = reporter->getReport()->find(*elem);
00185 if(prop!=NULL){
00186 writeOut(prop);
00187 }else{
00188 Logger::In("DataSender");
00189 log(Error)<<*elem<<" not longer available for reporting,"<<
00190 ", removing the subscription."<<endlog();
00191 subscriptions.erase(elem);
00192 elem--;
00193 }
00194 }
00195 }
00196
00197 void Datasender::silence(bool newstate)
00198 {
00199 silenced = newstate;
00200 }
00201
00202 void Datasender::setLimit(unsigned long long newlimit)
00203 {
00204 limit = newlimit;
00205 }
00206
00207 void Datasender::serialize(const PropertyBag &v)
00208 {
00209 if( silenced ) {
00210 return;
00211 }
00212
00213 lock.lock();
00214 if( !subscriptions.empty() && ( limit == 0 || curframe <= limit ) ){
00215 *os << "201 " <<curframe << " -- begin of frame\n";
00216 checkbag(v);
00217 *os << "203 " << curframe << " -- end of frame" << std::endl;
00218 curframe++;
00219 if( curframe > limit && limit != 0 )
00220 {
00221 *os << "204 Limit reached" << std::endl;
00222 }
00223 }
00224 lock.unlock();
00225 }
00226
00227 }
00228 }