High Quality Enterprise Messaging.

AMQP 1.0 Client

Introduction

The SwiftMQ AMQP 1.0 Java Client is an implementation of the AMQP 1.0 specification in Java and can be used to connect to any AMQP 1.0 capable endpoints such as a SwiftMQ Router. The license (included in the client distribution) permits to use the SwiftMQ AMQP 1.0 Java Client with brokers from other vendors.

The client API is similar to JMS but easier to handle, thread-safe if not specified otherwise and consists only of a few classes:

Class Package Description
AMQPMessage com.swiftmq.amqp.v100.messaging Represents an AMQP message.
QoS com.swiftmq.amqp.v100.client Constants representing the different quality of service modes.
AMQPContext com.swiftmq.amqp Represents the context of the client.
Connection com.swiftmq.amqp.v100.client Connection to an AMQP endpoint.
ExceptionListener com.swiftmq.amqp.v100.client Can be registered at a Connection to listen for disconnects.
Session com.swiftmq.amqp.v100.client Grouping context for Links.
Producer com.swiftmq.amqp.v100.client Outgoing (sending) Link.
Consumer com.swiftmq.amqp.v100.client Incoming (receiving) Link.
DurableConsumer com.swiftmq.amqp.v100.client Incoming (receiving) Durable Link.
MessageAvailabilityListener com.swiftmq.amqp.v100.client Used for non-blocking receives.
DeliveryMemory com.swiftmq.amqp.v100.client Used for settlement and link recovery.
TransactionController com.swiftmq.amqp.v100.client Can control multiple transactions per Session.

javadoc, Specification and Samples

Other classes referenced from the above are part of SwiftMQ's AMQP library (amqp.jar) which is almost directly generated from the specification's XML files. Look HERE to view the complete javadoc.

The AMQP 1.0 specification can be found HERE.

There are also several example programs provided under the SwiftMQ distribution's directory "samples/amqp". Please have a look!

Debugging

The following system properties can be set at the client to enable debugging. All output goes to System.out.

System Property Description
swiftmq.amqp.frame.debug Shows all sent and received AMQP frames.
swiftmq.amqp.debug Shows quite verbose debug output of the SwiftMQ AMQP library.

Example:

-Dswiftmq.amqp.frame.debug=true -Dswiftmq.amqp.debug=true

AMQPMessage

The AMQP specification does not define a message type. It only specified sections that can make a message. AMQPMessage is SwiftMQ's custom type of a message that handles access to the sections. With the exception of "accept()" and "reject()" an AMQPMessage is not thread-safe.

The following fields are overwritten from the Producer during send and therefore MUST not be set from the application:

Section Field
Header Durable
Header Priority
Header Ttl
Properties MessageId
Properties To
Properties UserId

QoS

For convenience the client API pre-defines 3 modes of quality of service (QoS):

Mode Description
AT_MOST_ONCE Messages are pre-settled at the sender endpoint. Messages may be lost.
AT_LEAST_ONCE Messages are received and settled at the receiver without waiting for the sender to settle. Duplicates may occur.
EXACTLY_ONCE Messages are (1) received, (2) the sender settles and then (3) the receiver settles. Messages are delivered once and only once.

AMQPContext

The client requires thread pool and tracing facilities from its environment. This is provided from the AMQPContext class which has 2 modes, CLIENT and ROUTER. Mode ROUTER is for internal use only so the context always needs to be created with mode CLIENT:

        AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);

Connection

Without SASL

A Connection without SASL authentication is created by setting the "doAuth" parameter to false:

        Connection connection = new Connection(ctx, host, port, false);

With SASL

A Connection with SASL authentication is created by setting the "doAuth" parameter to true. If "username" and "password" are not specified, connect takes place via the SASL mechanism ANONYMOUS:

        Connection connection = new Connection(ctx, host, port, true);

Otherwise as the specified user with SASL mechanism PLAIN by default:

        Connection connection = new Connection(ctx, host, port, true, username, password);

With SASL EXTERNAL

Here is a nice writeup by Jakub Scholz about using SSL and SASL EXTERNAL authentication with the Apache Qpid C++ Broker.

Connection Configuration and actual connect

The Connection is in unconnected state after construction and can now be configured. For example, the maximum frame size and an exception listener can be set or the SASL mechanism to use can be specified. The SASL mechanism depends on the provision of the remote endpoint. SwiftMQ provides PLAIN, ANONYMOUS and the platform mechanisms CRAM-MD5 and Digest-MD5. For use of ANONYMOUS mechanism please use the resp. constructor of the connection.

        connection.setMechanism("CRAM-MD5");
        connection.setMaxFrameSize(1024);
        connection.setIdleTimeout(120000); // Sends and expects heart beat frames
        connection.setExceptionListener(this);

