使用延迟队列的场景很多,比较常见的就是:下单30分钟未支付,取消订单。市场上流行的MQ中目前好像只有RocketMQ原生支持延迟队列,但是它的开源版本,延迟消息,延迟时间是固定的,只有几个选项,非常不灵活。
得益于Rabbitmq的TTL队列以及死信交换机机制,可以实现一个延迟队列。
TTL队列 & 死信交换机
TTL队列
TTL队列,顾名思义,在创建一个队列的时候可以给队列设置一个TTL时间。这个时间表示这个队列中消息的最大存活时间。如果消息在队列中存活的时间超过了设置值,还没被消费,就会被丢弃。
Map<String, Object> properties = new HashMap<>();
properties.put("x-message-ttl", TimeUnit.HOURS.toMillis(1)); // ttl 设置为 1小时
channel.queueDeclare("myqueue", false, false, false, properties);
死信交换机
死信交换机,当一个消息变成死信的时候,就会尝试把它投递到指定的交换机。这个指定的交换机就叫做:死信交换机
消息变成死信的场景
- 消息被消费者拒绝消费,并且设置了 requeue 为 false
- 消息TTL到期
- 队列达到了最大长度
Map<String, Object> properties = new HashMap<>();
properties.put("x-dead-letter-exchange", "my-queue-dead-exchange"); // 指定队列的死信交换机
properties.put("x-dead-letter-routing-key" , "dlx-routing-key"); // 把死信投递到交换机的时候,设置的路由KEY(这个配置不是必须的)
channel.queueDeclare("myqueue", false, false, false, properties);
如果理解了以上2点,那么就很好理解这么实现一个死信队列了。
- 创建一个TTL队列,给这个队列设置一个死信交换机,并且不要给这个队列设置任何消费者
- 创建一个正常队列,设置消费者,专门用于消费TTL到期的消息
- 以上2个队列都可以绑定在同一个交换机,通过不同的route_key来路由消息
消息投递到TTL队列,不同延迟的TTL队列通过不同的route_key区分,因为没有消费者,就会等待过期,一旦过期就变成了死信,那么就会进如死信交换机,并且添加上指定的route_key。专门监听消费到期消息的消费者就会消费到这条延迟消息。
消息生产者
创建2个TTL队列,一个延迟5秒,一个延迟10秒。分别投递5条消息。
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String DELAY_05_SECONDS_QUEUE = "delay_05_seconds_queue";
public static final String DELAY_10_SECONDS_QUEUE = "delay_10_seconds_queue";
public static final String EXPIRATION_QUEUE = "expiration_queue";
public static final String EXPIRATION_QUQUE_ROUTE = "expiration";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, false, false, false, null);
// 声明延迟消费队列
channel.queueDeclare(EXPIRATION_QUEUE, false, false, false, null);
channel.queueBind(EXPIRATION_QUEUE, DELAY_EXCHANGE, EXPIRATION_QUQUE_ROUTE);
// 声明05秒延迟队列
Map<String, Object> properties = new HashMap<>();
properties.put("x-dead-letter-exchange", DELAY_EXCHANGE);
properties.put("x-message-ttl", TimeUnit.SECONDS.toMillis(5));
properties.put("x-dead-letter-routing-key" , EXPIRATION_QUQUE_ROUTE);
channel.queueDeclare(DELAY_05_SECONDS_QUEUE, false, false, false, properties);
channel.queueBind(DELAY_05_SECONDS_QUEUE, DELAY_EXCHANGE, "05"); // 5 秒
// 声明10秒延迟队列
properties = new HashMap<>();
properties.put("x-dead-letter-exchange", DELAY_EXCHANGE);
properties.put("x-message-ttl", TimeUnit.SECONDS.toMillis(10));
properties.put("x-dead-letter-routing-key" , EXPIRATION_QUQUE_ROUTE);
channel.queueDeclare(DELAY_10_SECONDS_QUEUE, false, false, false, properties);
channel.queueBind(DELAY_10_SECONDS_QUEUE, DELAY_EXCHANGE, "10"); // 10 秒
for (int i = 0; i < 5; i ++ ) {
channel.basicPublish(DELAY_EXCHANGE, "05", true, false, null,
("[05秒]我是第" + (i + 1) + "条消息[" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) + "]").getBytes());
}
for (int i = 0; i < 5; i ++ ) {
channel.basicPublish(DELAY_EXCHANGE, "10", true, false, null,
("[10秒]我是第" + (i + 1) + "条消息[" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) + "]").getBytes());
}
channel.close();
connection.close();
}
}
消息消费者
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer {
public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String EXPIRATION_QUEUE = "expiration_queue";
public static final String EXPIRATION_QUQUE_ROUTE = "expiration";
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(10);
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, false, false, false, null);
// 声明延迟消费队列
channel.queueDeclare(EXPIRATION_QUEUE, false, false, false, null);
channel.queueBind(EXPIRATION_QUEUE, DELAY_EXCHANGE, EXPIRATION_QUQUE_ROUTE);
channel.basicConsume(EXPIRATION_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
System.out.println("[" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) +"] 消费消息:"
+ new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
countDownLatch.countDown();
}
});
countDownLatch.await();
channel.close();
connection.close();
}
}
测试
先启动消费者,阻塞,进入等待消费状态。然后启动消费生产者,基本是秒发送完毕10条消息,然后就会退出。在短暂延迟后就可以看到消费者的控制台输出。
一共10条日志,延迟5秒消费的5条,以及延迟10秒消费的10条。准确无误。
[2021-07-13 00:42:27] 消费消息:[05秒]我是第1条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第2条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第3条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第4条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第5条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第1条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第2条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第3条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第4条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第5条消息[2021-07-13 00:42:22]