summaryrefslogtreecommitdiff
path: root/dds_io_pub/Writer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dds_io_pub/Writer.cpp')
-rw-r--r--dds_io_pub/Writer.cpp146
1 files changed, 146 insertions, 0 deletions
diff --git a/dds_io_pub/Writer.cpp b/dds_io_pub/Writer.cpp
new file mode 100644
index 0000000..82d5173
--- /dev/null
+++ b/dds_io_pub/Writer.cpp
@@ -0,0 +1,146 @@
+// -*- C++ -*-
+//
+// $Id: Writer.cpp 784 2007-05-30 19:26:41Z harriss $
+#include "Writer.h"
+#include "PortTypeSupportC.h"
+#include <ace/OS_NS_unistd.h>
+#include <ace/streams.h>
+#include <time.h>
+
+using namespace IOTest;
+
+const int num_instances_per_writer = 1;
+const int num_messages = 10;
+
+Writer::Writer(::DDS::DataWriter_ptr writer)
+: writer_ (::DDS::DataWriter::_duplicate (writer)),
+ finished_instances_ (0),
+ timeout_writes_ (0)
+{
+ cout<<"Writer instanced\n";
+}
+
+void
+Writer::start ()
+{
+ ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::start \n")));
+ // Lanuch num_instances_per_writer threads.
+ // Each thread writes one instance which uses the thread id as the
+ // key value.
+ if (activate (THR_NEW_LWP | THR_JOINABLE, num_instances_per_writer) == -1) {
+ cerr << "Writer::start(): activate failed" << endl;
+ exit(1);
+ }
+}
+
+void
+Writer::end ()
+{
+ ACE_DEBUG((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Writer::end \n")));
+ wait ();
+}
+
+
+int
+Writer::svc ()
+{
+ ACE_DEBUG((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Writer::svc begins.\n")));
+
+ ::DDS::InstanceHandleSeq handles;
+ try {
+
+ cout<<"Writer thread waiting for subscription\n";
+
+ while (1)
+ {
+ writer_->get_matched_subscriptions(handles);
+ if (handles.length() > 0)
+ break;
+ else
+ ACE_OS::sleep(ACE_Time_Value(0,200000));
+
+ cout<<"%\n";
+ }
+
+ cout<<"Writer thread got subscriptions\n";
+
+ ::IOTest::PortDataWriter_var port_dw
+ = ::IOTest::PortDataWriter::_narrow(writer_.in());
+ if (CORBA::is_nil (port_dw.in ())) {
+ cerr << "Data Writer could not be narrowed"<< endl;
+ exit(1);
+ }
+
+ cout<<"Writer thread, DataWriter Cast ok\n";
+
+ struct timespec t_sleep;
+ t_sleep.tv_sec = 1;
+ t_sleep.tv_nsec = 500000;
+
+ IOTest::Port port_obj;
+ ::DDS::InstanceHandle_t handle = port_dw->_cxx_register (port_obj);
+
+ port_obj.no = 1;
+ port_obj.value = 0xff;
+
+ ACE_DEBUG((LM_DEBUG,
+ ACE_TEXT("%T (%P|%t) Writer::svc starting to write.\n")));
+ while(1) {
+ ::DDS::ReturnCode_t ret = port_dw->write(port_obj, handle);
+
+ if (ret != ::DDS::RETCODE_OK) {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT("(%P|%t)ERROR Writer::svc, ")
+ ACE_TEXT ("write() returned %d.\n"),
+ ret));
+ if (ret == ::DDS::RETCODE_TIMEOUT) {
+ timeout_writes_ ++;
+ }
+ cout<<"?\n";
+ }
+
+ if( port_obj.value == 0xff )
+ port_obj.value = 0x00;
+ else
+ port_obj.value = 0xff;
+
+ nanosleep(&t_sleep, NULL);
+
+ cout<<".\n";
+
+ }
+ } catch (CORBA::Exception& e) {
+ cerr << "Exception caught in svc:" << endl
+ << e << endl;
+ }
+
+ while (1)
+ {
+ writer_->get_matched_subscriptions(handles);
+ if (handles.length() == 0)
+ break;
+ else
+ ACE_OS::sleep(1);
+ cout<<"!\n";
+ }
+ ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc finished.\n")));
+
+ finished_instances_ ++;
+
+ return 0;
+}
+
+
+bool
+Writer::is_finished () const
+{
+ return finished_instances_ == num_instances_per_writer;
+}
+
+int
+Writer::get_timeout_writes () const
+{
+ return timeout_writes_.value ();
+}