There are 2 socket factories that can be used to create the TCP connections: "com.swiftmq.net.PlainSocketFactory" (default) and "com.swiftmq.net.JSSESocketFactory". The latter must be specified to connect via SSL/TLS:

        connection.setSocketFactory(new JSSESocketFactory());

To use SSL certificates, look HERE. For HTTP tunneling, look HERE.

Finally, do the connect:

        connection.connect();

Session

A Session is created from a Connection. It multiplexes traffic between link endpoints and can handle non-transacted and transacted transfers with multiple active transactions simultaneously. It maintains incoming and outgoing windows of unsettled transfers (depending on the maximum frame size, a message can be splitted over multiple transfers). Both needs to be specified when creating the Session:

        Session session = connection.createSession(100, 100);

Producer

A Producer is created from a Session on a target (queue or topic) with a specific QoS. The whole settlement interaction will be done internally so there is nothing more to do than to send messages. The transfer itself is asynchronously as well as the settlement. Only the close() of the Producer is done synchronously and ensures that all settlement has been done for the previously sent messages when the close()-Method returns.

If the connection is dropped before close() can be called, unsettled deliveries can be recovered. See last section "Link Recovery" on the end of this page.

        Producer p = session.createProducer("orderqueue@router4", QoS.AT_LEAST_ONCE);
        ...
        p.send(msg); // Always asynchronously
        ...
        p.close(); // Settlement completed after this call returns

Consumer

A Consumer is created from a Session on a source (queue or topic) with a specific QoS. Each Consumer has a client-side message cache which is dimensioned by the link credit, a value that is passed to the source.

On a non-temporary Source

A Consumer on a non-temporary source (regular queue or topic) is created by specifying the name of the source, the link credit (consumer cache size), the QoS, a "NoLocal" flag and an optional message selector. If the "NoLocal" flag is set to true (default) then messages sent from the same connection on a topic are not received. This is the case when a client sends and receives on the same topic but don't want to receive its own messages. The message selector is a string which the connected message broker must understand. For SwiftMQ this is a JMS message selector.

        Consumer c = session.createConsumer("orderqueue", 200, QoS.EXACTLY_ONCE,
                                            true, "orderid between 0 and 100");

On a temporary Sources

A Consumer on a temporary source (temporary queue, e.g. for request/reply) is created like above but without specifying a source parameter. This automatically creates the temporary source with a lifetime of that of this link and attaches the Consumer to it. The address of the temporary source can be obtained by calling "getRemoteAddress()":

        Consumer c = session.createConsumer(200, QoS.AT_MOST_ONCE);
        AddressIF remoteAddress = c.getRemoteAddress(); // Use it for request/reply

Non-blocking Receive

To consume messages, the various "receive()" methods of a consumer can be called. There is no message listener like in JMS but a way to perform non-blocking receives. This is done by calling "receiveNoWait(listener)" and passing a MessageAvailabilityListener as a parameter. If there is no message available at the time of the "receiveNoWait(listener)" call, the MessageAvailabilityListener is stored and called later when a message is available:

      // Implementation of MessageAvailabilityListener
      // Notifies a Poller thread to call poll()
      public void messageAvailable(Consumer consumer)
      {
        poller.enqueue(this);
      }

      // Called from the Poller thread
      public void poll()
      {
        AMQPMessage msg = c.receiveNoWait(this);
        if (msg != null)
        {
          // process message
        }
      }

Settlement

Settlement on the Consumer side needs to be done by the application and can be done in 2 ways. If a message is unsettled, it can be accpeted or rejected. The latter will lead to a redelivery of the message. Note that AMQP settles each single message (and not streams of messages like in JMS). Settlement can be done by different threads so it's completely legal to call "receive()" by different threads and process them in parallel, e.g. by using worker thread pools. The order of settlement is not relevant, that is, message #3 can be settled before message #1.

Settlement is done asynchronously. Only the close() of the Consumer is done synchronously and ensures that all settlement has been done for the previously consumed messages when the close()-Method returns. If the connection is dropped before close() can be called, unsettled deliveries can be recovered. See last section "Link Recovery" on the end of this page.

        AMQPMessage msg = c.receive();
        if (msg != null) // null is returned if the consumer was closed
        {
          if (!msg.isSettled())
          {
            if (processed(msg))
              msg.accept();
            else
              msg.reject();
          }
        }

DurableConsumer

A DurableConsumer is created from a Session on a topic source with a specific QoS. The durable link is identified by a link name. If the link does not exists, it will be created with a terminus expiry policy of NEVER and a terminus durability of CONFIGURATION. If it exists, it will be attached to the DurableConsumer. The durable link will remain in place after detaching the DurableConsumer so messages are received while the DurableConsumer is disconnected and delivered to it once the DurableConsumer reconnects. A call to "unsubscribe()" will set the terminus expiry policy to LINK_DETACH and the durable link will be deleted.

