Publish/Subscribe

10 Minute Read

Assumptions

This tutorial assumes the following:

One simple way to get access to Solace messaging quickly is to create a messaging service in Solace Cloud as outlined here. You can find other ways to get access to Solace messaging below.

Goals

The goal of this tutorial is to demonstrate basic messaging interactions using OpenMAMA with the Solace middleware bridge. This tutorial’s samples are similar to the OpenMAMA’s mamapublisherc.c and mamasubscriberc.c, but with a distinct focus on configuring OpenMAMA with Solace messaging. See the Resources section below for some further links to other OpenMAMA tutorials and examples.

This tutorial will show you how to publish a message with one string field to a specific topic on Solace messaging using OpenMAMA C API.

This tutorial will show you how to use OpenMAMA C API:

  • to build and send a message on a topic
  • to subscribe to a topic and receive a message

Installation

Installation instructions for OpenMAMA can be found on OpenMAMA Wiki.

Simplified installation instructions for OpenMAMA with Solace middleware bridge are available.

For building OpenMAMA from source see OpenMAMA Wiki.

Get Solace Messaging

This tutorial requires access Solace PubSub+ messaging and requires that you know several connectivity properties about your Solace messaging. Specifically you need to know the following:

Resources Value Description
Host String This is the address clients use when connecting to the PubSub+ messaging to send and receive messages. (Format: DNS_NAME:Port or IP:Port)
Message VPN String The PubSub+ message router Message VPN that this client should connect to.
Client Username String The client username. (See Notes below)
Client Password String The client password. (See Notes below)

There are several ways you can get access to PubSub+ Messaging and find these required properties.

Option 1: Use PubSub+ Cloud

  • Follow these instructions to quickly spin up a cloud-based PubSub+ messaging service for your applications.

  • The messaging connectivity information is found in the service details in the connectivity tab (shown below). You will need:

    • Host:Port (use the SMF URI)
    • Message VPN
    • Client Username
    • Client Password
Screenshot: Messaging Connectivity Information

Option 2: Start a PubSub+ Software

  • Follow these instructions to start the PubSub+ Software in leading Clouds, Container Platforms or Hypervisors. The tutorials outline where to download and how to install the PubSub+ Software.

  • The messaging connectivity information are the following:

    • Host: <public_ip> (IP address assigned to the VMR in tutorial instructions)

    • Message VPN: default

    • Client Username: sampleUser (can be any value)

    • Client Password: samplePassword (can be any value)

      Note: By default, the PubSub+ Software "default" message VPN has authentication disabled.

Option 3: Get access to a PubSub+ Appliance

  • Contact your PubSub+ appliance administrators and obtain the following:

    • A PubSub+ Message-VPN where you can produce and consume direct and persistent messages
    • The host name or IP address of the Solace appliance hosting your Message-VPN
    • A username and password to access the Solace appliance

Example of specifying these properties see here and detailed explanation of them is in the Solace OpenMAMA “Hello World” tutorial.

Receiving Message

Before everything else, as you already know from the Solace OpenMAMA “Hello World” tutorial, we need to initialize the Solace middleware bridge and create a transport.

To learn more about how to initialize and configure Solace middleware bridge as an OpenMAMA transport see Solace OpenMAMA User Guide.

Now we can implement receiving a message.

First, let’s express interest in messages by subscribing to a topic. Then you can look at publishing a matching message and see it received.

In OpenMAMA subscribing to a topic means creating a subscription. When creating a subscription, we need to refer to a transport, a receiver and to an event queue.

You already know what the transport is.

For details of the event queue see OpenMAMA Developer’s Guide for C.

When multi-threading is not required it is recommended to use the default internal event queue as an event queue:

mamaQueue defaultQueue;
mama_getDefaultEventQueue(global.bridge, &defaultQueue))

The receiver is a data structure with function pointers that are invoked by OpenMAMA when certain events happen.

It has a type of mamaMsgCallbacks and it is expected to have, as a minimum, the following function pointers:

  • onCreate that is invoked when a subscription is created
  • onError that is invoked when an error occurs
  • onMsg that is invoked when a message arrives on the subscribed topic

This is how the routine for creating a subscription is implemented:

void subscribeToTopic(const char * topicName)
{
    global.subscription = NULL;
    mama_status status;
    if ((status = mamaSubscription_allocate(&global.subscription)) == MAMA_STATUS_OK)
    {
        // initialize functions called by MAMA on different subscription events
        memset(&global.receiver, 0, sizeof(global.receiver));
        global.receiver.onCreate = onCreate;
        global.receiver.onError = onError;
        global.receiver.onMsg = onMessage;

        mamaQueue defaultQueue;
        if (((status = mama_getDefaultEventQueue(global.bridge, &defaultQueue)) == MAMA_STATUS_OK) &&
            ((status = mamaSubscription_createBasic(global.subscription, global.transport,
                                                    defaultQueue,
                                                    &global.receiver,
                                                    topicName,
                                                    NULL)) == MAMA_STATUS_OK))
        {
            // normal exit
            return;
        }
    }
    // error exit
    printf("Error subscribing to topic %s, error code: %s\n", topicName,
            mamaStatus_stringForStatus(status));
    mama_close();
    exit(status);
}

