In the last blog entry I talked about message flow control using CometD’s lazy channels.
Now I want to show how to achieve similar flow control using specialized listeners that let you manipulate the ServerSession message queue.
The ServerSession message queue is a data structure that is accessed concurrently when messages are published and delivered to clients, so it needs appropriate synchronization when accessed.
To simplify these synchronization requirements, CometD allows you to add DeQueueListeners to ServerSessions, with the guarantee that these listeners will be called with the appropriate locks acquired, so that user code can freely modify the queue’s content.
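To illustrate what "called with the appropriate locks acquired" means in practice, here is a minimal sketch in plain Java. It is not CometD's implementation: the names GuardedQueue, DrainListener, offer and drain are hypothetical, chosen only to show a queue whose listeners run while the queue's lock is held.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a session message queue; NOT CometD's implementation.
class GuardedQueue<T> {
    // Hypothetical callback, similar in spirit to a DeQueueListener.
    interface DrainListener<T> {
        void onDrain(Queue<T> queue);
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> queue = new ArrayDeque<>();
    private final List<DrainListener<T>> listeners = new ArrayList<>();

    void addListener(DrainListener<T> listener) {
        lock.lock();
        try {
            listeners.add(listener);
        } finally {
            lock.unlock();
        }
    }

    // Called by publishers, possibly from many threads.
    void offer(T message) {
        lock.lock();
        try {
            queue.add(message);
        } finally {
            lock.unlock();
        }
    }

    // Called when messages are about to be delivered.
    List<T> drain() {
        lock.lock();
        try {
            // Listeners run while the lock is held, so they can mutate
            // the queue without doing any synchronization of their own.
            for (DrainListener<T> listener : listeners) {
                listener.onDrain(queue);
            }
            List<T> batch = new ArrayList<>(queue);
            queue.clear();
            return batch;
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, a listener such as `queue -> queue.removeIf(m -> m.startsWith("drop"))` needs no locking of its own, which is the kind of simplification the guarantee is meant to provide.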
Below is an example of a DeQueueListener that keeps only the first message of a series of messages published to the same channel within a tolerance period of 1000 ms, and removes the others (it relies on the timestamp extension):

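import java.util.Iterator;
import java.util.Queue;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;

final String channelName = "/stock/GOOG";
final long tolerance = 1000; // milliseconds
ServerSession session = ...;
session.addListener(new ServerSession.DeQueueListener()
{
    public void deQueue(ServerSession session, Queue<ServerMessage> queue)
    {
        // Timestamp of the last message kept for channelName
        long lastTimeStamp = 0;
        for (Iterator<ServerMessage> iterator = queue.iterator(); iterator.hasNext();)
        {
            ServerMessage message = iterator.next();
            if (channelName.equals(message.getChannel()))
            {
                // Field added by the timestamp extension, parsed here as a millisecond value
                long timeStamp = Long.parseLong(message.get(Message.TIMESTAMP_FIELD).toString());
                if (timeStamp <= lastTimeStamp + tolerance)
                {
                    // Too close to the last kept message: drop it
                    System.err.println("removed " + message);
                    iterator.remove();
                }
                else
                {
                    System.err.println("kept " + message);
                    lastTimeStamp = timeStamp;
                }
            }
        }
    }
});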
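To make the effect of the tolerance check easier to see, here is a self-contained sketch that applies the same comparison to a plain queue, without any CometD classes. The Msg type, the KeepFirstFilter class and the sample timestamps are hypothetical stand-ins for illustration only.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

class KeepFirstFilter {
    // Hypothetical stand-in for a queued message: a channel plus a millisecond timestamp.
    static final class Msg {
        final String channel;
        final long timeStamp;

        Msg(String channel, long timeStamp) {
            this.channel = channel;
            this.timeStamp = timeStamp;
        }
    }

    // Same comparison as the listener above: a message on channelName is dropped
    // when it falls within `tolerance` ms of the last message that was kept.
    static void keepFirst(Queue<Msg> queue, String channelName, long tolerance) {
        long lastTimeStamp = 0;
        for (Iterator<Msg> iterator = queue.iterator(); iterator.hasNext();) {
            Msg message = iterator.next();
            if (channelName.equals(message.channel)) {
                if (message.timeStamp <= lastTimeStamp + tolerance) {
                    iterator.remove();
                } else {
                    lastTimeStamp = message.timeStamp;
                }
            }
        }
    }

    public static void main(String[] args) {
        Queue<Msg> queue = new ArrayDeque<>();
        queue.add(new Msg("/stock/GOOG", 10000L));
        queue.add(new Msg("/stock/GOOG", 10400L));
        queue.add(new Msg("/stock/AAPL", 10500L)); // other channels are left untouched
        queue.add(new Msg("/stock/GOOG", 10900L));
        queue.add(new Msg("/stock/GOOG", 11200L));
        queue.add(new Msg("/stock/GOOG", 12300L));

        keepFirst(queue, "/stock/GOOG", 1000L);

        for (Msg message : queue) {
            System.out.println(message.channel + " @ " + message.timeStamp);
        }
    }
}
```

With these sample values, the /stock/GOOG messages at 10400 and 10900 are removed because they fall within 1000 ms of the kept message at 10000. The message at 11200 is kept because the comparison is always made against the last kept timestamp, not against the previous message in the queue.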

Other possibilities include keeping the last message (instead of the first), coalescing the message fields following a particular logic, or even clearing the queue completely.
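As a rough illustration of the "keep the last message" variant, the sketch below keeps only the most recent message per channel in a plain queue. It ignores the tolerance period for brevity, and the Msg type and KeepLastFilter class are hypothetical stand-ins, not CometD classes; inside a real DeQueueListener the same two passes would run over the Queue of ServerMessage.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;

class KeepLastFilter {
    // Hypothetical stand-in for a queued message: a channel plus some payload.
    static final class Msg {
        final String channel;
        final String data;

        Msg(String channel, String data) {
            this.channel = channel;
            this.data = data;
        }
    }

    // Keeps only the last message of each channel, preserving queue order.
    static void keepLast(Queue<Msg> queue) {
        // First pass: count how many messages each channel has in the queue.
        Map<String, Integer> remaining = new HashMap<>();
        for (Msg message : queue) {
            remaining.merge(message.channel, 1, Integer::sum);
        }
        // Second pass: remove every message that is followed by a newer one
        // on the same channel.
        for (Iterator<Msg> iterator = queue.iterator(); iterator.hasNext();) {
            Msg message = iterator.next();
            int left = remaining.merge(message.channel, -1, Integer::sum);
            if (left > 0) {
                iterator.remove();
            }
        }
    }
}
```

Coalescing would follow the same pattern, except that the removed messages' fields would be merged into a replacement message before it is added back to the queue.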
DeQueueListeners are called when CometD is about to deliver messages to the client, so clearing the queue completely results in an empty response being sent to the client.
This is different from the behavior of lazy channels, which allow message delivery to be delayed until a configurable timeout expires.
However, lazy channels do not alter the number of messages being sent, while DeQueueListeners can manipulate the message queue.
Therefore, CometD message flow control is often best accomplished by using both mechanisms: lazy channels to delay message delivery, and DeQueueListeners to reduce or coalesce the messages sent.


5 Comments

Modex · 04/03/2012 at 11:44

Using Jetty 7.1.4 and CometD 1.1.1 on one of the Amazon EC2 c1.xlarge boxes (8 CPUs, 7 GB memory), I was able to achieve a latency of less than 190 ms (99th percentile of 454 ms) with a throughput of 133,000 msgs/s (with SSL turned on). The clients were split across 4 other c1.xlarge boxes. I am looking forward to the CometD-2 release!

Marat · 12/07/2012 at 19:43

Hi Simon,
Thanks for this article! I was able to set up and use deQueue listener as you mentioned. Now, I would like to go a step further and modify the ServerMessage as well, not only the queue, like so…
ServerMessage message = queue.remove();
Map<String, Object> data = message.getDataAsMap();
data.put("new_key", "new_value");
message.someHowPutDataBackIn(data); // is there any way to do that?
queue.add(message);
Is modifying the data in the queue even possible?
Thanks!

    simon · 12/07/2012 at 20:09

    You cannot modify messages that are in the queue and, in fact, you get non-mutable message instances of type ServerMessage (and not ServerMessage.Mutable).
    You have to create new messages using BayeuxServer.newMessage(), remove the old messages, then add the new messages to the queue.

      Marat · 16/07/2012 at 14:11

      That’s awesome!
      Thanks, Simon!

Marat · 20/07/2012 at 15:16

Hi Simon,
Do you know if there is a way to set the size of the queue to be 1? That is, make it work as if there were no queue. I am working on a real-time application where I don’t need stale data.
Thanks,
Marat
