Development

RabbitMQ message processing and retry logic

Tackling with RabbitMQ’s message broker unusual feature: message returning to the original queue if something breaks during message consumption.

November 17 2017

If you have ever used the RabbitMQ message broker and tried to consume its messages, you have certainly noticed one unusual feature: if something breaks, during message consumption the message is (by default) returned to the original queue. And after a few seconds it can break again, and again, and so on. What options do we have to prevent this behavior? Or to change this logic to, for example: "retry only after 5 minutes"

Avoid requeuing

One option is to throw an AmqpRejectAndDontRequeException and the message will not be requeued. Or, we can simply turn off this feature using amqp property:

spring.rabbitmq.listener.default-requeue-rejected: false

Now the message retrieved from queue A will not be requeued automatically, and if dead-lettering is correctly defined for this queue, the message will finish in some other queue. For simplicity, let's call this new queue B. So, now we have the message in queue B instead of queue A. How to retry a message from queue B?

Set time-to-live (TTL)

We can set time-to-live (TTL) on queue B. After TTL expires, message on queue B is declared as dead. If we set dead-lettering on queue B, we can immediately return the message again to queue A. This way, if we set TTL to 5 minutes, messages will go from B to A after 5 minutes. If anything fails during message processing, the whole process is repeated again: message stays in B for 5 minutes and then goes to A.

We could also set TTL on an individual message, instead on queue B. That would allow us to set different expiration values to different messages. For example, we could increment TTL for every new message failure. First time the message fails, we would set TTL to 5 minutes, second time to 15 minutes etc. We can’t rely on default settings and RabbitMQ properties, and we need to use our java / spring knowledge to accomplish this behavior.

Message recovering

If we dig beneath the surface of the spring-amqp project, and its RabbitTemplate abstraction, we will see that message requeuing is called message recovering, and it is done with the help of two beans. The first one is RepublishMessageRecoverer, so we need to extend it:

public class CustomMessageRecoverer extends RepublishMessageRecoverer {

	public CustomMessageRecoverer(AmqpTemplate errorTemplate, String errorExchange) {
	    super(errorTemplate, errorExchange);
	}

	@Override
	protected Map<? extends String, ? extends Object> additionalHeaders(Message message,Throwable cause) {
	    Map headers = new HashMap<>();
	    // ...
	    headers.put(HEADER_RETRY_COUNT, count);
	    message.getMessageProperties().setExpiration(count * 60 * 1000);
	    return headers;
	}
}

... and to define the bean from it:

@Bean
public CustomMessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
    CustomMessageRecoverer recoverer = new CustomMessageRecoverer(rabbitTemplate,
		DEAD_LETTER_EXCHANGE_NAME);
    recoverer.setErrorRoutingKeyPrefix("");
    return recoverer;
}

There are a few important things here. First, we are setting TTL of an individual message using the setExpiration(ms) method. In this part of code we can set any arbitrary RabbitMQ header for additional processing when a message returns from B to A. Second, when creating our CustomMessageRecoverer instance, we need to set dead-letter exchange to which the failed message will be sent. Third, we need to set an errorRoutingKeyPrefix to the empty string. The reason is simple: we want that failed message returns to original queue after TTL expire (queue A), and errorRoutingKeyPrefix is just a prefix that will be added to this queue’s name.

We also need to set our recoverer as default:

@Bean
public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor(RabbitTemplate rabbitTemplate, CustomMessageRecoverer recoverer) {
    StatefulRetryOperationsInterceptor interceptor =
	RetryInterceptorBuilder.StatefulRetryInterceptorBuilder.stateful()
	            .maxAttempts(1)     	        
	            .recoverer(recoverer)
	            .build();
    return interceptor;
}

Now, spring-amqp will use our custom message recoverer every time message processing fails due to any reason.

Happy (re)queuing!

By Almir Pehratovic, Senior Software Engineer / Team Leader at Infobip