SwiftMQ Documentation

SwiftMQ Documentation

  • Client
  • CE
  • UR
  • HA
  • Javadocs
  • Release Notes

›SwiftMQ Client

SwiftMQ Client

  • Getting Started
  • JNDI Client
  • JMS Client
  • AMQP 1.0 Client
  • Filetransfer Client

SwiftMQ CE

  • Getting Started
  • Installation
  • Upgrade
  • Software Architecture
  • Administration

    • Preconfig
    • CLI Command Line Interface
    • JMX Administration
    • CLI Message Interface
    • routerconfig.xml Watch Dog
    • System Properties

    Swiftlets

    • AMQP Swiftlet
    • Authentication Swiftlet
    • Deploy Swiftlet
    • JMS Swiftlet
    • JNDI Swiftlet
    • Log Swiftlet
    • Management Swiftlet
    • MQTT Swiftlet
    • Network Swiftlet
    • Queue Manager Swiftlet
    • Routing Swiftlet
    • Scheduler Swiftlet
    • Store Swiftlet
    • Streams Swiftlet
    • Threadpool Swiftlet
    • Timer Swiftlet
    • Topic Manager Swiftlet
    • Trace Swiftlet
    • XA Resource Manager Swiftlet

    APIs

    • CLI Admin API

    How To

    • TLS Configuration
    • HTTP Tunneling Configuration
    • Upgrade SwiftMQ Router and Clients

    In Depth

    • Versioning
    • File Store

SwiftMQ UR

  • Getting Started
  • License
  • Installation
  • Upgrade
  • Administration

    • SwiftMQ Explorer

    Swiftlets

    • JDBC Authentication Swiftlet
    • FileCache Swiftlet
    • JMS Application Container Swiftlet
    • JMS XA Swiftlet
    • JDBC Store Swiftlet
    • AMQP Bridge Extension Swiftlet
    • JMS Bridge Extension Swiftlet
    • JavaMail Bridge Extension Swiftlet
    • Replicator Extension Swiftlet

SwiftMQ HA

  • Getting Started
  • License
  • Installation
  • Upgrade
  • HA Introduction
  • HA Deployment
  • Administration
  • JNDI/JMS under HA
  • Routing under HA
  • Advanced Configuration
  • HA Test Suite

AMQP 1.0 Client

Introduction

The SwiftMQ AMQP 1.0 Java Client is an implementation of the AMQP 1.0 specification in Java and can be used to connect to any AMQP 1.0 capable endpoints such as a SwiftMQ Router.

The client API is similar to JMS but easier to handle, thread-safe if not specified otherwise and consists only of a few classes:

ClassPackageDescription
AMQPMessagecom.swiftmq.amqp.v100.messagingRepresents an AMQP message.
QoScom.swiftmq.amqp.v100.clientConstants representing the different quality of service modes.
AMQPContextcom.swiftmq.amqpRepresents the context of the client.
Connectioncom.swiftmq.amqp.v100.clientConnection to an AMQP endpoint.
ExceptionListenercom.swiftmq.amqp.v100.clientCan be registered at a Connection to listen for disconnects.
Sessioncom.swiftmq.amqp.v100.clientGrouping context for Links.
Producercom.swiftmq.amqp.v100.clientOutgoing (sending) Link.
Consumercom.swiftmq.amqp.v100.clientIncoming (receiving) Link.
DurableConsumercom.swiftmq.amqp.v100.clientIncoming (receiving) Durable Link.
MessageAvailabilityListenercom.swiftmq.amqp.v100.clientUsed for non-blocking receives.
DeliveryMemorycom.swiftmq.amqp.v100.clientUsed for settlement and link recovery.
TransactionControllercom.swiftmq.amqp.v100.clientCan control multiple transactions per Session.

Specification and Samples

Other classes referenced from the above are part of SwiftMQ's AMQP library (amqp.jar) which is almost directly generated from the specification's XML files.

There are also several example programs provided under the SwiftMQ distribution's directory "samples/amqp". Please have a look!

Debugging

The following system properties can be set at the client to enable debugging. All output goes to System.out.

System PropertyDescription
swiftmq.amqp.frame.debugShows all sent and received AMQP frames.
swiftmq.amqp.debugShows quite verbose debug output of the SwiftMQ AMQP library.

Example:

-Dswiftmq.amqp.frame.debug=true -Dswiftmq.amqp.debug=true

