Orocos Real-Time Toolkit
2.6.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.cpp 00003 00004 ConnFactory.cpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #include "../Port.hpp" 00040 #include "ConnFactory.hpp" 00041 #include "../base/InputPortInterface.hpp" 00042 #include "../DataFlowInterface.hpp" 00043 #include "../types/TypeMarshaller.hpp" 00044 00045 using namespace std; 00046 using namespace RTT; 00047 using namespace RTT::internal; 00048 00049 bool LocalConnID::isSameID(ConnID const& id) const 00050 { 00051 LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id); 00052 if (!real_id) 00053 return false; 00054 else return real_id->ptr == this->ptr; 00055 } 00056 00057 ConnID* LocalConnID::clone() const { 00058 return new LocalConnID(this->ptr); 00059 } 00060 00061 bool StreamConnID::isSameID(ConnID const& id) const 00062 { 00063 StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id); 00064 if (!real_id) 00065 return false; 00066 else return real_id->name_id == this->name_id; 00067 } 00068 00069 ConnID* StreamConnID::clone() const { 00070 return new StreamConnID(this->name_id); 00071 } 00072 00073 base::ChannelElementBase::shared_ptr RTT::internal::ConnFactory::createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, const ConnPolicy& policy) 00074 { 00075 // Remote connection 00076 // if the policy's transport is set to zero, use the input ports server protocol, 00077 // otherwise, use the policy's protocol 00078 int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport; 00079 types::TypeInfo const* type_info = output_port.getTypeInfo(); 00080 if (!type_info || input_port.getTypeInfo() != type_info) 00081 { 00082 log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog(); 00083 // There is no type info registered for this type 00084 return base::ChannelElementBase::shared_ptr(); 00085 } 00086 else if ( !type_info->getProtocol( transport ) ) 00087 { 00088 log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog(); 00089 // This type cannot be marshalled into the right transporter 00090 return base::ChannelElementBase::shared_ptr(); 00091 } 00092 else 00093 { 00094 return input_port. 00095 buildRemoteChannelOutput(output_port, type_info, input_port, policy); 00096 } 00097 return base::ChannelElementBase::shared_ptr(); 00098 } 00099 00100 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy) { 00101 // Register the channel's input to the output port. 00102 if ( output_port.addConnection( input_port.getPortID(), channel_input, policy ) ) { 00103 // notify input that the connection is now complete. 00104 if ( input_port.channelReady( channel_input->getOutputEndPoint() ) == false ) { 00105 output_port.disconnect( &input_port ); 00106 log(Error) << "The input port "<< input_port.getName() 00107 << " could not successfully read from the connection from output port " << output_port.getName() <<endlog(); 00108 00109 return false; 00110 } 00111 log(Debug) << "Connected output port "<< output_port.getName() 00112 << " successfully to " << input_port.getName() <<endlog(); 00113 return true; 00114 } 00115 // setup failed. 00116 channel_input->disconnect(true); 00117 log(Error) << "The output port "<< output_port.getName() 00118 << " could not successfully use the connection to input port " << input_port.getName() <<endlog(); 00119 return false; 00120 } 00121 00122 bool ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id) { 00123 if (policy.transport == 0 ) { 00124 log(Error) << "Need a transport for creating streams." <<endlog(); 00125 return false; 00126 } 00127 const types::TypeInfo* type = output_port.getTypeInfo(); 00128 if ( type->getProtocol(policy.transport) == 0 ) { 00129 log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog(); 00130 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00131 return false; 00132 } 00133 types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) ); 00134 if (ttt) { 00135 int size_hint = ttt->getSampleSize( output_port.getDataSource() ); 00136 policy.data_size = size_hint; 00137 } else { 00138 log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog(); 00139 } 00140 RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, true); 00141 00142 if ( !chan_stream ) { 00143 log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog(); 00144 return false; 00145 } 00146 chan->setOutput( chan_stream ); 00147 00148 if ( output_port.addConnection( new StreamConnID(policy.name_id), chan, policy) ) { 00149 log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog(); 00150 return true; 00151 } 00152 // setup failed. 00153 log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog(); 00154 return false; 00155 } 00156 00157 bool ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) { 00158 if (policy.transport == 0 ) { 00159 log(Error) << "Need a transport for creating streams." <<endlog(); 00160 return false; 00161 } 00162 const types::TypeInfo* type = input_port.getTypeInfo(); 00163 if ( type->getProtocol(policy.transport) == 0 ) { 00164 log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog(); 00165 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00166 return false; 00167 } 00168 00169 // note: don't refcount this final input chan, because no one will 00170 // take a reference to it. It would be destroyed upon return of this function. 00171 RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port,policy, false); 00172 00173 if ( !chan ) { 00174 log(Error) << "Transport failed to create remote channel for input stream of port "<<input_port.getName() << endlog(); 00175 return false; 00176 } 00177 00178 // In stream mode, a buffer is always installed at input side. 00179 // 00180 ConnPolicy policy2 = policy; 00181 policy2.pull = false; 00182 // pass new name upwards. 00183 policy.name_id = policy2.name_id; 00184 conn_id->name_id = policy2.name_id; 00185 00186 chan->getOutputEndPoint()->setOutput( outhalf ); 00187 if ( input_port.channelReady( chan->getOutputEndPoint() ) == true ) { 00188 log(Info) << "Created input stream for input port "<< input_port.getName() <<endlog(); 00189 return true; 00190 } 00191 // setup failed: manual cleanup. 00192 chan = 0; // deleted by channelReady() above ! 00193 log(Error) << "Failed to create input stream for input port "<< input_port.getName() <<endlog(); 00194 return false; 00195 } 00196 00197 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port, 00198 base::InputPortInterface& input_port, 00199 ConnPolicy const& policy, 00200 base::ChannelElementBase::shared_ptr output_half, 00201 StreamConnID* conn_id) 00202 { 00203 // create input half using a transport. 00204 const types::TypeInfo* type = output_port.getTypeInfo(); 00205 if ( type->getProtocol(policy.transport) == 0 ) { 00206 log(Error) << "Could not create out-of-band transport for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog(); 00207 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00208 return 0; 00209 } 00210 00211 // we force the creation of a buffer on input side 00212 ConnPolicy policy2 = policy; 00213 policy2.pull = false; 00214 conn_id->name_id = policy2.name_id; 00215 00216 // check if marshaller supports size hints: 00217 types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*>( type->getProtocol(policy.transport) ); 00218 if (ttt) { 00219 policy2.data_size = ttt->getSampleSize( output_port.getDataSource() ); 00220 } else { 00221 log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog(); 00222 } 00223 // XXX: this seems to be always true 00224 if ( input_port.isLocal() ) { 00225 RTT::base::ChannelElementBase::shared_ptr ceb_input = type->getProtocol(policy.transport)->createStream(&input_port, policy2, false); 00226 if (ceb_input) { 00227 log(Info) <<"Receiving data for port "<<input_port.getName() << " from out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id<<endlog(); 00228 } else { 00229 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << input_port.getName()<<endlog(); 00230 return 0; 00231 } 00232 ceb_input->getOutputEndPoint()->setOutput(output_half); 00233 output_half = ceb_input; 00234 } 00235 00236 // XXX: this seems to be always true 00237 if ( output_port.isLocal() ) { 00238 00239 RTT::base::ChannelElementBase::shared_ptr ceb_output = type->getProtocol(policy.transport)->createStream(&output_port, policy2, true); 00240 if (ceb_output) { 00241 log(Info) <<"Redirecting data for port "<< output_port.getName() << " to out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id <<endlog(); 00242 } else { 00243 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << output_port.getName()<<endlog(); 00244 return 0; 00245 } 00246 // this mediates the 'channel ready leads to initial data sample'. 00247 // it is probably not necessary, since streams don't assume this relation. 00248 ceb_output->getOutputEndPoint()->setOutput(output_half); 00249 output_half = ceb_output; 00250 } 00251 // Important ! since we made a copy above, we need to set the original to the changed name_id. 00252 policy.name_id = policy2.name_id; 00253 conn_id->name_id = policy2.name_id; 00254 00255 return output_half; 00256 00257 } 00258