// -*- C++ -*- // // $Id: Writer.cpp 784 2007-05-30 19:26:41Z harriss $ #include "Writer.h" #include "PortTypeSupportC.h" #include #include #include using namespace IOTest; const int num_instances_per_writer = 1; const int num_messages = 10; Writer::Writer(::DDS::DataWriter_ptr writer) : writer_ (::DDS::DataWriter::_duplicate (writer)), finished_instances_ (0), timeout_writes_ (0) { cout<<"Writer instanced\n"; } void Writer::start () { ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::start \n"))); // Lanuch num_instances_per_writer threads. // Each thread writes one instance which uses the thread id as the // key value. if (activate (THR_NEW_LWP | THR_JOINABLE, num_instances_per_writer) == -1) { cerr << "Writer::start(): activate failed" << endl; exit(1); } } void Writer::end () { ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::end \n"))); wait (); } int Writer::svc () { ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc begins.\n"))); ::DDS::InstanceHandleSeq handles; try { cout<<"Writer thread waiting for subscription\n"; while (1) { writer_->get_matched_subscriptions(handles); if (handles.length() > 0) break; else ACE_OS::sleep(ACE_Time_Value(0,200000)); cout<<"%\n"; } cout<<"Writer thread got subscriptions\n"; ::IOTest::PortDataWriter_var port_dw = ::IOTest::PortDataWriter::_narrow(writer_.in()); if (CORBA::is_nil (port_dw.in ())) { cerr << "Data Writer could not be narrowed"<< endl; exit(1); } cout<<"Writer thread, DataWriter Cast ok\n"; struct timespec t_sleep; t_sleep.tv_sec = 0; t_sleep.tv_nsec = 10000; IOTest::Port port_obj; ::DDS::InstanceHandle_t handle = port_dw->_cxx_register (port_obj); port_obj.no = 1; ACE_DEBUG((LM_DEBUG, ACE_TEXT("%T (%P|%t) Writer::svc starting to write.\n"))); int actval, lastval = 0; while(1) { actval = io.get(); if ( lastval != actval ){ lastval = actval; port_obj.value = actval; ::DDS::ReturnCode_t ret = port_dw->write(port_obj, handle); if (ret != ::DDS::RETCODE_OK) ACE_ERROR ((LM_ERROR, ACE_TEXT("(%P|%t)ERROR Writer::svc, ") ACE_TEXT ("write() returned %d.\n"), ret)); if (ret == ::DDS::RETCODE_TIMEOUT) timeout_writes_ ++; } nanosleep(&t_sleep, NULL); //cout<<".\n"; } } catch (CORBA::Exception& e) { cerr << "Exception caught in svc:" << endl << e << endl; } while (1) { writer_->get_matched_subscriptions(handles); if (handles.length() == 0) break; else ACE_OS::sleep(1); cout<<"!\n"; } ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc finished.\n"))); finished_instances_ ++; return 0; } bool Writer::is_finished () const { return finished_instances_ == num_instances_per_writer; } int Writer::get_timeout_writes () const { return timeout_writes_.value (); }