AMQPMessage

The AMQP specification does not define a message type. It only specified sections that can make a message. AMQPMessage is SwiftMQ's custom type of a message that handles access to the sections. With the exception of accept() and reject() an AMQPMessage is not thread-safe.

The following fields are overwritten from the Producer during send and therefore MUST not be set from the application:

SectionField
HeaderDurable
HeaderPriority
HeaderTtl
PropertiesMessageId
PropertiesTo
PropertiesUserId

QoS

For convenience the client API pre-defines 3 modes of quality of service (QoS):

ModeDescription
AT_MOST_ONCEMessages are pre-settled at the sender endpoint. Messages may be lost.
AT_LEAST_ONCEMessages are received and settled at the receiver without waiting for the sender to settle. Duplicates may occur.
EXACTLY_ONCEMessages are (1) received, (2) the sender settles and then (3) the receiver settles. Messages are delivered once and only once.

AMQPContext

The client requires thread pool and tracing facilities from its environment. This is provided from the AMQPContext class which has 2 modes, CLIENT and ROUTER. Mode ROUTER is for internal use only so the context always needs to be created with mode CLIENT:

            AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);

Connection

Without SASL

A Connection without SASL authentication is created by setting the doAuth parameter to false:

            Connection connection = new Connection(ctx, host, port, false);

With SASL

A Connection with SASL authentication is created by setting the doAuth parameter to true. If username and password are not specified, connect takes place via the SASL mechanism ANONYMOUS:

            Connection connection = new Connection(ctx, host, port, true);

Otherwise as the specified user with SASL mechanism PLAIN by default:

            Connection connection = new Connection(ctx, host, port, true, username, password);

With SASL EXTERNAL

Here is a nice writeup by Jakub Scholz about using SSL and SASL EXTERNAL authentication with the Apache Qpid C++ Broker.

Connection Configuration and actual connect

The Connection is in unconnected state after construction and can now be configured. For example, the maximum frame size and an exception listener can be set or the SASL mechanism to use can be specified. The SASL mechanism depends on the provision of the remote endpoint. SwiftMQ provides PLAIN, ANONYMOUS and the platform mechanisms CRAM-MD5 and Digest-MD5. For use of ANONYMOUS mechanism please use the resp. constructor of the connection.

            connection.setMechanism("CRAM-MD5");
            connection.setMaxFrameSize(1024);
            connection.setIdleTimeout(120000); // Sends and expects heart beat frames
            connection.setExceptionListener(this);

There are 2 socket factories that can be used to create the TCP connections: com.swiftmq.net.PlainSocketFactory (default) and com.swiftmq.net.JSSESocketFactory. The latter must be specified to connect via SSL/TLS:

            connection.setSocketFactory(new JSSESocketFactory());

Finally, do the connect:

            connection.connect();

Session

A Session is created from a Connection. It multiplexes traffic between link endpoints and can handle non-transacted and transacted transfers with multiple active transactions simultaneously. It maintains incoming and outgoing windows of unsettled transfers (depending on the maximum frame size, a message can be splitted over multiple transfers). Both needs to be specified when creating the Session:

            Session session = connection.createSession(100, 100);

Producer

A Producer is created from a Session on a target (queue or topic) with a specific QoS. The whole settlement interaction will be done internally so there is nothing more to do than to send messages. The transfer itself is asynchronously as well as the settlement. Only the close() of the Producer is done synchronously and ensures that all settlement has been done for the previously sent messages when the close()-Method returns.

If the connection is dropped before close() can be called, unsettled deliveries can be recovered. See last section Link Recovery on the end of this page.

            Producer p = session.createProducer("orderqueue@router4", QoS.AT_LEAST_ONCE);
            ...
            p.send(msg); // Always asynchronously
            ...
            p.close(); // Settlement completed after this call returns

Consumer

A Consumer is created from a Session on a source (queue or topic) with a specific QoS. Each Consumer has a client-side message cache which is dimensioned by the link credit, a value that is passed to the source.

On a non-temporary Source

A Consumer on a non-temporary source (regular queue or topic) is created by specifying the name of the source, the link credit (consumer cache size), the QoS, a NoLocal flag and an optional message selector. If the NoLocal flag is set to true (default) then messages sent from the same connection on a topic are not received. This is the case when a client sends and receives on the same topic but don't want to receive its own messages. The message selector is a string which the connected message broker must understand. For SwiftMQ this is a JMS message selector.

            Consumer c = session.createConsumer("orderqueue", 200, QoS.EXACTLY_ONCE,
                                                true, "orderid between 0 and 100");