If connected to a SwiftMQ Router, a DurableConsumer is used to create a durable subscription on a topic like in JMS. The durable subscription is identified by the container id and the link name. Therefore, the container id and the link name needs to be the same to connect to the same durable subscription.

        // Set up the connection
        AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
        Connection connection = new Connection(ctx, host, port, true);
        // Set the container id
        connection.setContainerId("orderprocessor");
        connection.connect();
        Session session = connection.createSession(Integer.MAX_VALUE, Integer.MAX_VALUE);
        // Create the durable on topic hierarchy "orders.%"
        DurableConsumer c = session.createDurableConsumer("ordermailbox", "orders.%", 20,
                                                          QoS.EXACTLY_ONCE, true, null);

        // Do processing

        // Close the link
        c.close();
        // Delete the durable if not needed anymore
        if (!neededAnymore)
          c.unsubscribe();
        session.close();
        connection.close();

TransactionController

A TransactionController can be obtained from a Session to do transactional work on it. A Session does not have to be created as a transacted Session as in JMS. Rather a Session can handle non-transacted and transacted traffic at the same time.

The Session's TransactionController is obtained by use of the corresponding getter method and the transaction capabilities can be retrieved:

        TransactionController txc = session.getTransactionController();
        boolean multiTxnsPerSession = txc.isSupportMultiTxnsPerSsn();

The SwiftMQ Router can handle local transactions (LOCAL_TRANSACTIONS) and multiple transactions per session (MULTI_TXNS_PER_SSN).

To do transactional work, a transaction id is required. This is created by the TransactionController (which obtains it from the transactional resource):

        TxnIdIF txnid = txc.createTxnId();

To finish a transaction, either "commit()":

        txc.commit(txnid);

or "rollback()" needs to be called:

        txc.rollback(txnid);

Outbound (Sending) Transactions

To send a message in a transaction, the message has to be associated with the transaction id.

        // Send messages in transactions of size <txSize>
        int currentTxSize = 0;
        TxnIdIF txnId = txc.createTxnId();
        for (int i = 0; i < nMsgs; i++)
        {
          AMQPMessage msg = new AMQPMessage();
          String s = "Message #" + (i + 1);
          System.out.println("Sending " + s);
          msg.setAmqpValue(new AmqpValue(new AMQPString(s)));
          msg.setTxnIdIF(txnId); // Transaction association
          p.send(msg);
          currentTxSize++;
          if ((i + 1) % txSize == 0)
          {
            txc.commit(txnId);
            txnId = txc.createTxnId();
            currentTxSize = 0;
          }
        }
        if (currentTxSize > 0)
          txc.commit(txnId);

Inbound (Receiving) Transactions as Transactional Retirement

Transactional retirement means that messages are delivered to the Consumer in a non-transacted fashion. After the client has received it, the outcome in form of accepting or rejecting the message can be associated with a transaction and will then be applied to the messages on commit or discarded on rollback. So NOT the messages are associated with a transaction BUT the outcomes accept and reject.

