CorbaBuffer.hpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #ifndef ORO_CORBA_BUFFER_HPP
00041 #define ORO_CORBA_BUFFER_HPP
00042
00043 #include "../BufferInterface.hpp"
00044 #include "DataFlowI.h"
00045 #include "DataFlowC.h"
00046 #include "DataFlowS.h"
00047 #include "orbsvcs/CosEventChannelAdminC.h"
00048 #include "orbsvcs/CosEventCommC.h"
00049
00050
00051 namespace RTT
00052 { namespace Corba {
00053
00058 template<class T>
00059 class CorbaBuffer
00060 : public virtual POA_RTT::Corba::BufferChannel,
00061 public virtual PortableServer::RefCountServantBase
00062 {
00063
00064 class CorbaBufferPushI
00065 : public virtual POA_CosEventComm::PushConsumer,
00066 public virtual PortableServer::RefCountServantBase
00067 {
00068 typename BufferInterface<T>::shared_ptr mimpl;
00070 CosEventChannelAdmin::EventChannel_var mec;
00072 CosEventChannelAdmin::ProxyPushSupplier_var cproxy;
00073 public:
00074
00075 typedef typename ReadInterface<T>::reference_t reference_t;
00076 typedef typename WriteInterface<T>::param_t param_t;
00077 typedef typename BufferInterface<T>::size_type size_type;
00078 typedef T value_t;
00079
00085 CorbaBufferPushI(typename BufferInterface<T>::shared_ptr buf, CosEventChannelAdmin::EventChannel_ptr ec )
00086 : mimpl(buf), mec(CosEventChannelAdmin::EventChannel::_duplicate(ec) )
00087 {
00088
00089 CosEventChannelAdmin::ConsumerAdmin_var cadm = mec->for_consumers();
00090 cproxy = cadm->obtain_push_supplier();
00091 CosEventComm::PushConsumer_var puscon = POA_CosEventComm::PushConsumer::_this();
00092 cproxy->connect_push_consumer( puscon.in() );
00093 }
00094
00098 ~CorbaBufferPushI() {
00099 try {
00100 cproxy->disconnect_push_supplier();
00101 } catch(...) {}
00102 }
00103
00104 virtual CosEventChannelAdmin::EventChannel_ptr getChannel()
00105 ACE_THROW_SPEC ((
00106 CORBA::SystemException
00107 ))
00108 {
00109 return CosEventChannelAdmin::EventChannel::_duplicate(mec);
00110 }
00111
00112
00113 virtual void push (
00114 const ::CORBA::Any & data
00115 )
00116 ACE_THROW_SPEC ((
00117 CORBA::SystemException,
00118 ::CosEventComm::Disconnected
00119 ))
00120 {
00121 value_t new_value = value_t();
00122 ReferenceDataSource<T> rds( new_value );
00123 rds.ref();
00124 if ( rds.update( data ) == false) {
00125 Logger::log() <<Logger::Error << "Could not accept remote value: wrong data type."<<Logger::endl;
00126 return;
00127 }
00128
00129 mimpl->Push( new_value );
00130 }
00131
00132 virtual void disconnect_push_consumer ()
00133 ACE_THROW_SPEC ((
00134 CORBA::SystemException
00135 ))
00136 {
00137
00138 }
00139
00140 };
00141
00142 class CorbaBufferPullI
00143 : public virtual POA_CosEventComm::PullSupplier,
00144 public virtual PortableServer::RefCountServantBase
00145 {
00146 typename BufferInterface<T>::shared_ptr mimpl;
00148 CosEventChannelAdmin::EventChannel_var mec;
00150 CosEventChannelAdmin::ProxyPullConsumer_var sproxy;
00151 public:
00152
00153 typedef typename ReadInterface<T>::reference_t reference_t;
00154 typedef typename WriteInterface<T>::param_t param_t;
00155 typedef typename BufferInterface<T>::size_type size_type;
00156 typedef T value_t;
00157
00163 CorbaBufferPullI(typename BufferInterface<T>::shared_ptr buf, CosEventChannelAdmin::EventChannel_ptr ec )
00164 : mimpl(buf), mec(CosEventChannelAdmin::EventChannel::_duplicate(ec) )
00165 {
00166
00167 CosEventChannelAdmin::SupplierAdmin_var sadm = mec->for_suppliers();
00168 sproxy = sadm->obtain_pull_consumer();
00169 CosEventComm::PullSupplier_var pulsup = POA_CosEventComm::PullSupplier::_this();
00170 sproxy->connect_pull_supplier( pulsup.in() );
00171 }
00172
00176 ~CorbaBufferPullI() {
00177 try {
00178 sproxy->disconnect_pull_consumer();
00179 } catch(...) {}
00180 }
00181
00185 virtual CORBA::Any * pull ()
00186 ACE_THROW_SPEC ((
00187 CORBA::SystemException,
00188 ::CosEventComm::Disconnected
00189 ))
00190 {
00191 ReferenceDataSource<T> rds( mimpl->front() );
00192 rds.ref();
00193 CORBA::Any_var toset = (CORBA::Any_ptr)rds.createBlob(ORO_CORBA_PROTOCOL_ID);
00194 return toset._retn();
00195 }
00196
00200 virtual CORBA::Any * try_pull (
00201 ::CORBA::Boolean_out has_event
00202 )
00203 ACE_THROW_SPEC ((
00204 CORBA::SystemException,
00205 ::CosEventComm::Disconnected
00206 ))
00207 {
00208 value_t data = value_t();
00209 has_event = mimpl->Pop( data );
00210 ReferenceDataSource<T> rds( data );
00211 rds.ref();
00212 CORBA::Any_var toset = (CORBA::Any_ptr)rds.createBlob(ORO_CORBA_PROTOCOL_ID);
00213 return toset._retn();
00214 }
00215
00216 virtual void disconnect_pull_supplier ()
00217 ACE_THROW_SPEC ((
00218 CORBA::SystemException
00219 ))
00220 {
00221
00222 }
00223
00224 };
00225
00226 typename BufferInterface<T>::shared_ptr mimpl;
00227
00229 CosEventChannelAdmin::EventChannel_var mec;
00230
00231 CosEventComm::PullSupplier_var pulsup;
00232 CosEventComm::PushConsumer_var puscon;
00233 public:
00234
00235 typedef typename ReadInterface<T>::reference_t reference_t;
00236 typedef typename WriteInterface<T>::param_t param_t;
00237 typedef typename BufferInterface<T>::size_type size_type;
00238 typedef T value_t;
00239
00245 CorbaBuffer(typename BufferInterface<T>::shared_ptr buf, CosEventChannelAdmin::EventChannel_ptr ec )
00246 : mimpl(buf), mec(CosEventChannelAdmin::EventChannel::_duplicate(ec) )
00247 {
00248 pulsup = (new CorbaBufferPullI(buf, CosEventChannelAdmin::EventChannel::_duplicate(ec) ))->_this();
00249 puscon = (new CorbaBufferPushI(buf, CosEventChannelAdmin::EventChannel::_duplicate(ec) ))->_this();
00250 }
00251
00255 ~CorbaBuffer() {
00256 }
00257
00258 virtual CosEventChannelAdmin::EventChannel_ptr getChannel()
00259 ACE_THROW_SPEC ((
00260 CORBA::SystemException
00261 ))
00262 {
00263 return CosEventChannelAdmin::EventChannel::_duplicate(mec);
00264 }
00265
00266 virtual CORBA::Long capacity()
00267 ACE_THROW_SPEC ((
00268 CORBA::SystemException
00269 ))
00270 {
00271 return mimpl->capacity();
00272 }
00273
00274 virtual CORBA::Long size()
00275 ACE_THROW_SPEC ((
00276 CORBA::SystemException
00277 ))
00278 {
00279 return mimpl->size();
00280 }
00281
00282 virtual void clear()
00283 ACE_THROW_SPEC ((
00284 CORBA::SystemException
00285 ))
00286 {
00287 mimpl->clear();
00288 }
00289
00290 virtual CORBA::Boolean empty()
00291 ACE_THROW_SPEC ((
00292 CORBA::SystemException
00293 ))
00294 {
00295 return mimpl->empty();
00296 }
00297
00298 virtual CORBA::Boolean full()
00299 ACE_THROW_SPEC ((
00300 CORBA::SystemException
00301 ))
00302 {
00303 return mimpl->full();
00304 }
00305 };
00306 }}
00307 #endif