On a temporary Sources

A Consumer on a temporary source (temporary queue, e.g. for request/reply) is created like above but without specifying a source parameter. This automatically creates the temporary source with a lifetime of that of this link and attaches the Consumer to it. The address of the temporary source can be obtained by calling getRemoteAddress():

            Consumer c = session.createConsumer(200, QoS.AT_MOST_ONCE);
            AddressIF remoteAddress = c.getRemoteAddress(); // Use it for request/reply

Non-blocking Receive

To consume messages, the various receive() methods of a consumer can be called. There is no message listener like in JMS but a way to perform non-blocking receives. This is done by calling receiveNoWait(listener) and passing a MessageAvailabilityListener as a parameter. If there is no message available at the time of the receiveNoWait(listener) call, the MessageAvailabilityListener is stored and called later when a message is available:

          // Implementation of MessageAvailabilityListener
          // Notifies a Poller thread to call poll()
          public void messageAvailable(Consumer consumer)
          {
            poller.enqueue(this);
          }
    
          // Called from the Poller thread
          public void poll()
          {
            AMQPMessage msg = c.receiveNoWait(this);
            if (msg != null)
            {
              // process message
            }
          }

Settlement

Settlement on the Consumer side needs to be done by the application and can be done in 2 ways. If a message is unsettled, it can be accpeted or rejected. The latter will lead to a redelivery of the message. Note that AMQP settles each single message (and not streams of messages like in JMS). Settlement can be done by different threads so it's completely legal to call "receive()" by different threads and process them in parallel, e.g. by using worker thread pools. The order of settlement is not relevant, that is, message #3 can be settled before message #1.

Settlement is done asynchronously. Only the close() of the Consumer is done synchronously and ensures that all settlement has been done for the previously consumed messages when the close()-Method returns. If the connection is dropped before close() can be called, unsettled deliveries can be recovered. See last section Link Recovery on the end of this page.

            AMQPMessage msg = c.receive();
            if (msg != null) // null is returned if the consumer was closed
            {
              if (!msg.isSettled())
              {
                if (processed(msg))
                  msg.accept();
                else
                  msg.reject();
              }
            }

DurableConsumer

A DurableConsumer is created from a Session on a topic source with a specific QoS. The durable link is identified by a link name. If the link does not exists, it will be created with a terminus expiry policy of NEVER and a terminus durability of CONFIGURATION. If it exists, it will be attached to the DurableConsumer. The durable link will remain in place after detaching the DurableConsumer so messages are received while the DurableConsumer is disconnected and delivered to it once the DurableConsumer reconnects. A call to unsubscribe() will set the terminus expiry policy to LINK_DETACH and the durable link will be deleted.

If connected to a SwiftMQ Router, a DurableConsumer is used to create a durable subscription on a topic like in JMS. The durable subscription is identified by the container id and the link name. Therefore, the container id and the link name needs to be the same to connect to the same durable subscription.

            // Set up the connection
            AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
            Connection connection = new Connection(ctx, host, port, true);
            // Set the container id
            connection.setContainerId("orderprocessor");
            connection.connect();
            Session session = connection.createSession(Integer.MAX_VALUE, Integer.MAX_VALUE);
            // Create the durable on topic hierarchy "orders.%"
            DurableConsumer c = session.createDurableConsumer("ordermailbox", "orders.%", 20,
                                                              QoS.EXACTLY_ONCE, true, null);
    
            // Do processing
    
            // Close the link
            c.close();
            // Delete the durable if not needed anymore
            if (!neededAnymore)
              c.unsubscribe();
            session.close();
            connection.close();

TransactionController

A TransactionController can be obtained from a Session to do transactional work on it. A Session does not have to be created as a transacted Session as in JMS. Rather a Session can handle non-transacted and transacted traffic at the same time.

The Session's TransactionController is obtained by use of the corresponding getter method and the transaction capabilities can be retrieved:

            TransactionController txc = session.getTransactionController();
            boolean multiTxnsPerSession = txc.isSupportMultiTxnsPerSsn();

The SwiftMQ Router can handle local transactions (LOCAL_TRANSACTIONS) and multiple transactions per session (MULTI_TXNS_PER_SSN).