The big advantage here is that the messages are still at the client (and don't need to be redelivered) if a transaction is rolled back and so the messages can be associated with another transaction or can be settled in a non-transacted fashion. This provides an enormous flexibility.

Transactional retirement is not conform with QoS mode AT_MOST_ONCE (pre-settlement).

        TxnIdIF txnid = txc.createTxnId();
        AMQPMessage msg = c.receive();
        msg.setTxnIdIF(txnid);  // Transaction association
        try {
          String s = "RE: "+((AMQPString)msg.getAmqpValue().getValue()).getValue();
          AMQPMessage msg1 = new AMQPMessage();
          msg1.setAmqpValue(new AmqpValue(new AMQPString(s)));
          msg1.setTxnIdIF(txnid);  // Transaction association
          producer.send(msg1);
          msg.accept();
        } catch (Exception e)
        {
          // error, reject
          msg.reject();
        }
        txc.commit(txnid);

Inbound (Receiving) Transactions as Transactional Acquisition

AMQP provides a second way to handle inbound traffic as transactions. It is called transactional acquisition. Here the client acquires a number of messages under a transaction id so the delivery is already associated with that transaction id. This is the only difference to the transactional retirement.

Transactional acquisition is not conform with QoS mode AT_MOST_ONCE (pre-settlement).

The Consumer must be created without a link credit parameter because the link credit is dimensioned by the number of acquired messages:

        // Important to create the consumer without a link credit
        // because the link credit is set by the acquisition
        Consumer c = session.createConsumer(source, OoS.EXACTLY_ONCE, true, null);

        // Get the transaction controller
        TransactionController txc = session.getTransactionController();

        // Receive messages in transactions in size <txSize>
        int currentTxSize = 0;
        TxnIdIF txnId = txc.createTxnId();
        // Acquire <txSize> messages under this txnid
        c.acquire(txSize, txnId);
        for (int i = 0; i < nMsgs; i++)
        {
          AMQPMessage msg = c.receive();
          if (msg == null)
            break;
          AmqpValue value = msg.getAmqpValue();
          System.out.println("Received: " + ((AMQPString) value.getValue()).getValue());
          msg.accept();
          currentTxSize++;
          if ((i + 1) % txSize == 0)
          {
            // Commit and acquire the next <txSize> messages under a new txnid
            txc.commit(txnId);
            txnId = txc.createTxnId();
            c.acquire(txSize, txnId);
            currentTxSize = 0;
          }
        }
        if (currentTxSize > 0)
          txc.commit(txnId);

Link Recovery (since 9.1.0)

If a Link (a Producer, Consumer or DurableConsumer) cannot be properly closed by it's close()-Method, e.g. because the corresponding connection disconnects, there can be unsettled deliveries. In order to finish the settlement, a link can be recovered.

Each Link has a so-called DeliveryMemory where unsettled deliveries are stored and removed after settlement. This DeliveryMemory is specified by this interface:

      package com.swiftmq.amqp.v100.client;

      import com.swiftmq.amqp.v100.generated.transport.definitions.DeliveryTag;

      import java.util.Collection;

      /**
       * Specifies a memory to store unsettled deliveries of a link (producer and consumer).
       *
       * @author IIT Software GmbH, Bremen/Germany, (c) 2012, All Rights Reserved
       */
      public interface DeliveryMemory
      {
        /**
         * Will be called from the link to set its link name. This is only done if the name
         * has not been set before and ensures that new created links that use this delivery
         * memory use the same link name as before.
         *
         * @param name
         */
        public void setLinkName(String name);

        /**
         * Returns the link name,
         *
         * @return link name
         */
        public String getLinkName();

        /**
         * Adds an unsettled delivery which consists of a delivery tag, the delivery state
         * and the AMQP message.
         *
         * @param unsettledDelivery unsettled delivery
         */
        public void addUnsettledDelivery(UnsettledDelivery unsettledDelivery);

        /**
         * Removes an unsettled delivery from the memory.
         *
         * @param deliveryTag delivery tag
         */
        public void deliverySettled(DeliveryTag deliveryTag);

        /**
         * Returns the number of unsettled deliveries contained in this memory.
         *
         * @return number unsettled deliveries
         */
        public int getNumberUnsettled();

        /**
         * Returns a collection of all unsettled deliveries. The delivery memory remains untouched
         * so the returned Collection is a copy (or better a clone) of the content.
         *
         * @return unsettled deliveries
         */
        public Collection<UnsettledDelivery> getUnsettled();
      }

One of the hidden beauties of AMQP 1.0 is the ability to delegate the management of deliveries and their state to the application which can reconstruct this state out of application data. So even if the client dies and there are unsettled deliveries, it can be recovered.

A DeliveryMemory can be optionally specified as a parameter to the various "create" methods of a session, e.g.:

      Producer p = session.createProducer("orderqueue@router4",
                                          QoS.EXACTLY_ONCE,
                                          new OrderDatabaseDeliveryMemory());

To identify a delivery, an application can tag a message with its own DeliveryTag which will then be used for the settlement via the DeliveryMemory. If no delivery tag is specified, an internal sequence number will be used instead:

      AMQPMessage msg = new AMQPMessage();
      ...
      msg.setDeliveryTag(getOrderId());
      p.send(msg);

This works similar for consumers.

The actual recovery of a Link will be done during Link creation, see above. Before the Link is attached, it retrieves the unsettled deliveries from the DeliveryMemory and performs the settlement.

If no DeliveryMemory is specified when the Link is created, a DefaultDeliveryMemory is used which stores unsettled deliveries in a Map. If the client dies, this state is lost, of course.

So the most basic recovery without any custom implementation of DeliveryMemory or DeliveryTag can look like this:

      Producer p = session.createProducer("orderqueue@router4",
                                          QoS.AT_LEAST_ONCE);
      ...
      p.send(msg); // Always asynchronously
      ...
      < CONNECTION DROPS >
      // Retrieve the old DeliveryMemory from the former Producer, do the
      // recovery and continue
      Producer p = session.createProducer("orderqueue@router4",
                                          QoS.AT_LEAST_ONCE,
                                          p.getDeliveryMemory());
      p.close(); // All settlement done (recovered)

Filters

AMQP 1.0 filters are extension points which are listed in a public registry on www.amqp.org

To ensure minimal interoperability between AMQP brokers and clients concerning JMS selector filters, the SwiftMQ AMQP 1.0 Java Client uses the filter declarations APACHE.ORG:SELECTOR and APACHE.ORG:NO_LOCAL of Apache Qpid's AMQP 1.0 implementation.