1、前言
上一篇中归纳了rabbitmq比较基本的使用,如最简单的消息发送和多个队列的负载均衡。rabbit mq中还提供了另外几种队列方式。这里主要是依赖于不同类型的交换机来实现的,在rabbitmq中如果你没有引用交换机,那么消息的生产者和消费者是通过队列直接关联,相对来说耦合性会强一点,同时功能的拓展性也相对受限。这里的exchange主要有4中类型。分别是fanout、topic、headers、fanout四种。
2、直传类型
这个属于最简单的mq配合交换器的使用方式,通过交换器将生产者和消费者隔开,消费者只知道它需要把消息投递到哪个交换器上;生产者只知道从哪个交换器上获取消息。
在spring boot中可以通过bean来管理这些消息队列的组件。
@Configuration
public class DirectQueueConfig {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("express.item");
}
@Bean
public Queue directQueueOne(){
return new Queue("clothes",false,false,true);
}
@Bean
public Queue directQueueTwo(){
return new Queue("cup",false,false,true);
}
@Bean
public Binding directBingOne(DirectExchange directExchange,Queue directQueueOne){
return BindingBuilder.bind(directQueueOne)
.to(directExchange)
.with("normal");
}
@Bean
public Binding directBingTwo(DirectExchange directExchange,Queue directQueueTwo){
return BindingBuilder.bind(directQueueTwo)
.to(directExchange)
.with("fragile");
}
@Bean
public DirectListener directListener(){
return new DirectListener();
}
}
比如上面的代码片段分别创建了交换器、队列、绑定关系、消费者。
声明队列的时候有几个参数可选。
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, (Map)null);
}
name: 队列的名称
durable: 是否能持久化,队列在broker重启后是否能够恢复;
exclusive:排他性,这个可能使用的比较少但是是一个比较有特点的参数。根据文档的描述,当声明一个排他队列时,这个队列只能被一个连接所使用的,同时无论你是否设置了自动autodelete属性,这个排他队列都会在连接关闭时被自动删除;
auto-delete: 自动删除,这个比较好理解,当没有消费者订阅这个队列时,改队列会被自动删除
@Slf4j
public class DirectListener {
@RabbitListener(queues = "#{directQueueOne.name}")
public void normalListener(String message){
log.info("normal express:[{}]",message);
}
@RabbitListener(queues = "#{directQueueTwo.name}")
public void fragileListener(String message){
log.info("fragile express:[{}]",message);
}
}
这里声明了一个消费者,用到了spring中的一个注解@RabbitListener,最简单的用法就是在这里声明监听的队列名称,这个地方是可以使用SPEL表达式的,这样可以参数化变量,达到解耦的效果,也可以直接写字符串来声明。
public void directSendMessage(String message, String routingKey) {
LocalDateTime now = LocalDateTime.now();
log.info("producer direct time:[{}]", now.toString());
rabbitTemplate.convertAndSend(directExchange.getName(), routingKey, message);
log.info("producer direct message:[{}]", message);
}
发送消息可以使用上述的代码片段使用spring中配置好的rabbitTemplate,convertAndSend方法的三个参数代表发送到哪个交换器,使用哪个路由键(就是之前绑定队列和交换器时使用的key,这样消息就知道投递到哪个队列中),最后一个是投递的消息。这样一个完整的直传消息就结束
3、广播类型
这个也是比较常用的一种交换器模式,也就是将消息投递到所有订阅的队列中,让所有监听队列的消费者去消费队列。
@Configuration
public class FanoutQueueConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("express.fanout");
}
@Bean
public Queue fanoutQueueOne(){
return new Queue("fanout1",false,false,true);
}
@Bean
public Queue fanoutQueueTwo(){
return new Queue("fanout2",false,false,true);
}
@Bean
public Binding bindingOne(FanoutExchange fanoutExchange,Queue fanoutQueueOne){
return BindingBuilder.bind(fanoutQueueOne)
.to(fanoutExchange);
}
@Bean
public Binding bindingTwo(FanoutExchange fanoutExchange,Queue fanoutQueueTwo){
return BindingBuilder.bind(fanoutQueueTwo)
.to(fanoutExchange);
}
@Bean
public FanoutListener fanoutListener(){
return new FanoutListener();
}
}
这里的代码基本和直传的类型没有太多区别,唯一就是使用了Fanout的交换器
消费者的代码片段如下
@Slf4j
public class FanoutListener {
@RabbitListener(queues = "#{fanoutQueueOne.name}")
public void fanoutListenerOne(String msg){
log.info("fanoutQueue1 receive:[{}],time:[{}]",msg, LocalDateTime.now());
}
@RabbitListener(queues = "#{fanoutQueueTwo.name}")
public void fanoutListenerTwo(String msg){
log.info("fanoutQueue2 receive:[{}],time:[{}]",msg,LocalDateTime.now());
}
}
4、主题类型
这个应该是使用最广泛的一种交换器的类型了。和前面两种的区别在于多了一个特殊的路由键,那就意味着我可以自主选择哪些队列需要接受我的投递。路由键是连接队列和交换器之间的一个标记参数。主题交换器中的路由键有一定的规则,它需要由一些单词通过点拼接而成。另外这里加入了两个特殊字符用于匹配:
:星号代表能被单个词替代,举例路由键是orange. .max那么orange.blue.max可以匹配而orange.max不能匹配;
#:井号代表能被任意个词替代(0个和多个都可以),举例路由键是orange.*.max那么orange.blue.max可以匹配,orange.max可以匹配,orange.blue.red.max可以匹配。
@Configuration
public class TopicQueueConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("express.topic");
}
@Bean
public Queue topicQueueOne(){
return new Queue("food",false,false,true);
}
@Bean
public Queue topicQueueTwo(){
return new Queue("appliance",false,false,true);
}
@Bean
public Queue topicQueueThree(){
return new Queue("fruit",false,false,true);
}
@Bean
public Binding topicBindingOne(TopicExchange topicExchange,Queue topicQueueOne){
return BindingBuilder.bind(topicQueueOne)
.to(topicExchange)
.with("food.delicatessen");
}
@Bean
public Binding topicBindingTwo(TopicExchange topicExchange,Queue topicQueueTwo){
return BindingBuilder.bind(topicQueueTwo)
.to(topicExchange)
.with("appliance.*.bigSize");
}
@Bean
public Binding topicBindingThree(TopicExchange topicExchange,Queue topicQueueThree){
return BindingBuilder.bind(topicQueueThree)
.to(topicExchange)
.with("fruit.#");
}
@Bean
public TopicListener topicListener(){
return new TopicListener();
}
}
这里创建了主题交换器,队列和消费者。用点拼接的路由键其实可以认为是构建了一个树形数据结构,很方便的达到了消息分别投递的目的。
@RabbitListener(queues = "#{topicQueueOne.name}")
public void foodListener(String message) {
log.info("receive delicatessen food:[{}]", message);
}
@RabbitListener(queues = "#{topicQueueTwo.name}")
public void applianceListener(String message) {
log.info("receive big appliance:[{}]", message);
}
@RabbitListener(queues = "#{topicQueueThree.name}")
public void fruitListener(String message) {
log.info("receive any fruit:[{}]", message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "web_queue", durable = "false", autoDelete = "true", exclusive = "false"),
exchange = @Exchange(value = "web", type = ExchangeTypes.TOPIC),
key = {"web.info"}
))
public void webListener(String message) {
log.info("web info:[{}]", message);
}
这里提供了两种创建方式,一种是之前使用过的通过RabbitListerner注解声明监听队列的名称来监听消费消息。另一种是通过这个注解同时完成队列,交换器,消费者三者的创建,同时声明了绑定的路由键。这样一个主题交换器的例子就配置完成了
-----------------
作者:君不见月
链接:spring boot 整合rabbitmq 不同类型队列的案例 - 掘金