To do transactional work, a transaction id is required. This is created by the TransactionController (which obtains it from the transactional resource):

            TxnIdIF txnid = txc.createTxnId();

To finish a transaction, either commit():

            txc.commit(txnid);

or rollback() needs to be called:

            txc.rollback(txnid);

Outbound (Sending) Transactions

To send a message in a transaction, the message has to be associated with the transaction id.

            // Send messages in transactions of size 
            int currentTxSize = 0;
            TxnIdIF txnId = txc.createTxnId();
            for (int i = 0; i < nMsgs; i++)
            {
              AMQPMessage msg = new AMQPMessage();
              String s = "Message #" + (i + 1);
              System.out.println("Sending " + s);
              msg.setAmqpValue(new AmqpValue(new AMQPString(s)));
              msg.setTxnIdIF(txnId); // Transaction association
              p.send(msg);
              currentTxSize++;
              if ((i + 1) % txSize == 0)
              {
                txc.commit(txnId);
                txnId = txc.createTxnId();
                currentTxSize = 0;
              }
            }
            if (currentTxSize > 0)
              txc.commit(txnId);

Inbound (Receiving) Transactions as Transactional Retirement

Transactional retirement means that messages are delivered to the Consumer in a non-transacted fashion. After the client has received it, the outcome in form of accepting or rejecting the message can be associated with a transaction and will then be applied to the messages on commit or discarded on rollback. So NOT the messages are associated with a transaction BUT the outcomes accept and reject.

