1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
// -*- C++ -*-
//
// $Id: Writer.cpp 784 2007-05-30 19:26:41Z harriss $
#include "Writer.h"
#include "PortTypeSupportC.h"
#include <ace/OS_NS_unistd.h>
#include <ace/streams.h>
#include <time.h>
using namespace IOTest;
// Thread count passed to activate() in Writer::start(); it is also the value
// Writer::is_finished() compares finished_instances_ against.
const int num_instances_per_writer = 1;
// NOTE(review): not referenced anywhere in the visible part of this file;
// Writer::svc() writes in an unbounded loop rather than a fixed number of
// messages. Confirm whether it is still needed.
const int num_messages = 10;
// Constructs a Writer for the supplied DDS data writer.
//
// writer_ stores the result of ::DDS::DataWriter::_duplicate(writer); the
// finished-instance and timed-out-write tallies both start at zero.
// Announces construction on standard output.
Writer::Writer(::DDS::DataWriter_ptr writer)
  : writer_(::DDS::DataWriter::_duplicate(writer))
  , finished_instances_(0)
  , timeout_writes_(0)
{
  cout << "Writer instanced\n";
}
// Starts the writer's worker thread(s).
//
// Requests num_instances_per_writer joinable threads via activate()
// (presumably ACE_Task_Base::activate, which runs svc() on each thread --
// confirm against Writer.h). If activation reports failure (-1) the whole
// process is terminated with exit status 1.
void
Writer::start()
{
  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::start \n")));

  // Launch num_instances_per_writer threads; nothing more to do on success.
  if (activate(THR_NEW_LWP | THR_JOINABLE, num_instances_per_writer) != -1) {
    return;
  }

  cerr << "Writer::start(): activate failed" << endl;
  exit(1);
}
// Logs the shutdown request and then calls wait() (presumably
// ACE_Task_Base::wait, which blocks until the threads started by start()
// have exited -- confirm against Writer.h).
void
Writer::end()
{
  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::end \n")));

  wait();
}
// Worker body: publishes the value read from `io` every time it changes.
//
// Steps:
//   1. Poll get_matched_subscriptions() until at least one subscription
//      handle is reported.
//   2. Narrow writer_ to ::IOTest::PortDataWriter (exit(1) if that fails).
//   3. Fill in the sample, register it, then loop forever: read io.get(),
//      and whenever the value differs from the previous one, write it.
//      Failed writes are logged; writes that return RETCODE_TIMEOUT are also
//      counted in timeout_writes_.
//   4. The write loop has no exit condition, so the code after the try block
//      is reached only when a CORBA::Exception is thrown. In that case the
//      function polls until no subscriptions remain, bumps
//      finished_instances_ and returns.
//
// Returns 0.
int
Writer::svc ()
{
  ACE_DEBUG((LM_DEBUG,
             ACE_TEXT("(%P|%t) Writer::svc begins.\n")));

  ::DDS::InstanceHandleSeq handles;
  try {
    cout<<"Writer thread waiting for subscription\n";
    // Poll (sleeping ACE_Time_Value(0,200000) between attempts) until a
    // subscription shows up; a '%' is printed after every unsuccessful poll.
    while (1)
    {
      writer_->get_matched_subscriptions(handles);
      if (handles.length() > 0) {
        break;
      }
      ACE_OS::sleep(ACE_Time_Value(0,200000));
      cout<<"%\n";
    }
    cout<<"Writer thread got subscriptions\n";

    ::IOTest::PortDataWriter_var port_dw
      = ::IOTest::PortDataWriter::_narrow(writer_.in());
    if (CORBA::is_nil (port_dw.in ())) {
      cerr << "Data Writer could not be narrowed"<< endl;
      exit(1);
    }
    cout<<"Writer thread, DataWriter Cast ok\n";

    // Pause between two polls of `io`.
    struct timespec t_sleep;
    t_sleep.tv_sec = 0;
    t_sleep.tv_nsec = 5000;

    // Initialize the sample BEFORE registering it. Previously the instance
    // was registered while `no` and `value` were still indeterminate, so the
    // handle was derived from garbage and need not match the sample written
    // below.
    IOTest::Port port_obj;
    port_obj.no = 1;
    port_obj.value = 0;
    ::DDS::InstanceHandle_t handle = port_dw->_cxx_register (port_obj);

    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("%T (%P|%t) Writer::svc starting to write.\n")));

    // lastval starts at 0, so nothing is published until io.get() first
    // returns a non-zero value.
    int actval, lastval = 0;
    while(1) {
      actval = io.get();
      if ( lastval != actval ){
        lastval = actval;
        port_obj.value = actval;
        ::DDS::ReturnCode_t ret = port_dw->write(port_obj, handle);
        if (ret != ::DDS::RETCODE_OK) {
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT("(%P|%t)ERROR Writer::svc, ")
                      ACE_TEXT ("write() returned %d.\n"),
                      ret));
        }
        if (ret == ::DDS::RETCODE_TIMEOUT) {
          timeout_writes_ ++;
        }
      }
      nanosleep(&t_sleep, NULL);
    }
  } catch (CORBA::Exception& e) {
    cerr << "Exception caught in svc:" << endl
         << e << endl;
  }

  // Only reached after an exception: wait (1 s between polls, printing '!'
  // each time) until every subscription has gone away.
  while (1)
  {
    writer_->get_matched_subscriptions(handles);
    if (handles.length() == 0) {
      break;
    }
    ACE_OS::sleep(1);
    cout<<"!\n";
  }

  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Writer::svc finished.\n")));
  finished_instances_ ++;
  return 0;
}
// Reports whether finished_instances_ has reached
// num_instances_per_writer, i.e. whether as many svc() runs have completed
// as threads were requested in start().
bool
Writer::is_finished() const
{
  const bool all_done = (finished_instances_ == num_instances_per_writer);
  return all_done;
}
// Returns the current value of timeout_writes_, which svc() increments each
// time write() returns ::DDS::RETCODE_TIMEOUT.
int
Writer::get_timeout_writes() const
{
  const int timed_out = timeout_writes_.value();
  return timed_out;
}
|