Rabbitmq延迟队列实现
工作中很多场景需要用到定时任务、延迟任务,常用的方法用crontab job、Spring的Quartz,然后扫描整张数据库表,判断哪些数据需要处理。控制的粒度没办法做到特定数据上。
后来就想到了Rabbitmq,Rabbitmq本来不没有延迟队列的功能,但是有个[Dead Letter Exchange](https://www.rabbitmq.com/dlx.html)功能。
DLX是指队列中的消息在下面几种情况下会变为死信(dead letter),然后会被发布到另一个exchange中。
- 在requeue=false的情况系,消息被client reject
- 消息过期
- 队列长度超过限制
有了DLX,就可以将需要延迟的操作设置下次执行时间(如消息的TTL时间)放入一个存储队列中,消息过期后会经由DLX进入监听的队列中。有消费方进行相关的操作,结束或者再次进入存储队列中。
Spring AMQP实现
Configuration:
<rabbit:connection-factory id="rabbitMQConnectionFactory" requested-heartbeat="" host="${rabbit.host}"
port="${rabbit.port}"
username="${rabbit.username}" password="${rabbit.password}" publisher-confirms="true"
channel-cache-size="10"/>
<rabbit:admin connection-factory="rabbitMQConnectionFactory"/>
<!--声明延时队列-->
<rabbit:queue id="delayQueue" name="${rabbit.tracking.no.pre.track.delay.queue}">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="tracking_dead_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--声明监听队列-->
<rabbit:queue id="preTrackingQueue" name="${rabbit.tracking.no.pre.track.queue}"/>
<!--声明DLX-->
<rabbit:topic-exchange name="tracking_dead_exchange">
<rabbit:bindings>
<rabbit:binding pattern="#" queue="${rabbit.tracking.no.pre.track.queue}"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="rabbitMQConnectionFactory"
concurrency="1"
prefetch="1"
acknowledge="auto"
message-converter="jackson2JsonMessageConverter">
<rabbit:listener ref="trackingListener" method="handleMessage" queues="preTrackingQueue"/>
</rabbit:listener-container>
Code:
@Component
public class TrackingListener{
private Logger LOGGER = LoggerFactory.getLogger(TrackingListener.class);
@Autowired
private MessageConverter jackson2JsonMessageConverter;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue delayQueue;
public void handleMessage(TrackingMessage trackingMessage){
LOGGER.info("In Function: TrackingListener.handleMessage(trackingMessage={})", new Object[]{trackingMessage});
String expiration = 60*60*1000 + "";
if(trackingMessage.getRecordCount() == 2 && trackingMessage.getStatus() == 0){
//更新运单及订单状态
trackingMessage.setStatus(1);
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration(expiration); //one hour
messageProperties.setTimestamp(new Date());
rabbitTemplate.send(delayQueue.getName(), jackson2JsonMessageConverter.toMessage(trackingMessage, messageProperties));
LOGGER.info("End Function: TrackingListener.handleMessage()");