Publish/Subscribe


This tutorial introduces the fundamentals of connecting an MQTT client to Solace messaging by showing how to add a topic subscription and send a message that matches it. This forms the basis for any publish/subscribe message exchange.

Assumptions

This tutorial assumes the following:

  • You are familiar with Solace core concepts.
  • You have access to Solace messaging with the following configuration details:
    • Connectivity information for a Solace message-VPN
    • Enabled client username and password
    • Enabled MQTT services

One simple way to get access to Solace messaging quickly is to create a messaging service in Solace Cloud. Other ways to get access are described in the Get Solace Messaging section below.

Goals

The goal of this tutorial is to demonstrate the MQTT messaging interaction using Solace. This tutorial will show you:

  • How to build and send a message on a topic
  • How to subscribe to a topic and receive a message

MQ Telemetry Transport (MQTT) Introduction

MQTT is a standard lightweight protocol for sending and receiving messages. In addition to the information provided on the Solace developer portal, you may also want to consult external sources for more details about MQTT. The following are good places to start:

  1. http://mqtt.org/
  2. https://www.eclipse.org/paho/

Get Solace Messaging

This tutorial requires access to Solace PubSub+ messaging and requires that you know several connectivity properties of your Solace messaging. Specifically, you need to know the following:

Resource | Value | Description
Host | String | This is the address clients use when connecting to the PubSub+ messaging to send and receive messages. (Format: DNS_NAME:Port or IP:Port)
Message VPN | String | The PubSub+ message router Message VPN that this client should connect to.
Client Username | String | The client username. (See Notes below)
Client Password | String | The client password. (See Notes below)

There are several ways you can get access to PubSub+ Messaging and find these required properties.

Option 1: Use PubSub+ Cloud

  • Follow these instructions to quickly spin up a cloud-based PubSub+ messaging service for your applications.

  • The messaging connectivity information is found in the service details in the connectivity tab (shown below). You will need:

    • Host:Port (use the MQTT URI rather than the SMF URI, because this tutorial connects over MQTT)
    • Message VPN
    • Client Username
    • Client Password
Screenshot: Messaging Connectivity Information

Option 2: Start PubSub+ Software

  • Follow these instructions to start PubSub+ Software in leading clouds, container platforms, or hypervisors. The tutorials outline where to download and how to install PubSub+ Software.

  • The messaging connectivity information is as follows:

    • Host: <public_ip> (IP address assigned to the VMR in tutorial instructions)

    • Message VPN: default

    • Client Username: sampleUser (can be any value)

    • Client Password: samplePassword (can be any value)

      Note: By default, the PubSub+ Software "default" message VPN has authentication disabled.

Option 3: Get access to a PubSub+ Appliance

  • Contact your PubSub+ appliance administrators and obtain the following:

    • A PubSub+ Message-VPN where you can produce and consume direct and persistent messages
    • The host name or IP address of the Solace appliance hosting your Message-VPN
    • A username and password to access the Solace appliance

Obtaining an MQTT Client Library

Although you can use any MQTT client library of your choice to connect to Solace, this tutorial uses the Paho Java Client library. Here are a few easy ways to get the Paho API. The instructions in the Building section assume you're using Gradle and pulling the jars from Maven Central. If your environment differs, adjust the build instructions accordingly.

Get the API: Using Gradle

    compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0")

Get the API: Using Maven

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.1.0</version>
</dependency>

Connecting to Solace Messaging

In order to send or receive messages, an MQTT client must connect a session. The MQTT session is the basis for all client communication with Solace messaging.

In the Paho Java client library, MQTT sessions are created from the MqttClient class using a set of properties:

MqttClient mqttClient = new MqttClient(host, "HelloWorldSub");
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
mqttClient.connect(connOpts);
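
The host value in this tutorial is described as DNS_NAME:Port or IP:Port, while the Paho MqttClient constructor expects a server URI with a scheme such as tcp:// or ssl://. The following is a minimal sketch of one way to bridge the two. The class and method names are hypothetical, and defaulting to tcp:// is an assumption made for illustration, not something prescribed by Paho or Solace.

```java
// Hypothetical helper: turns "host:port" into a URI form accepted by Paho's MqttClient.
class ServerUriSketch {

    // If the caller already supplied a scheme (for example ssl://), leave it untouched.
    // Otherwise assume a plain, unencrypted TCP connection.
    static String toServerUri(String host) {
        String trimmed = host.trim();
        if (trimmed.contains("://")) {
            return trimmed;
        }
        return "tcp://" + trimmed;
    }

    public static void main(String[] args) {
        System.out.println(toServerUri("localhost:1883"));
    }
}
```

With this sketch, toServerUri("localhost:1883") returns tcp://localhost:1883, and a value that already starts with ssl:// is passed through unchanged.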

An MQTT client can maintain state information between sessions. The state information is used to ensure delivery and receipt of messages, and includes subscriptions created by an MQTT client. This tutorial sets the "clean session" flag to true, via the MqttConnectOptions class, to clear the state information after each session disconnect.
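
To illustrate what the "clean session" flag means, here is a toy model of the session state a broker might keep per client ID. It is only a sketch: the class and method names are hypothetical, it tracks subscriptions only, and it does not reflect how Solace messaging or Paho actually store session state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of broker-side session state, keyed by MQTT client ID.
class SessionStateSketch {

    // Subscriptions remembered between connections for non-clean sessions.
    private final Map<String, Set<String>> stored = new HashMap<>();

    // Returns the subscription set the client starts with after connecting.
    Set<String> connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            stored.remove(clientId);   // discard any state left from earlier sessions
            return new HashSet<>();    // fresh state that is not kept after disconnect
        }
        // Resume the stored state, or create it on first connect.
        return stored.computeIfAbsent(clientId, id -> new HashSet<>());
    }
}
```

In this model, a client that subscribes with cleanSession set to false finds its subscription again on the next non-clean connect, while connecting with cleanSession set to true always starts empty and wipes whatever was stored.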

At this point your client is connected to Solace messaging. You can use PubSub+ Manager to view the client connection and related details.

Receive a message

This tutorial uses a Quality of Service (QoS) level of 0 (equivalent to Solace "Direct" messages), which provides at-most-once delivery. First, express interest in messages by subscribing to a topic filter. Then you can publish a matching message and see it received.
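
For reference, MQTT defines three QoS levels, identified by the integers 0, 1 and 2. The sketch below gives them readable names. The enum and its method are hypothetical helpers for illustration and are not part of the Paho API, which takes the QoS as a plain int.

```java
// Hypothetical names for the three MQTT QoS levels.
class QosSketch {

    enum Qos {
        AT_MOST_ONCE(0),   // the level used in this tutorial
        AT_LEAST_ONCE(1),
        EXACTLY_ONCE(2);

        final int level;

        Qos(int level) {
            this.level = level;
        }

        // Maps the integer used on the wire back to a named level.
        static Qos fromLevel(int level) {
            for (Qos q : values()) {
                if (q.level == level) {
                    return q;
                }
            }
            throw new IllegalArgumentException("Invalid QoS level: " + level);
        }
    }
}
```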

With a session connected in the previous step, the next step is to use the MQTT client to subscribe to a topic filter to receive messages. A topic filter in MQTT differs from a Solace SMF topic subscription. Users can learn more about the differences between the two in the Topic Names and Topic Filters section of MQTT Specification Conformance - Operational Behavior.
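
To make the idea of a topic filter more concrete, here is a minimal sketch of MQTT-style matching using the two wildcards from the MQTT specification: + for a single level and # for all remaining levels. The class and method names are hypothetical. This is not how Paho or Solace messaging implement matching, and it ignores special cases such as topics that begin with $.

```java
// Illustrative MQTT topic filter matcher (simplified).
class TopicFilterSketch {

    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps empty trailing levels, so "a/" has two levels.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);

        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent
                // level and any number of child levels.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        // Without '#', both must have exactly the same number of levels.
        return f.length == t.length;
    }
}
```

Under this sketch, the filters T/GettingStarted/pubsub, T/+/pubsub and T/# all match the topic T/GettingStarted/pubsub used in this tutorial, while T/+ does not because it covers only one level below T.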

Messages are received asynchronously through callbacks. In the Paho Java client library, these callbacks are defined by the MqttCallback interface.

mqttClient.setCallback(new MqttCallback() {

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String time = new Timestamp(System.currentTimeMillis()).toString();
        System.out.println("\nReceived a Message!" +
            "\n\tTime:    " + time +
            "\n\tTopic:   " + topic +
            "\n\tMessage: " + new String(message.getPayload()) +
            "\n\tQoS:     " + message.getQos() + "\n");
        latch.countDown(); // unblock main thread
    }

    public void connectionLost(Throwable cause) {
        System.out.println("Connection to Solace broker lost! " + cause.getMessage());
        latch.countDown();
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }

});

The message consumer code uses a countdown latch in this hello world example to block the consumer thread until a single message has been received.

Then you must subscribe to a topic filter in order to express interest in receiving messages. This tutorial uses the topic "T/GettingStarted/pubsub" and QoS level of 0 for at most once delivery.

mqttClient.subscribe("T/GettingStarted/pubsub", 0);

Once the subscription is added, the consumer is ready to receive messages. The main thread then blocks on the latch until a message arrives.

try {
    latch.await(); // block here until message received, and latch will flip
} catch (InterruptedException e) {
    System.out.println("I was awoken while waiting");
}
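
The latch pattern can be tried in isolation, without a broker. In the sketch below, a background thread stands in for the client library's callback thread, and the main thread waits on a CountDownLatch. The class and method names are hypothetical, and the timeout is an addition for illustration; the tutorial code waits without one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Demonstrates blocking until one asynchronous "message" has been delivered.
class LatchSketch {

    // Returns the received text, or null if nothing arrived within the timeout.
    static String awaitOneMessage(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();

        // Stand-in for the callback thread; no real MQTT client is involved.
        Thread callbackThread = new Thread(() -> {
            received.set("Hello world from MQTT!");
            latch.countDown(); // unblock the waiting thread
        });
        callbackThread.start();

        try {
            boolean arrived = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitOneMessage(1000));
    }
}
```

Using await with a timeout is a design choice that keeps a consumer from hanging forever if no message is ever published. Whether that suits a real application depends on how long it is expected to wait.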

Sending a message

Now it is time to send a message to the waiting consumer.

Diagram: Sending a Message

To send a message, you must create a message using the MqttMessage class and set the QoS level. This tutorial sends a message to the topic "T/GettingStarted/pubsub" with the contents "Hello world from MQTT!" and a QoS level of 0, which provides at-most-once delivery. The MQTT client created earlier then publishes the message:

MqttMessage message = new MqttMessage("Hello world from MQTT!".getBytes());
message.setQos(0);
mqttClient.publish("T/GettingStarted/pubsub", message);

At this point the producer has sent a message to Solace messaging, and your waiting consumer will have received the message and printed its contents to the screen.
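
An MQTT payload is a byte array, so the text must be encoded on the publishing side and decoded on the receiving side. The snippets above call getBytes() and new String(...) without a charset, which uses the platform default. The following sketch shows one way to make the encoding explicit, assuming both sides agree on UTF-8. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Explicit UTF-8 conversion between text and a message payload.
class PayloadSketch {

    // Publisher side: text to bytes.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Subscriber side: bytes back to text.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello world from MQTT!");
        System.out.println(decode(payload));
    }
}
```

The resulting byte array could be passed to the MqttMessage constructor in place of the getBytes() call, and decode could replace new String(message.getPayload()) in the callback.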

Summarizing

The full source code for this example is available on GitHub. Combining the example source code shown above results in the topicPublisher and topicSubscriber samples used in the steps below.

Getting the Source

Clone the GitHub repository containing the Solace samples.

git clone https://github.com/SolaceSamples/solace-samples-mqtt
cd solace-samples-mqtt

Building

The project uses Gradle. To build, execute the following command.

./gradlew build

This builds all of the Java samples with OS-specific launch scripts. The files are staged in the build/staged directory.

Running the Sample

If you start the topicSubscriber with arguments specifying your Solace messaging connection details, it will connect and wait for a message.

$ ./build/staged/bin/topicSubscriber <host:port> <client-username> <client-password>
TopicSubscriber initializing...
Connecting to Solace messaging at <host:port>
Connected
Subscribing client to topic: T/GettingStarted/pubsub

Then you can send a message using the topicPublisher with the same arguments. If successful, the output for the producer will look like the following:

$ ./build/staged/bin/topicPublisher <host:port> <client-username> <client-password>
TopicPublisher initializing...
Connecting to Solace messaging at <host:port>
Connected
Publishing message: Hello world from MQTT!
Message published. Exiting

With the message delivered the subscriber output will look like the following:

Received a Message!
        Time:    2015-10-16 17:09:45.379
        Topic:   T/GettingStarted/pubsub
        Message: Hello world from MQTT!
        QoS:     0

Exiting

The received message is printed to the screen. The message contents were "Hello world from MQTT!" as expected and the output contains extra information about the Solace message that was received.

You have now successfully connected an MQTT client, subscribed to a topic, and exchanged messages using this topic.