Notice that global.receiver is assigned with pointers to functions we want to be invoked by OpenMAMA when corresponding events happen.

When a subscription is created, we want to see its topic name on the console:

void onCreate(mamaSubscription subscription, void * closure)
{
    const char * topicName = NULL;
    if (mamaSubscription_getSymbol(subscription, &topicName) == MAMA_STATUS_OK)
    {
        printf("Created subscription to topic \"%s\"\n", topicName);
    }
}

When an error occurs, we want to see on the console the error code:

void onError(mamaSubscription subscription, mama_status status,
        void * platformError, const char * subject, void * closure)
{
    const char * topicName = NULL;
    if (mamaSubscription_getSymbol(subscription, &topicName) == MAMA_STATUS_OK)
    printf("Error occured with subscription to topic \"%s\", error code: %s\n",
            topicName, mamaStatus_stringForStatus(status));
}

When a message arrives, we extract from the message the topic name and a custom string field we know was added to that message:

void onMessage(mamaSubscription subscription, mamaMsg message, void * closure, void * itemClosure)
{
    const char * topicName = NULL;
    if (mamaSubscription_getSymbol(subscription, &topicName) == MAMA_STATUS_OK)
    {
        printf("Message of type %s received on topic \"%s\"\n", mamaMsgType_stringForMsg(message), topicName);
    }
    // extract from the message our own custom field
    const char * const MY_FIELD_NAME = "MdMyTimestamp";
    const int MY_FIELD_ID = 99, BUFFER_SIZE = 32;
    char buffer[BUFFER_SIZE];
    if (mamaMsg_getFieldAsString(message, MY_FIELD_NAME, MY_FIELD_ID, buffer, BUFFER_SIZE)
            == MAMA_STATUS_OK)
    {
        printf("This message has a field \"%s\" with value: %s", MY_FIELD_NAME, buffer);
    }
}

Now we can start receiving messages:

mama_start(global.bridge);

This mama_start call blocks execution of the current thread until mama_stop is called, that is why we need to implement a handler for stopping our program by pressing Ctrl-C from console.

void stopHandler(int value)
{
    signal(value, SIG_IGN);
    printf(" Do you want to stop the program? [y/n]: ");
    char answer = getchar();
    if (answer == 'y')
        stopAll();
    else
        signal(SIGINT, stopHandler);
    getchar();
}

The stopAll routine has destroy calls for the created transport and subscription.

Order of calls in that routine is very important and the very first one must be mama_stop:

void stopAll()
{
    mama_stop(global.bridge);
    if (global.subscription)
    {
        mamaSubscription_destroy(global.subscription);
        global.subscription = NULL;
    }
    if (global.transport)
    {
        mamaTransport_destroy(global.transport);
        global.transport = NULL;
    }
}

Sending Message

As usual, before everything else, we need to initialize the Solace middleware bridge and create a transport.

Now we can implement sending a message, and as you already know from the Solace OpenMAMA “Hello World” tutorial, to publish a message we need to create a publisher.

A publisher is created for a specific topic and refers to already created transport:

global.publisher = NULL;
mamaPublisher_create(&global.publisher, global.transport, publishTopicName, NULL, NULL)

We want our program to publish messages periodically, until we stop it (by pressing Ctrl-C from console).

OpenMAMA provides a very convenient mechanism for a periodic event in a forms of mamaTimer. Such timers created with a reference to an event queue and we can use the default internal event queue for it:

global.publishTimer = NULL;
mamaTimer_create(&global.publishTimer, defaultQueue, timerCallback, intervalSeconds, NULL);

The timerCallback parameter is a pointer to a function that will be invoked by OpenMAMA when the timer expires.

We’re going to implement this function to send one message with three different fields, one of them is a custom field with the message timestamp as a string:

void timerCallback(mamaTimer timer, void* closure)
{
    // generate a timestamp as one of the message fields
    char buffer[32];
    time_t currTime = time(NULL);
    sprintf(buffer, "%s", asctime(localtime(&currTime)));
    // create message and add three fields to it
    mamaMsg message = NULL;
    mama_status status;
    if (((status = mamaMsg_create(&message)) == MAMA_STATUS_OK) &&
        ((status = mamaMsg_addI32(message, MamaFieldMsgType.mName, MamaFieldMsgType.mFid,
                                  MAMA_MSG_TYPE_INITIAL)) == MAMA_STATUS_OK) &&
        ((status = mamaMsg_addI32(message, MamaFieldMsgStatus.mName, MamaFieldMsgStatus.mFid,
                                  MAMA_MSG_STATUS_OK)) == MAMA_STATUS_OK) &&
        ((status = mamaMsg_addString(message, "MdMyTimestamp", 99,
                                     buffer)) == MAMA_STATUS_OK))
    {
        if ((status = mamaPublisher_send(global.publisher, message)) == MAMA_STATUS_OK)
        {
            printf("Message published: %s", buffer);
            mamaMsg_destroy(message);
            // normal exit
            return;
        }
    }
    // error exit
    printf("Error publishing message: %s\n", mamaStatus_stringForStatus(status));
}

