From 1ae6ba8834b2684ac1895d76440bc61a6dcdb523 Mon Sep 17 00:00:00 2001 From: guest Date: Fri, 5 Oct 2007 23:16:38 +0000 Subject: Initial import. git-svn-id: svn+ssh://mecka.net/home/svn/dds@2 c30cbac5-9f56-4f76-8ed5-5c34e48a65ae --- dds_io_sub/dds_io_sub.cpp | 215 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 dds_io_sub/dds_io_sub.cpp (limited to 'dds_io_sub/dds_io_sub.cpp') diff --git a/dds_io_sub/dds_io_sub.cpp b/dds_io_sub/dds_io_sub.cpp new file mode 100644 index 0000000..6cdaabd --- /dev/null +++ b/dds_io_sub/dds_io_sub.cpp @@ -0,0 +1,215 @@ +// -*- C++ -*- +// ============================================================================ +/** + * @file subscriber.cpp + * + * $Id: subscriber.cpp 899 2007-07-05 16:36:52Z mitza $ + * + * + */ +// ============================================================================ + + +#include "DataReaderListener.h" +#include "PortTypeSupportImpl.h" +#include +#include +#include +#include +#include +#ifdef ACE_AS_STATIC_LIBS +#include +#include +#include +#endif + +#include +#include "ace/Get_Opt.h" + +using namespace IOTest; + +OpenDDS::DCPS::TransportIdType transport_impl_id = 1; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "t:"); + int c; + + while ((c = get_opts ()) != -1) + { + switch (c) + { + case 't': + if (ACE_OS::strcmp (get_opts.opt_arg (), "udp") == 0) { + transport_impl_id = 2; + } + else if (ACE_OS::strcmp (get_opts.opt_arg (), "mcast") == 0) { + transport_impl_id = 3; + } + else if (ACE_OS::strcmp (get_opts.opt_arg (), "reliable_mcast") == 0) { + transport_impl_id = 4; + } + // test with DEFAULT_SIMPLE_TCP_ID. + else if (ACE_OS::strcmp (get_opts.opt_arg (), "default_tcp") == 0) { + transport_impl_id = OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID; + } + // test with DEFAULT_SIMPLE_UDP_ID. + else if (ACE_OS::strcmp (get_opts.opt_arg (), "default_udp") == 0) { + transport_impl_id = OpenDDS::DCPS::DEFAULT_SIMPLE_UDP_ID; + } + else if (ACE_OS::strcmp (get_opts.opt_arg (), "default_mcast_sub") == 0) { + transport_impl_id = OpenDDS::DCPS::DEFAULT_SIMPLE_MCAST_SUB_ID; + } + break; + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-t " + "\n", + argv [0]), + -1); + } + } + // Indicates sucessful parsing of the command line + return 0; +} + + +int main (int argc, char *argv[]) +{ + try + { + DDS::DomainParticipantFactory_var dpf; + DDS::DomainParticipant_var participant; + + dpf = TheParticipantFactoryWithArgs(argc, argv); + if (CORBA::is_nil (dpf.in ())){ + cerr << "get ParticipantFactory failed." << endl; + return 1; + } + + participant = dpf->create_participant(411, + PARTICIPANT_QOS_DEFAULT, + DDS::DomainParticipantListener::_nil()); + if (CORBA::is_nil (participant.in ())) { + cerr << "create_participant failed." << endl; + return 1 ; + } + + if (parse_args (argc, argv) == -1) { + return -1; + } + + PortTypeSupport_var port = new PortTypeSupportImpl(); + + if (DDS::RETCODE_OK != port->register_type(participant.in (), "")) { + cerr << "Failed to register the PortTypeTypeSupport." << endl; + exit(1); + } + + CORBA::String_var type_name = port->get_type_name (); + + DDS::TopicQos topic_qos; + participant->get_default_topic_qos(topic_qos); + DDS::Topic_var topic = participant->create_topic("IOTest", + type_name.in (), + topic_qos, + DDS::TopicListener::_nil()); + if (CORBA::is_nil (topic.in ())) { + cerr << "Failed to create_topic." << endl; + exit(1); + } + + // Initialize the transport + OpenDDS::DCPS::TransportImpl_rch transport_impl = + TheTransportFactory->create_transport_impl (transport_impl_id, + ::OpenDDS::DCPS::AUTO_CONFIG); + + // Create the subscriber and attach to the corresponding + // transport. + DDS::Subscriber_var sub = + participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT, + DDS::SubscriberListener::_nil()); + if (CORBA::is_nil (sub.in ())) { + cerr << "Failed to create_subscriber." << endl; + exit(1); + } + + // Attach the subscriber to the transport. + OpenDDS::DCPS::SubscriberImpl* sub_impl = + OpenDDS::DCPS::reference_to_servant (sub.in ()); + if (0 == sub_impl) { + cerr << "Failed to obtain subscriber servant\n" << endl; + exit(1); + } + + OpenDDS::DCPS::AttachStatus status = sub_impl->attach_transport(transport_impl.in()); + if (status != OpenDDS::DCPS::ATTACH_OK) { + std::string status_str; + switch (status) { + case OpenDDS::DCPS::ATTACH_BAD_TRANSPORT: + status_str = "ATTACH_BAD_TRANSPORT"; + break; + case OpenDDS::DCPS::ATTACH_ERROR: + status_str = "ATTACH_ERROR"; + break; + case OpenDDS::DCPS::ATTACH_INCOMPATIBLE_QOS: + status_str = "ATTACH_INCOMPATIBLE_QOS"; + break; + default: + status_str = "Unknown Status"; + break; + } + cerr << "Failed to attach to the transport. Status == " + << status_str.c_str() << endl; + exit(1); + } + + // activate the listener + DataReaderListenerImpl listener_servant; + DDS::DataReaderListener_var listener = ::OpenDDS::DCPS::servant_to_reference(&listener_servant); + + if (CORBA::is_nil (listener.in ())) { + cerr << "listener is nil." << endl; + exit(1); + } + + // Create the Datareaders + DDS::DataReaderQos dr_qos; + sub->get_default_datareader_qos (dr_qos); + DDS::DataReader_var dr = sub->create_datareader(topic.in (), + dr_qos, + listener.in ()); + if (CORBA::is_nil (dr.in ())) { + cerr << "create_datareader failed." << endl; + exit(1); + } + + + int expected = 10; + while ( listener_servant.num_reads() < expected) { + ACE_OS::sleep (1); + } + + if (!CORBA::is_nil (participant.in ())) { + participant->delete_contained_entities(); + } + if (!CORBA::is_nil (dpf.in ())) { + dpf->delete_participant(participant.in ()); + } + ACE_OS::sleep(2); + + TheTransportFactory->release(); + TheServiceParticipant->shutdown (); + + } + catch (CORBA::Exception& e) + { + cerr << "SUB: Exception caught in main ():" << endl << e << endl; + return 1; + } + + return 0; +} -- cgit v1.2.3