RabittMQ batch message processing

There are times when we want to fire set of jobs and be notified when all of them complete. This can be easily accomplished with the latest version of Hoplin.io RabbitMQ client.

A use case for using batch messages would be partitioning a document and processing each partition via separate client.

As always we need our Publisher and Subscriber we will start with publisher first.

We start by creating a new client and then enqueuing number of jobs to process, upon completion we display the time it took to complete all jobs. Client will attempt to use Direct-Reply queue if available.

   public static void main(final String... args) throws IOException, InterruptedException
    {
        final BatchClient client = new DefaultBatchClient(options(), bind());

        client.startNew(context ->
        {
            for(int i = 0; i < 1000; ++i)
            {
                context.enque(() -> new LogDetail("Msg >> " + System.nanoTime(), "info"));
                context.enque(() -> new LogDetail("Msg >> " + System.nanoTime(), "warn"));
            }
        })
        .whenComplete((context, throwable)->
        {
                log.info("Batch completed in : {}", context.duration());
        });

        Thread.currentThread().join();
    }

    private static Binding bind()
    {
        return BindingBuilder
                .bind("batch.documents")
                .to(new DirectExchange("exchange.batch"))
                .build()
                ;
    }

The subscriber is little bit more involved, this is the part of API that needs to be simplified.


/**
 * Batch Job receiver
 */
public class ReceiveBatchJob extends BaseExample
{
    private static final Logger log = LoggerFactory.getLogger(ReceiveBatchJob.class);

    private static final String EXCHANGE = "exchange.batch";

    private static RabbitMQClient mqClient;

    public static void main(final String... args) throws InterruptedException
    {
        final ExchangeClient client = DirectExchangeClient.create(options(), EXCHANGE);
        mqClient = client.getMqClient();

        client.subscribe("test", LogDetail.class, ReceiveBatchJob::handle);
        Thread.currentThread().join();
    }

    private static void handle(final LogDetail msg, final MessageContext context)
    {
        final AMQP.BasicProperties properties = context.getProperties();
        final String replyTo = properties.getReplyTo();
        final String correlationId = properties.getCorrelationId();
        final Map<String, Object> headers = properties.getHeaders();
        final Object batchId = headers.get("x-batch-id");
        headers.put("x-batch-correlationId", correlationId);

        log.info("Incoming context        >  {}", context);
        log.info("Incoming replyTo        >  {}", replyTo);
        log.info("Incoming msg            >  {}", msg);
        log.info("Incoming correlationId  >  {}", correlationId);
        log.info("Incoming batchId        >  {}", batchId);

        final LogDetail reply = new LogDetail("Reply Message", "WARN");
        mqClient.basicPublish("", replyTo, reply, headers);
    }
}

There at two important properties that need to be looked at x-batch-id and x-batch-correlationId, currently they need to be copied directly form message and then reply need to be published via the underlying client.

Not really best API design at the moment as it mixes concerns and exposed the underlying RattitMQ client but the initial release will try to address this and simplify usage.

Leave a Comment

Your email address will not be published. Required fields are marked *