1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
// -*- C++ -*-
// ============================================================================
/**
* @file subscriber.cpp
*
* $Id: subscriber.cpp 899 2007-07-05 16:36:52Z mitza $
*
*
*/
// ============================================================================
#include "DataReaderListener.h"
#include "PortTypeSupportImpl.h"
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/Marked_Default_Qos.h>
#include <dds/DCPS/SubscriberImpl.h>
#include <dds/DCPS/transport/framework/TheTransportFactory.h>
#include <dds/DCPS/transport/simpleTCP/SimpleTcpConfiguration.h>
#ifdef ACE_AS_STATIC_LIBS
#include <dds/DCPS/transport/simpleTCP/SimpleTcp.h>
#include <dds/DCPS/transport/simpleUnreliableDgram/SimpleUnreliableDgram.h>
#include <dds/DCPS/transport/ReliableMulticast/ReliableMulticast.h>
#endif
#include <ace/streams.h>
#include "ace/Get_Opt.h"
using namespace IOTest;
OpenDDS::DCPS::TransportIdType transport_impl_id = 1;
int
parse_args (int argc, char *argv[])
{
ACE_Get_Opt get_opts (argc, argv, "t:");
int c;
while ((c = get_opts ()) != -1)
{
switch (c)
{
case 't':
if (ACE_OS::strcmp (get_opts.opt_arg (), "udp") == 0) {
transport_impl_id = 2;
}
else if (ACE_OS::strcmp (get_opts.opt_arg (), "mcast") == 0) {
transport_impl_id = 3;
}
else if (ACE_OS::strcmp (get_opts.opt_arg (), "reliable_mcast") == 0) {
transport_impl_id = 4;
}
// test with DEFAULT_SIMPLE_TCP_ID.
else if (ACE_OS::strcmp (get_opts.opt_arg (), "default_tcp") == 0) {
transport_impl_id = OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID;
}
// test with DEFAULT_SIMPLE_UDP_ID.
else if (ACE_OS::strcmp (get_opts.opt_arg (), "default_udp") == 0) {
transport_impl_id = OpenDDS::DCPS::DEFAULT_SIMPLE_UDP_ID;
}
else if (ACE_OS::strcmp (get_opts.opt_arg (), "default_mcast_sub") == 0) {
transport_impl_id = OpenDDS::DCPS::DEFAULT_SIMPLE_MCAST_SUB_ID;
}
break;
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
"usage: %s "
"-t <tcp/udp/default> "
"\n",
argv [0]),
-1);
}
}
// Indicates sucessful parsing of the command line
return 0;
}
int main (int argc, char *argv[])
{
try
{
DDS::DomainParticipantFactory_var dpf;
DDS::DomainParticipant_var participant;
dpf = TheParticipantFactoryWithArgs(argc, argv);
if (CORBA::is_nil (dpf.in ())){
cerr << "get ParticipantFactory failed." << endl;
return 1;
}
participant = dpf->create_participant(411,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil());
if (CORBA::is_nil (participant.in ())) {
cerr << "create_participant failed." << endl;
return 1 ;
}
if (parse_args (argc, argv) == -1) {
return -1;
}
PortTypeSupport_var port = new PortTypeSupportImpl();
if (DDS::RETCODE_OK != port->register_type(participant.in (), "")) {
cerr << "Failed to register the PortTypeTypeSupport." << endl;
exit(1);
}
CORBA::String_var type_name = port->get_type_name ();
DDS::TopicQos topic_qos;
participant->get_default_topic_qos(topic_qos);
DDS::Topic_var topic = participant->create_topic("IOTest",
type_name.in (),
topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (topic.in ())) {
cerr << "Failed to create_topic." << endl;
exit(1);
}
// Initialize the transport
OpenDDS::DCPS::TransportImpl_rch transport_impl =
TheTransportFactory->create_transport_impl (transport_impl_id,
::OpenDDS::DCPS::AUTO_CONFIG);
// Create the subscriber and attach to the corresponding
// transport.
DDS::Subscriber_var sub =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil());
if (CORBA::is_nil (sub.in ())) {
cerr << "Failed to create_subscriber." << endl;
exit(1);
}
// Attach the subscriber to the transport.
OpenDDS::DCPS::SubscriberImpl* sub_impl =
OpenDDS::DCPS::reference_to_servant<OpenDDS::DCPS::SubscriberImpl> (sub.in ());
if (0 == sub_impl) {
cerr << "Failed to obtain subscriber servant\n" << endl;
exit(1);
}
OpenDDS::DCPS::AttachStatus status = sub_impl->attach_transport(transport_impl.in());
if (status != OpenDDS::DCPS::ATTACH_OK) {
std::string status_str;
switch (status) {
case OpenDDS::DCPS::ATTACH_BAD_TRANSPORT:
status_str = "ATTACH_BAD_TRANSPORT";
break;
case OpenDDS::DCPS::ATTACH_ERROR:
status_str = "ATTACH_ERROR";
break;
case OpenDDS::DCPS::ATTACH_INCOMPATIBLE_QOS:
status_str = "ATTACH_INCOMPATIBLE_QOS";
break;
default:
status_str = "Unknown Status";
break;
}
cerr << "Failed to attach to the transport. Status == "
<< status_str.c_str() << endl;
exit(1);
}
// activate the listener
DataReaderListenerImpl listener_servant;
DDS::DataReaderListener_var listener = ::OpenDDS::DCPS::servant_to_reference(&listener_servant);
if (CORBA::is_nil (listener.in ())) {
cerr << "listener is nil." << endl;
exit(1);
}
// Create the Datareaders
DDS::DataReaderQos dr_qos;
sub->get_default_datareader_qos (dr_qos);
DDS::DataReader_var dr = sub->create_datareader(topic.in (),
dr_qos,
listener.in ());
if (CORBA::is_nil (dr.in ())) {
cerr << "create_datareader failed." << endl;
exit(1);
}
while(1) {
ACE_OS::sleep (1);
}
if (!CORBA::is_nil (participant.in ())) {
participant->delete_contained_entities();
}
if (!CORBA::is_nil (dpf.in ())) {
dpf->delete_participant(participant.in ());
}
ACE_OS::sleep(2);
TheTransportFactory->release();
TheServiceParticipant->shutdown ();
}
catch (CORBA::Exception& e)
{
cerr << "SUB: Exception caught in main ():" << endl << e << endl;
return 1;
}
return 0;
}
|