springboot整合rabbitmq实现消息队列
- 发表于:2023-03-05 12:39
- 阅读(62)
有关rabbitmq的安装和配置这里就不说了,请自行百度。
主要整合步骤如下:
一、springboot引入rabbitmq
(1)pom.xml
引入依赖,如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.local</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>DEMO工程</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 引入rabbitmq依赖,坑:还以为是springboot版本问题还是少了依赖,还是版本之间不兼容,老是报连接超时异常,结果发现是本地的rabbitmq服务没安装好!!! -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
(2)配置 application.properties
server.port=8080
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
注意我这里使用root用户,你可以在rabbitmq的后台去新增这个用户并分配权限,我这里使用虚拟主机地址直接/即可
OK,到这里消息队列就整合完成了。
二、简单使用
(1)配置交换机、队列
1、直连交换机配置:
package com.local.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置直连型交换机<br/>
*
* 简介:
* 直连型交换机,交换机绑定了很多路由键不同的队列,根据消息携带的路由键将消息投递给对应队列;
* 大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key;
* 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列;
*
* 问题:那么直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?
* 结论:可以看到是实现了轮询的方式对消息进行消费,而且不存在重复消费。
*/
@Configuration
public class DirectConfig {
/**
* 队列
*
* @return
*/
@Bean
public Queue queue() {
// durable:是否持久化,默认是true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除
// 一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("order-message-queue-direct", true);
}
/**
* Direct交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("order-message-direct", true, true);
}
/**
* 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
*
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with("order-message-routing");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
2、扇形交换机配置:
package com.local.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置扇型交换机<br/>
*
* 简介:
* 扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列
*/
@Configuration
public class FanoutConfig {
@Bean
public Queue queueA() {
return new Queue("order-message-queue-fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("order-message-queue-fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("order-message-queue-fanout.C");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("order-message-fanout");
}
/**
* 创建三个队列 :fanout.A fanout.B fanout.C
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置了也不起作用
*/
@Bean
public Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
3、主题交换机配置:
package com.local.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置主题型交换机<br/>
*
* 简介:
* 主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
* 简单地介绍下规则:
*
* 1、* (星号) 用来表示一个单词 (必须出现的)
* 2、# (井号) 用来表示任意数量(零个或多个)单词
* 通配的绑定键是跟队列进行绑定的,举个小例子
* 队列Q1 绑定键为 *.TT.*
* 队列Q2绑定键为 TT.#
* 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
* 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
*
* 主题交换机是非常强大的,为啥这么膨胀?
* 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
* 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有直连交换机的行为。
* 所以主题交换机也就实现了扇形交换机的功能和直连交换机的功能。
*/
@Configuration
public class TopicConfig {
private static final String FIRST_QUEUE = "order-message-queue-topic.first";
private static final String SECOND_QUEUE = "order-message-queue-topic.second";
@Bean
public Queue firstQueue() {
return new Queue(FIRST_QUEUE);
}
@Bean
public Queue secondQueue() {
return new Queue(SECOND_QUEUE);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("order-message-topic");
}
@Bean
public Binding bindingFirstQueueToExchange() {
// 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
// 这样只要是消息携带的路由键是order-message-queue.first,才会分发到该队列
return BindingBuilder.bind(firstQueue()).to(topicExchange()).with(FIRST_QUEUE);
}
@Bean
public Binding bindingSecondQueueToExchange() {
// 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以order-message-queue.开头,都会分发到该队列
return BindingBuilder.bind(secondQueue()).to(topicExchange()).with("order-message-queue-topic.#");
}
}
(2)生产者生产队列消息
package com.local.demo.controller;
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@AllArgsConstructor
public class TestController {
/**
* 使用RabbitTemplate,这提供了接收/发送等等方法
*/
private final RabbitTemplate rabbitTemplate;
/**
* 测试直连交换机
*
* @return
*/
@GetMapping("/direct/sendOrderMessage")
public String sendOrderMessageByDirect() {
// 将消息携带绑定键值:order-message-routing,发送到自连型交换机:order-message-direct
rabbitTemplate.convertAndSend("order-message-direct", "order-message-routing", "【直连交换机】测试发送订单消息");
return "SUCCESS";
}
/**
* 测试主题交换机
*
* @return
*/
@GetMapping("/topic/sendOrderMessage")
public String sendOrderMessageByTopic(@RequestParam String routingKey) {
// 如果路由键是:order-message-queue-topic.second,可以看到两个监听消费者只有OrderMessageTopicSecondReceiver成功消费到了消息
// 如果路由键是:order-message-queue-topic.first,可以看到两个监听消费者receiver都成功消费到了消息,因为这两个recevier监听的队列的绑定键都能与这条消息携带的路由键匹配上
rabbitTemplate.convertAndSend("order-message-topic", routingKey, "【主题交换机】测试发送订单消息,路由键:" + routingKey);
return "SUCCESS";
}
/**
* 测试扇型交换机
*
* @return
*/
@GetMapping("/fanout/sendOrderMessage")
public String sendOrderMessageByFanout() {
rabbitTemplate.convertAndSend("order-message-fanout", null, "【扇型交换机】测试发送订单消息");
return "SUCCESS";
}
}
(3)消费者消费队列消息
1、监听直连交换机绑定的队列:order-message-queue-direct
package com.local.demo.mq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听的队列名称
@RabbitListener(queues = "order-message-queue-direct")
public class DirectReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("================ 监听到订单消息 ================");
System.out.println("OrderMessageDirectReceiver消费该订单消息,消息内容:" + message);
}
}
2、监听扇形交换机绑定的队列:
监听队列:order-message-queue-fanout.A
package com.local.demo.mq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听的队列名称
@RabbitListener(queues = "order-message-queue-fanout.A")
public class FanoutAReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("================ 监听到订单消息 ================");
System.out.println("FanoutAReceiver消费该订单消息,消息内容:" + message);
}
}
监听队列:order-message-queue-fanout.B
package com.local.demo.mq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听的队列名称
@RabbitListener(queues = "order-message-queue-fanout.B")
public class FanoutBReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("================ 监听到订单消息 ================");
System.out.println("FanoutBReceiver消费该订单消息,消息内容:" + message);
}
}
监听队列:order-message-queue-fanout.C
package com.local.demo.mq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听的队列名称
@RabbitListener(queues = "order-message-queue-fanout.C")
public class FanoutCReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("================ 监听到订单消息 ================");
System.out.println("FanoutCReceiver消费该订单消息,消息内容:" + message);
}
}
3、监听主题交换机绑定的队列:
监听队列:order-message-queue-topic.first
package com.local.demo.mq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听的队列名称
@RabbitListener(queues = "order-message-queue-topic.first")
public class TopicFirstReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("================ 监听到订单消息 ================");
System.out.println("TopicFirstReceiver消费该订单消息,消息内容:" + message);
}
}
监听队列:order-message-queue-topic.second
package com.local.demo.mq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 监听的队列名称
@RabbitListener(queues = "order-message-queue-topic.second")
public class TopicSecondReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("================ 监听到订单消息 ================");
System.out.println("TopicSecondReceiver消费该订单消息,消息内容:" + message);
}
}
启动类:
package com.local.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 案例工程
*/
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
/**
* 总结:
*
* 1、springboot整合rabbitmq
* - 在pom引入amqp依赖:
* <dependency>
* <groupId>org.springframework.boot</groupId>
* <artifactId>spring-boot-starter-amqp</artifactId>
* </dependency>
* - 定义交换机和队列,参考:DirectConfig、FanoutConfig、TopicConfig
* - 生产者发送消息到交换机,参考:TestController,由交换机自己决定具体发送给哪一个队列
* - 消费者监听并消费消息队列,参考:DirectReceiver、FanoutAReceiver、FanoutBReceiver、FanoutCReceiver、TopicFirstReceiver、TopicSecondReceiver
*/
}
参考:
Springboot 整合RabbitMq ,用心看完这一篇就够了
RabbitMQ介绍(详细)
Spring Cloud Stream RabbitMQ详解
springCloudStream集成rabbitmq
相关推荐
-
mac安装subversion,并使用svn命令检出服务器上的代码库项目
mac安装svn只要通过Homebrew安装即可,不需要下载额外的安装包手动安装,Homebrew类似一个软件库,我们可以通过brew命令实现一键下载并安装我们所需要的常用软件。
-
Java如何获取泛型类T的Class
我们平时在封装接口或抽象类的时候经常会用到Java的泛型,经常会在传入一个泛型类T,然后封装一些抽象的方法,泛型的好处就是在编译的时候检查类型安全,并且所有的强制类型转换都是隐式和自动的,这样可以提高代码的通用性。但是我们有时候需要获取泛型类的Class,那可以如何获取到呢?
-
springboot项目事务报错:Transaction synchronization is not active
这几天在使用spring声明式事务的时候突然报了一个错误:Transaction synchronization is not active,之前使用的都是好好的,为什么这次就不行了呢?不就是加一个 @Transactional 而已嘛???
-
mysql应该如何在where语句中添加if语句进行条件判断?where if 语句应该如何使用
我们在平时的项目开发中,有时候会遇到复杂一点的需求,需要我们手动编写复杂的SQL语句,并且有时候需要根据每条表记录的实际情况进行判断,根据每条记录动态添加不同的where条件,这个时候我们就可以在where语句中使用if语句进行条件判断,那么where if应该如何正确使用呢?
-
关于websocket多节点分布式问题的解决方案
websocket是一种在单个TCP连接上进行双全工通信的协议,使用websocket,我们可以实现服务端主动向各个订阅消息通道的客户端推送消息。这点比传统的http轮询请求要更好一点,避免一些无用的请求,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
-
mac如何把文件压缩成tar、zip以及如何解压tar、zip?
有时候我们需要把文件压缩成一个tar文件或zip文件,发送给别人,那么在macos系统应该如何压缩和解压缩呢?
-
openjdk和jdk有什么区别,应该如何选择?
我们一开始学习java的时候,安装的都是从sun官网或oracle官网下载的jdk安装包,但其实还有另外一个来源可以获取到jdk安装包,那就是openjdk,它和jdk基本一样,推荐使用openjdk。
-
如果git仓库发生变更,IDEA如何直接修改git远程仓库地址?
有时候我们整理远程仓库代码的时候,会修改远程仓库的名称,或者所属分组,这个时候在IDEA由于还是使用原先拉取的旧仓库地址,导致本地代码会提交不了,也更新不了远程最新代码,那么这个时候要如何修改IDEA当前的git远程仓库地址呢?如何无缝修改,修改完之后就能和原来一样更新提交,并且以前的提交记录也保留呢?
-
微信公众号自定义菜单报错:no permission to use weapp in menu rid:xxxxxxx
昨晚公司系统添加微信公众号菜单突然报错:{"errcode":45064,"errmsg":"no permission to use weapp in menu rid: 60311f70-0736ff08-29143906"}
-
Java如何使用stream流对List列表数据进行自定义排序
我们一般做排序功能都是通过在mysql数据库中的表中定义好排序字段,然后使用升序或降序来进行排序,复杂一点的话就配合多个排序字段进行排序,但是如果碰到那种无法使用表的字段进行排序的情况,我们需要先从数据库中取出列表数据,然后再通过业务代码对列表进行排序,这个时候我们就可以使用redis或Java的stream流。
-
微信企业付款到零钱报错:此请求可能存在风险,已被微信拦截
具体错误信息:com.github.binarywang.wxpay.exception.WxPayException: 返回代码:[SUCCESS],返回信息:[支付失败],结果代码:[FAIL],错误代码:[NO_AUTH],错误详情:[此请求可能存在风险,已被微信拦截。]
-
springboot项目使用@Transactional注解如何避免长事务问题
在springboot项目中,我们开启事务是非常简单的,使用注解的方式就是在需要开启事务的方法上添加@Transactional,这样就可以实现这个方法里面的所有操作和调用方法的操作都绑定在一个事务上面,要么全部一起执行成功,要么全部一起执行失败,如果其中有某个地方抛了异常,则整个方法涉及的事务操作都会回滚,但是如果随意滥用@Transactional,又有可能引发长事务问题,导致数据库死锁、数据库连接池占满等问题。
-
css实现“展开阅读全文”功能
最近发现很多博客网站,资讯网站喜欢把资讯博文,内容等这些大文本的信息在页面显示的时候都会有个“展开阅读全文”的按钮,点击这个按钮即可展开显示所有的内容,不然一开始就显示那么长的篇幅相对来说既不美观,又对用户体验不好。现在就让我来仿照这类网站实现一个“展开阅读全文”功能。这里主要用到的前端技术是html+jquery+css,只做展开功能,没做收起功能(收起功能没必要吧,谁会去收起呀???)。
-
关于编程中面向对象的理解,什么是面向对象
面向对象设计相对于结构化程序设计可以说是一种更优秀的程序设计方法。它的基本思想是使用类、对象、继承、封装、消息等基本概念进行程序设计。它是从现实世界中客观存在的事物(即对象)出发来构造软件系统,并在软件系统构造中尽可能运用人类的自然思维方式,强调直接以现实世界中的事物(即对象)为中心来思考,认识问题。
-
我的linux操作命令总结,记录常用linux操作命令
平常本地开发项目使用的系统基本都是window系统,而且都是图形化操作,非常方便,window也是越做越好了,项目部署到生产环境一般都是选择linux系统(当然window server系列也可以),而linux一般则选择centOS居多,这里记录一下linux常用命令,以免老是过几天就忘了,后续不断补充。