RabbitMQ message processing and retry logic
If you have ever used the RabbitMQ message broker and tried to consume its messages, you have certainly noticed one unusual feature: if something breaks, during message consumption the message is (by default) returned to the original queue. And after a few seconds it can break again, and again, and so on. What options do we have to prevent this behavior? Or to change this logic to, for example: “retry only after 5 minutes”
One option is to throw an AmqpRejectAndDontRequeException and the message will not be requeued. Or, we can simply turn off this feature using amqp property:
Now the message retrieved from queue A will not be requeued automatically, and if dead-lettering is correctly defined for this queue, the message will finish in some other queue. For simplicity, let’s call this new queue B. So, now we have the message in queue B instead of queue A. How to retry a message from queue B?
SET TIME-TO-LIVE (TTL)
We can set time-to-live (TTL) on queue B. After TTL expires, message on queue B is declared as dead. If we set dead-lettering on queue B, we can immediately return the message again to queue A. This way, if we set TTL to 5 minutes, messages will go from B to A after 5 minutes. If anything fails during message processing, the whole process is repeated again: message stays in B for 5 minutes and then goes to A.
We could also set TTL on an individual message, instead on queue B. That would allow us to set different expiration values to different messages. For example, we could increment TTL for every new message failure. First time the message fails, we would set TTL to 5 minutes, second time to 15 minutes etc. We can’t rely on default settings and RabbitMQ properties, and we need to use our java / spring knowledge to accomplish this behavior.
If we dig beneath the surface of the spring-amqp project, and its RabbitTemplate abstraction, we will see that message requeuing is called message recovering, and it is done with the help of two beans. The first one is RepublishMessageRecoverer, so we need to extend it:
… and to define the bean from it:
There are a few important things here. First, we are setting TTL of an individual message using the setExpiration(ms) method. In this part of code we can set any arbitrary RabbitMQ header for additional processing when a message returns from B to A. Second, when creating our CustomMessageRecoverer instance, we need to set dead-letter exchange to which the failed message will be sent. Third, we need to set an errorRoutingKeyPrefix to the empty string. The reason is simple: we want that failed message returns to original queue after TTL expire (queue A), and errorRoutingKeyPrefix is just a prefix that will be added to this queue’s name.
We also need to set our recoverer as default:
Now, spring-amqp will use our custom message recoverer every time message processing fails due to any reason.
By Almir Pehratovic, Senior Software Engineer / Team Leader at Infobip