diff options
Diffstat (limited to 'dds_io_pub/Writer.cpp')
| -rw-r--r-- | dds_io_pub/Writer.cpp | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/dds_io_pub/Writer.cpp b/dds_io_pub/Writer.cpp new file mode 100644 index 0000000..82d5173 --- /dev/null +++ b/dds_io_pub/Writer.cpp @@ -0,0 +1,146 @@ +// -*- C++ -*- +// +// $Id: Writer.cpp 784 2007-05-30 19:26:41Z harriss $ +#include "Writer.h" +#include "PortTypeSupportC.h" +#include <ace/OS_NS_unistd.h> +#include <ace/streams.h> +#include <time.h> + +using namespace IOTest; + +const int num_instances_per_writer = 1; +const int num_messages = 10; + +Writer::Writer(::DDS::DataWriter_ptr writer) +: writer_ (::DDS::DataWriter::_duplicate (writer)), + finished_instances_ (0), + timeout_writes_ (0) +{ + cout<<"Writer instanced\n"; +} + +void +Writer::start () +{ + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::start \n"))); + // Lanuch num_instances_per_writer threads. + // Each thread writes one instance which uses the thread id as the + // key value. + if (activate (THR_NEW_LWP | THR_JOINABLE, num_instances_per_writer) == -1) { + cerr << "Writer::start(): activate failed" << endl; + exit(1); + } +} + +void +Writer::end () +{ + ACE_DEBUG((LM_DEBUG, + ACE_TEXT("(%P|%t) Writer::end \n"))); + wait (); +} + + +int +Writer::svc () +{ + ACE_DEBUG((LM_DEBUG, + ACE_TEXT("(%P|%t) Writer::svc begins.\n"))); + + ::DDS::InstanceHandleSeq handles; + try { + + cout<<"Writer thread waiting for subscription\n"; + + while (1) + { + writer_->get_matched_subscriptions(handles); + if (handles.length() > 0) + break; + else + ACE_OS::sleep(ACE_Time_Value(0,200000)); + + cout<<"%\n"; + } + + cout<<"Writer thread got subscriptions\n"; + + ::IOTest::PortDataWriter_var port_dw + = ::IOTest::PortDataWriter::_narrow(writer_.in()); + if (CORBA::is_nil (port_dw.in ())) { + cerr << "Data Writer could not be narrowed"<< endl; + exit(1); + } + + cout<<"Writer thread, DataWriter Cast ok\n"; + + struct timespec t_sleep; + t_sleep.tv_sec = 1; + t_sleep.tv_nsec = 500000; + + IOTest::Port port_obj; + ::DDS::InstanceHandle_t handle = port_dw->_cxx_register (port_obj); + + port_obj.no = 1; + port_obj.value = 0xff; + + ACE_DEBUG((LM_DEBUG, + ACE_TEXT("%T (%P|%t) Writer::svc starting to write.\n"))); + while(1) { + ::DDS::ReturnCode_t ret = port_dw->write(port_obj, handle); + + if (ret != ::DDS::RETCODE_OK) { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("(%P|%t)ERROR Writer::svc, ") + ACE_TEXT ("write() returned %d.\n"), + ret)); + if (ret == ::DDS::RETCODE_TIMEOUT) { + timeout_writes_ ++; + } + cout<<"?\n"; + } + + if( port_obj.value == 0xff ) + port_obj.value = 0x00; + else + port_obj.value = 0xff; + + nanosleep(&t_sleep, NULL); + + cout<<".\n"; + + } + } catch (CORBA::Exception& e) { + cerr << "Exception caught in svc:" << endl + << e << endl; + } + + while (1) + { + writer_->get_matched_subscriptions(handles); + if (handles.length() == 0) + break; + else + ACE_OS::sleep(1); + cout<<"!\n"; + } + ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc finished.\n"))); + + finished_instances_ ++; + + return 0; +} + + +bool +Writer::is_finished () const +{ + return finished_instances_ == num_instances_per_writer; +} + +int +Writer::get_timeout_writes () const +{ + return timeout_writes_.value (); +} |
