
Persistence with Queues

This tutorial builds on the basic concepts introduced in the publish/subscribe tutorial and shows you how to send and receive persistent messages through a Solace messaging queue in a point-to-point fashion.

At the end, this tutorial walks through downloading and running the sample from source.

This tutorial focuses on using a non-Solace JMS API implementation. To use the Solace JMS API instead, see Solace Getting Started JMS Tutorials.

Assumptions

This tutorial assumes the following:

  • You are familiar with Solace core concepts.
  • You have access to Solace messaging with the following configuration details:
    • Connectivity information for a Solace message-VPN configured for guaranteed messaging support
    • Enabled client username and password
    • Client-profile enabled with guaranteed messaging permissions.

One simple way to get access to Solace messaging quickly is to create a messaging service in Solace Cloud as outlined here. You can find other ways to get access to Solace messaging below.

Goals

The goal of this tutorial is to demonstrate how to use Apache Qpid JMS 1.1 over AMQP using Solace messaging. This tutorial will show you:

  1. How to send a persistent message to a durable queue with Solace messaging
  2. How to bind to this queue and receive a persistent message

Get Solace Messaging

This tutorial requires access to Solace PubSub+ messaging and requires that you know several connectivity properties of your Solace messaging. Specifically, you need to know the following:

Resource | Value | Description
Host | String | The address clients use when connecting to PubSub+ messaging to send and receive messages. (Format: DNS_NAME:Port or IP:Port)
Message VPN | String | The PubSub+ message router Message VPN that this client should connect to.
Client Username | String | The client username. (See Notes below)
Client Password | String | The client password. (See Notes below)

There are several ways you can get access to PubSub+ Messaging and find these required properties.

Option 1: Use PubSub+ Cloud

  • Follow these instructions to quickly spin up a cloud-based PubSub+ messaging service for your applications.

  • The messaging connectivity information is found in the service details in the connectivity tab (shown below). You will need:

    • Host:Port (use the AMQP URI, since this tutorial connects over AMQP)
    • Message VPN
    • Client Username
    • Client Password
Screenshot: Messaging Connectivity Information

Option 2: Start PubSub+ Software

  • Follow these instructions to start the PubSub+ Software in leading Clouds, Container Platforms or Hypervisors. The tutorials outline where to download and how to install the PubSub+ Software.

  • The messaging connectivity information is as follows:

    • Host: <public_ip> (IP address assigned to the VMR in tutorial instructions)

    • Message VPN: default

    • Client Username: sampleUser (can be any value)

    • Client Password: samplePassword (can be any value)

      Note: By default, the PubSub+ Software "default" message VPN has authentication disabled.

Option 3: Get access to a PubSub+ Appliance

  • Contact your PubSub+ appliance administrators and obtain the following:

    • A PubSub+ Message-VPN where you can produce and consume direct and persistent messages
    • The host name or IP address of the Solace appliance hosting your Message-VPN
    • A username and password to access the Solace appliance

Obtaining Apache Qpid JMS

This tutorial assumes you have downloaded and successfully installed the Apache Qpid JMS client. If your environment differs from the example, then adjust the build instructions appropriately.

The easiest way to install it is through Gradle or Maven.

Get the API: Using Gradle

dependencies {
    implementation("org.apache.qpid:qpid-jms-client:1.6.0")
}

Get the API: Using Maven

<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-jms-client</artifactId>
    <version>1.6.0</version>
</dependency>

Java Messaging Service (JMS) Introduction

JMS is a standard API for sending and receiving messages. In addition to the information provided on the Solace developer portal, you may also want to consult external sources for more details about JMS. The following are good places to start:

  1. https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html
  2. https://en.wikipedia.org/wiki/Java_Message_Service
  3. https://docs.oracle.com/javaee/7/tutorial/partmessaging.htm#GFIRP3

The last (Oracle docs) link points to the official Java EE tutorials, which provide a good introduction to JMS.

This tutorial focuses on using JMS 1.1 (April 12, 2002). For JMS 2.0 (May 21, 2013), see Solace Getting Started AMQP JMS 2.0 Tutorials.

Connecting to Solace Messaging

In order to send or receive messages, an application must start a JMS connection.

To establish the JMS connection, you need to know the Solace messaging host name with the AMQP service port number, the client username, and the optional password.

QueueProducer.java/QueueConsumer.java


ConnectionFactory connectionFactory = new JmsConnectionFactory(solaceUsername, solacePassword, solaceHost);
Connection connection = connectionFactory.createConnection();
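
The host value passed to the connection factory is an AMQP URI of the form amqp://<HOST:AMQP_PORT>. As a minimal sketch, a check like the one below can catch a malformed value before a connection is attempted. AmqpUriCheck and parseAmqpUri are hypothetical names that are not part of the Solace samples or the Qpid JMS API, and the sketch assumes a plain amqp:// or amqps:// URI with an explicit port (it does not cover failover URIs).

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper for illustration; not part of the Solace samples or Qpid JMS. */
final class AmqpUriCheck {

    /** Returns the parsed URI, or throws IllegalArgumentException if it looks unusable. */
    static URI parseAmqpUri(String value) {
        final URI uri;
        try {
            uri = new URI(value);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Not a valid URI: " + value, e);
        }
        // Assumption: only plain amqp:// and amqps:// URIs are accepted here.
        String scheme = uri.getScheme();
        if (!"amqp".equalsIgnoreCase(scheme) && !"amqps".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("Expected amqp:// or amqps:// but got: " + value);
        }
        // Assumption: the AMQP service port is always given explicitly.
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected an explicit host and port: " + value);
        }
        return uri;
    }
}
```

Calling AmqpUriCheck.parseAmqpUri(solaceHost) before constructing the JmsConnectionFactory would turn a missing scheme or port into an immediate, readable error rather than a connection failure later on.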

Next, create a non-transacted session. The samples use two different session acknowledgement modes: the producer uses AUTO_ACKNOWLEDGE, which automatically acknowledges a client's receipt of a message, and the consumer uses CLIENT_ACKNOWLEDGE, which requires the client to acknowledge receipt explicitly by calling message.acknowledge().

QueueProducer.java

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

QueueConsumer.java

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

At this point the application is connected to Solace messaging and ready to send and receive messages.

Sending a persistent message to a queue

In order to send a message to a queue a JMS MessageProducer needs to be created.

Diagram: Sending a Persistent Message to a Queue

There is no difference in the actual method calls to the JMS MessageProducer when sending a JMS persistent message as compared to the JMS non-persistent message shown in the publish/subscribe tutorial. The difference is that, for a persistent message, Solace messaging acknowledges the message once it has been successfully stored, and the MessageProducer.send() call does not return until it has received this acknowledgement. This means that in JMS, calls to MessageProducer.send() are blocking calls that wait for message confirmation from Solace messaging before proceeding. This is outlined in the JMS 1.1 specification and Solace JMS adheres to this requirement.
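
The blocking behaviour described above can be pictured with a toy model. The sketch below involves no JMS, AMQP, or Solace code: BlockingSendModel and its methods are hypothetical, the "broker" is simulated with a background task, and the completed future stands in for the acknowledgement. It only illustrates the ordering guarantee that the message is stored before send() returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy model only: no JMS, AMQP, or Solace code is involved. */
final class BlockingSendModel {

    // Stands in for the broker's persistent message store.
    private final List<String> store = Collections.synchronizedList(new ArrayList<>());

    // Simulated broker: stores the message on another thread, then completes the
    // future, which plays the role of the acknowledgement.
    private CompletableFuture<Void> storeAndAcknowledge(String body) {
        return CompletableFuture.runAsync(() -> store.add(body));
    }

    // Simulated persistent send: does not return until the acknowledgement arrives.
    void send(String body) {
        storeAndAcknowledge(body).join();
    }

    int storedCount() {
        return store.size();
    }

    public static void main(String[] args) {
        BlockingSendModel model = new BlockingSendModel();
        model.send("Hello world Queues!");
        // Because send() blocked on the acknowledgement, the message is already stored here.
        System.out.println("Stored messages: " + model.storedCount());
    }
}
```

The practical consequence for the real producer is the same as in the model: once send() returns without an exception, the application can treat the message as safely handed over.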

The queue for sending messages will be created on the Solace router as a durable queue.

See Configuring Queues for details on how to configure durable queues on Solace Message Routers with Solace CLI.

See Management Tools for other tools for configuring durable queues.

QueueProducer.java

final String QUEUE_NAME = "Q/tutorial";

Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(null);

Now send the message:

QueueProducer.java

TextMessage message = session.createTextMessage("Hello world Queues!");
messageProducer.send(queue, message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

Receiving a persistent message from a queue

In order to receive a persistent message from a queue a JMS MessageConsumer needs to be created.

Diagram: Receiving a Persistent Message from a Queue

The name of the queue is the same as the one to which we send messages.

QueueConsumer.java

final String QUEUE_NAME = "Q/tutorial";

Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);

As in the publish/subscribe tutorial, we use an anonymous inner class to receive messages asynchronously, with the addition of the message.acknowledge() call.

QueueConsumer.java

messageConsumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                System.out.printf("TextMessage received: '%s'%n", ((TextMessage) message).getText());
            } else {
                System.out.println("Message received.");
            }

            message.acknowledge();

            System.out.printf("Message Content:%n%s%n", message.toString());
            latch.countDown(); // unblock the main thread
        } catch (Exception ex) {
            System.out.println("Error processing incoming message.");
            ex.printStackTrace();
        }
    }
});
connection.start();
latch.await();
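
The listener above relies on a latch variable whose declaration is not shown in the excerpt. As a minimal sketch of the pattern, assuming a java.util.concurrent.CountDownLatch with a count of one, the example below replaces the JMS listener with a simulated delivery thread. LatchSketch and awaitOneMessage are hypothetical names, and the timeout is a design variation of this sketch, whereas the excerpt calls latch.await() without one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone illustration of the latch pattern; no JMS involved. */
final class LatchSketch {

    /** Returns true if the simulated listener signalled within the timeout. */
    static boolean awaitOneMessage(long timeoutMillis) {
        // A count of one: a single countDown() releases the waiting thread.
        final CountDownLatch latch = new CountDownLatch(1);

        // Stands in for the provider thread that would invoke onMessage().
        Thread delivery = new Thread(() -> {
            System.out.println("Simulated message received.");
            latch.countDown(); // unblock the waiting thread
        });
        delivery.start();

        try {
            // Blocks until countDown() is called or the timeout elapses.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Waiting with a timeout lets a consumer exit cleanly if no message ever arrives, at the cost of having to choose how long is long enough.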

Summarizing

Combining the example source code shown above results in the following source code files:

  • QueueProducer.java
  • QueueConsumer.java

Getting the Source

Clone the GitHub repository containing the Solace samples.

git clone https://github.com/SolaceSamples/solace-samples-amqp-qpid-jms1
cd solace-samples-amqp-qpid-jms1

Building

You can build and run both example files directly from Eclipse or with Gradle.

./gradlew assemble

The examples can be run as:

cd build/staged/bin
./queueConsumer amqp://<HOST:AMQP_PORT> <USERNAME> <PASSWORD>
./queueProducer amqp://<HOST:AMQP_PORT> <USERNAME> <PASSWORD>
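
Both commands take the same three positional arguments. How the samples read them is not shown in this tutorial, so the following is only a sketch of one way to do it. SampleArgs and its fields are hypothetical names, and the sketch assumes that exactly three arguments are required.

```java
/** Hypothetical argument holder; names are illustrative, not taken from the samples. */
final class SampleArgs {
    final String host;
    final String username;
    final String password;

    private SampleArgs(String host, String username, String password) {
        this.host = host;
        this.username = username;
        this.password = password;
    }

    /** Expects exactly: <amqp://HOST:AMQP_PORT> <USERNAME> <PASSWORD>. */
    static SampleArgs parse(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                    "Usage: <amqp://HOST:AMQP_PORT> <USERNAME> <PASSWORD>");
        }
        return new SampleArgs(args[0], args[1], args[2]);
    }
}
```

A main method could call SampleArgs.parse(args) first and pass the resulting username, password, and host to the JmsConnectionFactory constructor shown earlier.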

Sample Output

First start the QueueConsumer so that it is up and waiting for messages.

$ queueConsumer amqp://<HOST:AMQP_PORT> <USERNAME> <PASSWORD>
QueueConsumer is connecting to Solace messaging at amqp://<HOST:AMQP_PORT>...
Awaiting message...

Then you can start the QueueProducer to send the message.

$ queueProducer amqp://<HOST:AMQP_PORT> <USERNAME> <PASSWORD>
QueueProducer is connecting to Solace messaging at amqp://<HOST:AMQP_PORT>...
Connected with username 'clientUsername'.
Sending message 'Hello world Queues!' to queue 'Q/tutorial'...
Sent successfully. Exiting...

Notice how the message is received by the QueueConsumer.

Awaiting message...
TextMessage received: 'Hello world Queues!'
Message Content:
JmsTextMessage { org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade@529bd520 }

Now you know how to use Apache Qpid JMS 1.1 over AMQP using Solace messaging to send and receive persistent messages from a queue.