Now we can start sending messages:

mama_start(global.bridge);

Because mama_start call blocks execution of the current thread until mama_stop is called, we’re going to use the handler mechanism for stopping our program by pressing Ctrl-C from console.

That handler calls stopAll. As you already know, order of calls in stopAll routine is very important and the very first one must be mama_stop:

void stopAll()
{
    mama_stop(global.bridge);
    // order is important
    if (global.publishTimer)
    {
        mamaTimer_destroy(global.publishTimer);
        global.publishTimer = NULL;
    }
    if (global.publisher)
    {
        mamaPublisher_destroy(global.publisher);
        global.publisher = NULL;
    }
    if (global.transport)
    {
        mamaTransport_destroy(global.transport);
        global.transport = NULL;
    }
}

Summarizing

Our application consists of two executables: topicSubscriber and topicPublisher.

If you combine the example source code shown above and split them into the two mentioned executables, it results in the source that is available for download:

Building

To build on Linux, assuming OpenMAMA installed into /opt/openmama:

$ gcc -o topicSubscriber topicSubscriber.c -I/opt/openmama/include -L/opt/openmama/lib -lmama
$ gcc -o topicPublisher topicPublisher.c -I/opt/openmama/include -L/opt/openmama/lib -lmama

Running the Application

First, start the topicSubscriber:

$ ./topicSubscriber
Solace OpenMAMA tutorial.
Receiving messages with OpenMAMA.
2016-07-14 14:22:38:
********************************************************************************
Note: This build of the MAMA API is not enforcing entitlement checks.
Please see the Licensing file for details
**********************************************************************************
2016-07-14 14:22:38: mamaTransport_create(): No entitlement bridge specified for transport vmr. Defaulting to noop.
Created subscription to topic "tutorial.topic"

Now in a separate console start the topicPublisher that begins publishing messages periodically:

$ ./topicPublisher
Solace OpenMAMA tutorial.
Publishing messages with OpenMAMA.
2016-07-14 14:22:48:
********************************************************************************
Note: This build of the MAMA API is not enforcing entitlement checks.
Please see the Licensing file for details
**********************************************************************************
2016-07-14 14:22:48: mamaTransport_create(): No entitlement bridge specified for transport vmr. Defaulting to noop.
Message published: Thu Jul 14 14:22:51 2016
Message published: Thu Jul 14 14:22:54 2016
Message published: Thu Jul 14 14:22:57 2016
Message published: Thu Jul 14 14:23:00 2016

You will see the topicSubscriber receiving messages and logging them to the connsole:

$ ./topicSubscriber
Solace OpenMAMA tutorial.
Receiving messages with OpenMAMA.
2016-07-14 14:22:38:
********************************************************************************
Note: This build of the MAMA API is not enforcing entitlement checks.
Please see the Licensing file for details
**********************************************************************************
2016-07-14 14:22:38: mamaTransport_create(): No entitlement bridge specified for transport vmr. Defaulting to noop.
Created subscription to topic "tutorial.topic"
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:22:51 2016
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:22:54 2016
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:22:57 2016
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:23:00 2016

To stop the application press Ctrl-C and then y on each console. First, stop the publisher:

$ ./topicPublisher
Solace OpenMAMA tutorial.
Publishing messages with OpenMAMA.
2016-07-14 14:22:48: Failed to open properties file.

2016-07-14 14:22:48:
********************************************************************************
Note: This build of the MAMA API is not enforcing entitlement checks.
Please see the Licensing file for details
**********************************************************************************
2016-07-14 14:22:48: mamaTransport_create(): No entitlement bridge specified for transport vmr. Defaulting to noop.
Message published: Thu Jul 14 14:22:51 2016
Message published: Thu Jul 14 14:22:54 2016
Message published: Thu Jul 14 14:22:57 2016
Message published: Thu Jul 14 14:23:00 2016
^C Do you want to stop the program? [y/n]: y
Closing Solace middleware bridge.

Now we can stop the subscriber:

$ ./topicSubscriber
Solace OpenMAMA tutorial.
Receiving messages with OpenMAMA.
2016-07-14 14:22:38:
********************************************************************************
Note: This build of the MAMA API is not enforcing entitlement checks.
Please see the Licensing file for details
**********************************************************************************
2016-07-14 14:22:38: mamaTransport_create(): No entitlement bridge specified for transport vmr. Defaulting to noop.
Created subscription to topic "tutorial.topic"
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:22:51 2016
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:22:54 2016
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:22:57 2016
Message of type INITIAL received on topic "tutorial.topic"
This message has a field "MdMyTimestamp" with value: Thu Jul 14 14:23:00 2016
^C Do you want to stop the program? [y/n]: y
Closing Solace middleware bridge.

Congratulations! You have now successfully subscribed to a topic and exchanged messages using this topic on using OpenMAMA with the Solace middleware bridge.

Resources

For more information about OpenMAMA:

For more information about Solace technology:

Other tutorials and related links: