Transactions

This sample demonstrates how to acknowledge the receipt of a message and send a new message in a single atomic transaction.

Feature Overview

Transacted sessions enable client applications to group multiple message send and/or receive operations into single, atomic units known as local transactions. Each transacted session supports a single series of transactions, so only one transaction is open on it at a time.

Messages that are published and received in a transaction are staged on the message broker. Committing the transaction completes all of its send and receive operations. Rolling the transaction back cancels all of them instead. Once a transaction has been committed or rolled back, the next transaction begins automatically.

This feature can be used to guarantee that a message is not removed from the message broker until a reply message has been sent.
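
To make the staging behavior described above concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not the JCSMP API and not how the message broker is implemented: the class ToyTransactedSession, its methods, and the two lists standing in for broker queues are all hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a local transaction (hypothetical; not the JCSMP API). */
final class ToyTransactedSession {
    private final List<String> inbound;   // messages waiting on the "broker"
    private final List<String> outbound;  // messages the "broker" has delivered
    private final List<String> stagedSends = new ArrayList<>();
    private int stagedReceives = 0;

    ToyTransactedSession(List<String> inbound, List<String> outbound) {
        this.inbound = inbound;
        this.outbound = outbound;
    }

    /** Hands over the next message but leaves it on the queue until commit. */
    String receive() {
        return stagedReceives < inbound.size() ? inbound.get(stagedReceives++) : null;
    }

    /** Stages a message; it is not delivered until commit. */
    void send(String message) {
        stagedSends.add(message);
    }

    /** Applies every staged receive and send together. */
    void commit() {
        inbound.subList(0, stagedReceives).clear();
        outbound.addAll(stagedSends);
        startNextTransaction();
    }

    /** Cancels every staged operation; received messages stay on the queue. */
    void rollback() {
        startNextTransaction();
    }

    private void startNextTransaction() {
        stagedSends.clear();
        stagedReceives = 0;
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<>();
        requests.add("request-1");
        List<String> replies = new ArrayList<>();
        ToyTransactedSession tx = new ToyTransactedSession(requests, replies);

        // First attempt: receive and reply, then roll back.
        tx.send("reply to " + tx.receive());
        tx.rollback();
        System.out.println("after rollback: " + requests + " " + replies);
        // prints "after rollback: [request-1] []"

        // Second attempt: the request is still there, so reply and commit.
        tx.send("reply to " + tx.receive());
        tx.commit();
        System.out.println("after commit: " + requests + " " + replies);
        // prints "after commit: [] [reply to request-1]"
    }
}
```

In this model the request leaves the inbound list only in the same step that places the reply on the outbound list, which is the guarantee the feature is meant to provide.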

Prerequisite

The Client Profile must be configured to allow transacted sessions.

NOTE: This is the default configuration in PubSub+ Cloud messaging services.

Code

The following code shows how to acknowledge the receipt of a message and send a reply message in a single atomic transaction, guaranteeing that the original message is not acknowledged until the reply message is received by the message broker.

First, create a transacted session.

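// The JCSMP session, created and connected elsewhere in the sample.
protected JCSMPSession session = null;
//...
public TransactedSession txSession;
//...
// Create the transacted session from the connected session.
txSession = session.createTransactedSession();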

Next, create a producer and consumer from the transacted session.

public XMLMessageProducer producer;
public FlowReceiver receiver;
//...
// Producer bound to the transacted session; 'this' receives publish events.
ProducerFlowProperties prodFlowProps = new ProducerFlowProperties();
prodFlowProps.setWindowSize(100);
producer = txSession.createProducer(prodFlowProps, this);

// Consumer flow bound to the transacted session; 'this' receives messages.
// 'queue' is the queue to consume from, defined elsewhere in the sample.
ConsumerFlowProperties consFlowProps = new ConsumerFlowProperties();
consFlowProps.setEndpoint(queue);
consFlowProps.setStartState(true); // start the flow as soon as it is created
EndpointProperties endpointProps = new EndpointProperties();
endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
receiver = txSession.createFlow(this, consFlowProps, endpointProps);

Now, when a message is received, send the reply using the producer created from the transacted session, and then commit the transaction.

Committing the transaction automatically acknowledges receipt of the message that was received on the transacted session.

public void onReceive(BytesXMLMessage message) {
    //...
    // Note: send() and commit() declare JCSMPException, so the elided
    // code must handle it.
    BytesXMLMessage reply = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);
    reply.setDeliveryMode(DeliveryMode.PERSISTENT);
    reply.setSenderId("Replier");
    // The reply is staged on the broker until the transaction is committed.
    producer.send(reply, message.getReplyTo());

    // This commit acknowledges the received message and
    // delivers the sent message.
    txSession.commit();
    //...
}
