Rabbitmq延迟队列实现

Rabbitmq延迟队列实现

工作中很多场景需要用到定时任务、延迟任务,常用的方法用crontab job、Spring的Quartz,然后扫描整张数据库表,判断哪些数据需要处理。控制的粒度没办法做到特定数据上。
后来就想到了Rabbitmq,Rabbitmq本来不没有延迟队列的功能,但是有个[Dead Letter Exchange](https://www.rabbitmq.com/dlx.html)功能。
DLX是指队列中的消息在下面几种情况下会变为死信(dead letter),然后会被发布到另一个exchange中。
  • 在requeue=false的情况系,消息被client reject
  • 消息过期
  • 队列长度超过限制
有了DLX,就可以将需要延迟的操作设置下次执行时间(如消息的TTL时间)放入一个存储队列中,消息过期后会经由DLX进入监听的队列中。有消费方进行相关的操作,结束或者再次进入存储队列中。
Spring AMQP实现
Configuration:
<rabbit:connection-factory id="rabbitMQConnectionFactory" requested-heartbeat="" host="${rabbit.host}"                            port="${rabbit.port}"                            username="${rabbit.username}" password="${rabbit.password}" publisher-confirms="true"                            channel-cache-size="10"/> <rabbit:admin connection-factory="rabbitMQConnectionFactory"/>
<!--声明延时队列--> <rabbit:queue id="delayQueue" name="${rabbit.tracking.no.pre.track.delay.queue}">     <rabbit:queue-arguments>         <entry key="x-dead-letter-exchange" value="tracking_dead_exchange"/>     </rabbit:queue-arguments> </rabbit:queue> <!--声明监听队列--> <rabbit:queue id="preTrackingQueue" name="${rabbit.tracking.no.pre.track.queue}"/>
<!--声明DLX--> <rabbit:topic-exchange name="tracking_dead_exchange">     <rabbit:bindings>         <rabbit:binding pattern="#" queue="${rabbit.tracking.no.pre.track.queue}"/>     </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:listener-container connection-factory="rabbitMQConnectionFactory"                            concurrency="1"                            prefetch="1"                            acknowledge="auto"                            message-converter="jackson2JsonMessageConverter">     <rabbit:listener ref="trackingListener" method="handleMessage" queues="preTrackingQueue"/> </rabbit:listener-container>
Code:
@Component public class TrackingListener{ private Logger LOGGER = LoggerFactory.getLogger(TrackingListener.class);     @Autowired     private MessageConverter jackson2JsonMessageConverter;     @Autowired     private RabbitTemplate rabbitTemplate;     @Autowired     private Queue delayQueue;     public void handleMessage(TrackingMessage trackingMessage){ LOGGER.info("In Function: TrackingListener.handleMessage(trackingMessage={})", new Object[]{trackingMessage});         String expiration = 60*60*1000 "";         if(trackingMessage.getRecordCount() == && trackingMessage.getStatus() == 0){ //更新运单及订单状态             trackingMessage.setStatus(1);         } MessageProperties messageProperties = new MessageProperties();         messageProperties.setExpiration(expiration)//one hour         messageProperties.setTimestamp(new Date());         rabbitTemplate.send(delayQueue.getName()jackson2JsonMessageConverter.toMessage(trackingMessagemessageProperties));         LOGGER.info("End Function: TrackingListener.handleMessage()");

(转载本站文章请注明作者和出处乱世浮生,请勿用于任何商业用途)

comments powered by Disqus

目录