The big advantage here is that the messages are still at the client (and don't need to be redelivered) if a transaction is rolled back and so the messages can be associated with another transaction or can be settled in a non-transacted fashion. This provides an enormous flexibility.

Transactional retirement is not conform with QoS mode AT_MOST_ONCE (pre-settlement).

            TxnIdIF txnid = txc.createTxnId();
            AMQPMessage msg = c.receive();
            msg.setTxnIdIF(txnid);  // Transaction association
            try {
              String s = "RE: "+((AMQPString)msg.getAmqpValue().getValue()).getValue();
              AMQPMessage msg1 = new AMQPMessage();
              msg1.setAmqpValue(new AmqpValue(new AMQPString(s)));
              msg1.setTxnIdIF(txnid);  // Transaction association
              producer.send(msg1);
              msg.accept();
            } catch (Exception e)
            {
              // error, reject
              msg.reject();
            }
            txc.commit(txnid);

Inbound (Receiving) Transactions as Transactional Acquisition

AMQP provides a second way to handle inbound traffic as transactions. It is called transactional acquisition. Here the client acquires a number of messages under a transaction id so the delivery is already associated with that transaction id. This is the only difference to the transactional retirement.

Transactional acquisition is not conform with QoS mode AT_MOST_ONCE (pre-settlement).

The Consumer must be created without a link credit parameter because the link credit is dimensioned by the number of acquired messages:

            // Important to create the consumer without a link credit
            // because the link credit is set by the acquisition
            Consumer c = session.createConsumer(source, OoS.EXACTLY_ONCE, true, null);
    
            // Get the transaction controller
            TransactionController txc = session.getTransactionController();
    
            // Receive messages in transactions in size 
            int currentTxSize = 0;
            TxnIdIF txnId = txc.createTxnId();
            // Acquire  messages under this txnid
            c.acquire(txSize, txnId);
            for (int i = 0; i < nMsgs; i++)
            {
              AMQPMessage msg = c.receive();
              if (msg == null)
                break;
              AmqpValue value = msg.getAmqpValue();
              System.out.println("Received: " + ((AMQPString) value.getValue()).getValue());
              msg.accept();
              currentTxSize++;
              if ((i + 1) % txSize == 0)
              {
                // Commit and acquire the next  messages under a new txnid
                txc.commit(txnId);
                txnId = txc.createTxnId();
                c.acquire(txSize, txnId);
                currentTxSize = 0;
              }
            }
            if (currentTxSize > 0)
              txc.commit(txnId);

Link Recovery

If a Link (a Producer, Consumer or DurableConsumer) cannot be properly closed by it's close()-Method, e.g. because the corresponding connection disconnects, there can be unsettled deliveries. In order to finish the settlement, a link can be recovered.

Each Link has a so-called DeliveryMemory where unsettled deliveries are stored and removed after settlement. This DeliveryMemory is specified by this interface:

          package com.swiftmq.amqp.v100.client;
    
          import com.swiftmq.amqp.v100.generated.transport.definitions.DeliveryTag;
    
          import java.util.Collection;
    
          /**
           * Specifies a memory to store unsettled deliveries of a link (producer and consumer).
           *
           * @author IIT Software GmbH, Bremen/Germany, (c) 2012, All Rights Reserved
           */
          public interface DeliveryMemory
          {
            /**
             * Will be called from the link to set its link name. This is only done if the name
             * has not been set before and ensures that new created links that use this delivery
             * memory use the same link name as before.
             *
             * @param name
             */
            public void setLinkName(String name);
    
            /**
             * Returns the link name,
             *
             * @return link name
             */
            public String getLinkName();
    
            /**
             * Adds an unsettled delivery which consists of a delivery tag, the delivery state
             * and the AMQP message.
             *
             * @param unsettledDelivery unsettled delivery
             */
            public void addUnsettledDelivery(UnsettledDelivery unsettledDelivery);
    
            /**
             * Removes an unsettled delivery from the memory.
             *
             * @param deliveryTag delivery tag
             */
            public void deliverySettled(DeliveryTag deliveryTag);
    
            /**
             * Returns the number of unsettled deliveries contained in this memory.
             *
             * @return number unsettled deliveries
             */
            public int getNumberUnsettled();
    
            /**
             * Returns a collection of all unsettled deliveries. The delivery memory remains untouched
             * so the returned Collection is a copy (or better a clone) of the content.
             *
             * @return unsettled deliveries
             */
            public Collection getUnsettled();
          }

One of the hidden beauties of AMQP 1.0 is the ability to delegate the management of deliveries and their state to the application which can reconstruct this state out of application data. So even if the client dies and there are unsettled deliveries, it can be recovered.

A DeliveryMemory can be optionally specified as a parameter to the various create methods of a Session, e.g.:

          Producer p = session.createProducer("orderqueue@router4",
                                              QoS.EXACTLY_ONCE,
                                              new OrderDatabaseDeliveryMemory());

To identify a delivery, an application can tag a message with its own DeliveryTag which will then be used for the settlement via the DeliveryMemory. If no delivery tag is specified, an internal sequence number will be used instead:

          AMQPMessage msg = new AMQPMessage();
          ...
          msg.setDeliveryTag(getOrderId());
          p.send(msg);

This works similar for consumers.

The actual recovery of a Link will be done during Link creation, see above. Before the Link is attached, it retrieves the unsettled deliveries from the DeliveryMemory and performs the settlement.

If no DeliveryMemory is specified when the Link is created, a DefaultDeliveryMemory is used which stores unsettled deliveries in a Map. If the client dies, this state is lost, of course.

So the most basic recovery without any custom implementation of DeliveryMemory or DeliveryTag can look like this:

          Producer p = session.createProducer("orderqueue@router4",
                                              QoS.AT_LEAST_ONCE);
          ...
          p.send(msg); // Always asynchronously
          ...
          < CONNECTION DROPS >
          // Retrieve the old DeliveryMemory from the former Producer, do the
          // recovery and continue
          Producer p = session.createProducer("orderqueue@router4",
                                              QoS.AT_LEAST_ONCE,
                                              p.getDeliveryMemory());
          p.close(); // All settlement done (recovered)

Filters

AMQP 1.0 filters are extension points which are listed in a public registry on www.amqp.org

To ensure minimal interoperability between AMQP brokers and clients concerning JMS selector filters, the SwiftMQ AMQP 1.0 Java Client uses the filter declarations APACHE.ORG:SELECTOR and APACHE.ORG:NO_LOCAL of Apache Qpid's AMQP 1.0 implementation.

← JMS ClientFiletransfer Client →
  • Introduction
  • Specification and Samples
  • Debugging
  • AMQPMessage
  • QoS
  • AMQPContext
  • Connection
    • Without SASL
    • With SASL
    • With SASL EXTERNAL
    • Connection Configuration and actual connect
  • Session
  • Producer
  • Consumer
    • On a non-temporary Source
    • On a temporary Sources
    • Non-blocking Receive
    • Settlement
  • DurableConsumer
  • TransactionController
    • Outbound (Sending) Transactions
    • Inbound (Receiving) Transactions as Transactional Retirement
    • Inbound (Receiving) Transactions as Transactional Acquisition
  • Link Recovery
  • Filters
Copyright © 2020 IIT Software GmbH