Messaging servers facilitate the exchange of binary or text information between remote systems, and are useful in any distributed system where one node needs to share information with another. They have a lot to do: They must deliver messages, monitor client connections, and store and resend undelivered messages when a destination system comes back online.
You don't need a message server if guaranteed message delivery isn't important, but for some types of applications, such as financial transactions, it is essential that messages arrive at their destination in the right order and be stored in the case of a network failure.
Apache ActiveMQ, based on the Java Message Service 1.1 specification, is a mature open source message server with a range of advanced features. It supports a wide range of client libraries, which allows programs written in languages such as C/C++, .NET, PHP, and Ruby to access and use the message server. It is also frequently tested with popular JEE servers such as JBoss, GlassFish, and WebLogic. If you're writing an application that encompasses distributed systems, you can use ActiveMQ as your application's communication channel.
To see how such a messaging server works, you can start by implementing a simple remote system logger that logs events, warnings, and errors that occur during a program's execution and sends these log messages over the network to a client that reads and optionally stores or processes the messages. For our purposes, we'll say that multiple log readers must be able to receive the log messages, and if a log reader disconnects, it must be able to catch up with log messages that were issued while it was offline.
You can read more about the basics of ActiveMQ in How to Get Started with ActiveMQ. Install the program, change directory to apache-activemq-5.8.0/bin, and run ./activemq start. You should then be able to access the ActiveMQ admin page from a browser at http://localhost:8161.
Before we start coding, let's talk about some key components. At the highest level a message producer, which can be any type of software including a server, a database, or an application, wants to send a message to a remote system. The remote system is a message consumer, and there can be more than one of them. The producer submits the message to the message server, which processes it and sends it to the message consumer. The message producer doesn't communicate directly with the consumer, but rather uses the message server as a broker.
ActiveMQ, like JMS itself, has two models for processing messages: a publish-and-subscribe model and a point-to-point model that behaves like a load balancer. The former is implemented as topics in ActiveMQ, while the latter is implemented as queues.
With topics, the producer submits a message to the message server, which sends it to all the consumers that have registered to receive the messages.
With queues, the message server sends each message to exactly one consumer. If there are two or more consumers, they will receive messages alternately as determined by a load balancer built into the message server.
For our purposes, then, queues are not desirable, as each log reader would get only a few of the messages. If we use topics, every log reader that subscribes to receive the messages will get its own copy.
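The difference between the two models can be sketched with a toy in-memory simulation. This is not the ActiveMQ API: the class DeliveryModels and its methods are hypothetical names, and plain round-robin is assumed as a stand-in for whatever dispatch policy the broker actually applies to a queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the two delivery styles. Hypothetical names; not the ActiveMQ API.
final class DeliveryModels {

    // Topic semantics: every consumer gets its own copy of every message.
    static List<List<String>> deliverAsTopic(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Queue semantics: each message goes to exactly one consumer.
    // Round-robin is an assumption standing in for the broker's dispatcher.
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("boot", "warning", "error", "shutdown");
        System.out.println("topic: " + deliverAsTopic(log, 2));
        System.out.println("queue: " + deliverAsQueue(log, 2));
    }
}
```

With two consumers, the topic version hands all four messages to both inboxes, while the queue version splits them so that each log reader sees only half of the log. That split is the reason queues do not suit the logger.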
Topic subscriptions can be either durable or non-durable. With a durable subscription, the message server stores messages until they are delivered, while a non-durable subscription has no persistence: messages are discarded if the consumer is not online to receive them. For our remote system logger, a log reader needs to register itself with the message server as a durable subscriber to ensure that the messages are stored and later delivered if the log reader disconnects and then reconnects.
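The effect of durability can be illustrated with a small in-memory model. It is only a sketch of the behavior described above, not of how ActiveMQ stores messages: ToyTopic, its methods, and the subscriber names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of one topic with durable and non-durable subscribers.
// Hypothetical names throughout; this is not the ActiveMQ API.
final class ToyTopic {

    private static final class Subscriber {
        final boolean durable;
        boolean online = true;
        final List<String> backlog = new ArrayList<String>();   // held while offline
        final List<String> received = new ArrayList<String>();  // delivered so far

        Subscriber(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscriber> subscribers = new LinkedHashMap<String, Subscriber>();

    void subscribe(String name, boolean durable) {
        subscribers.put(name, new Subscriber(durable));
    }

    void disconnect(String name) {
        subscribers.get(name).online = false;
    }

    // On reconnect, a durable subscriber is handed everything it missed.
    // A non-durable subscriber has an empty backlog, so nothing is replayed.
    void reconnect(String name) {
        Subscriber s = subscribers.get(name);
        s.online = true;
        s.received.addAll(s.backlog);
        s.backlog.clear();
    }

    void publish(String message) {
        for (Subscriber s : subscribers.values()) {
            if (s.online) {
                s.received.add(message);
            } else if (s.durable) {
                s.backlog.add(message);  // stored until the subscriber returns
            }
            // Offline and non-durable: the message is dropped for this subscriber.
        }
    }

    List<String> received(String name) {
        return subscribers.get(name).received;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscribe("durable-reader", true);
        topic.subscribe("plain-reader", false);
        topic.publish("one");
        topic.disconnect("durable-reader");
        topic.disconnect("plain-reader");
        topic.publish("two");  // sent while both readers are offline
        topic.reconnect("durable-reader");
        topic.reconnect("plain-reader");
        topic.publish("three");
        System.out.println("durable: " + topic.received("durable-reader"));
        System.out.println("plain:   " + topic.received("plain-reader"));
    }
}
```

In this run the durable reader ends up with all three messages, while the non-durable reader never sees "two", which was published while it was disconnected.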
On our demo logging system, the server part – the message producer – connects to ActiveMQ and starts sending the log messages it is generating. One or more log readers – the message consumers – connect to ActiveMQ to receive those messages. Because we will use topics with durability, once a log reader has registered an interest it can recover from network disconnects without losing any messages.
We have to take into consideration that until the first log reader has registered its interest with ActiveMQ, there will be no consumers for the messages logged by the server component, and they will be lost. Also, ActiveMQ identifies the set of log readers that want to receive log messages by using a combination of the client ID and subscriber name, which means that the client ID needs to be unique for each log reader that connects. After a connection drop, a log reader must use the same client ID to ensure that it can pick up where it left off. Depending on the network topology, you might use information such as IP address, hostname, or MAC address to create a unique client ID.
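One way to derive such an ID is from the local hostname. The sketch below assumes one log reader per host and uses hypothetical names (ClientIds, build, forLocalHost, and the "logreader" prefix); none of them come from ActiveMQ. The "unknown-host" fallback is also an assumption, and it is not unique, so a real deployment would need a better answer for hosts whose name cannot be resolved.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Locale;

// Builds a stable client ID from a prefix and a host identifier.
// Hypothetical helper; assumes a single log reader per host.
final class ClientIds {

    // Normalizes the host part so the same machine always yields the same ID.
    static String build(String prefix, String host) {
        return prefix + "-" + host.trim().toLowerCase(Locale.ROOT);
    }

    // Uses the local hostname; falls back to a fixed placeholder if lookup fails.
    // The placeholder is NOT unique across machines.
    static String forLocalHost(String prefix) {
        try {
            return build(prefix, InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            return build(prefix, "unknown-host");
        }
    }

    public static void main(String[] args) {
        System.out.println(forLocalHost("logreader"));
    }
}
```

Because the ID depends only on the prefix and the hostname, a log reader that restarts on the same machine gets the same ID again, which is what lets it pick up its stored messages.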
In a real-world example the logger would be part of a bigger program which would call it to send the log message, but we've coded it as a program of its own for illustration purposes. The log reader would either be a standalone program or part of a larger monitoring solution.
The initial ActiveMQ API calls for the logger start by making a connection to ActiveMQ. Here is that code in Java, but you can use the ActiveMQ API in other languages:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
The code for the reader is similar, but after creating the connection, the log reader needs to set its client ID by calling connection.setClientID(). Remember that each client that connects needs to have a unique client ID.
In both the logger and the reader code you then need to start the connection by calling connection.start().
Next, both the logger and log reader need to create a session and destination. This code is the same for both the logger and the log reader:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("TEST.FOO");
For the logger, the final initialization step is to create a message producer object, which is the top-level object used for sending messages, and ensure that ActiveMQ writes the messages to disk before delivering them, so that messages won't be lost if the system reboots.
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
The final initialization step in the log reader code is to create a consumer object:
MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "Log reader");
At this point in the code, both the logger and the log reader have established a link with ActiveMQ. To send messages, the logger calls producer.send(), and to receive messages, the log reader calls consumer.receive().
We can add a logIt() function to the logger code that prepends the date to the log message and wraps it up in a TextMessage object for sending to the log reader. A TextMessage object is the easiest way to send a message that contains a string, which means it can also send XML. The TextMessage object has a getText() method that the log reader can use to extract the string from the received message.
private static void logIt(Session session, MessageProducer producer, String msg) throws javax.jms.JMSException {
    // Prepend the current date so each log line records when it was issued
    String logmsg = new Date() + ": " + msg;
    System.out.println("Sending: '" + logmsg + "'");
    TextMessage message = session.createTextMessage(logmsg);
    producer.send(message);
}
The downloadable zip archive of the project contains three files. One, build.xml, lets the Apache Ant tool build and execute the logger and reader programs. The other two are the source files Logger.java and LogReader.java. Unzip the archive in the ActiveMQ installation directory (apache-activemq-5.8.0/) and change directory to logger. You can run the logger with the command ant logger, and the log reader with ant logreader.
If you want to see whether the code works as it should, try stopping the log reader and starting it again after a few seconds. ActiveMQ should send all the log messages posted while the log reader was offline to the log reader once it comes back in contact.
As you can see, programming for ActiveMQ is straightforward. Now that you know how to create a logger, try modifying the programs to use queues rather than topics – call session.createQueue() instead of session.createTopic() – and observe the difference in behavior. You can also try creating a non-durable subscriber – use session.createConsumer() rather than session.createDurableSubscriber() in the log reader – and see what happens to messages posted when the log reader is offline.
To learn more about ActiveMQ it is also worth looking at the various article resources on the ActiveMQ website.