由于搭建集群的时候,3 台虚拟机都在前台开启了 Kafka,占用了窗口,所以我这里再开启两个窗口,分别用来操作 Kafka 和 Zookeeper。
下面的操作都在这两个窗口上进行。
如果把 Kafka 比作 MySQL 的话,那么主题就相当于数据库,所以想用 Kafka,就必须先创建主题,就好比我们想用 MySQL,就必须先创建数据库一样。
使用这个命令创建主题:
/usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.197.6:2181 --replication-factor 3 --partitions 3 --topic test01
关于这条命令的解释:
–zookeeper:指定 zookeeper 集群的 ip 地址和端口
–topic:创建名为 test01 的主题
–partitions:test01 这个主题分配 3 个分区
–replication-factor:每个分区分配 3 个副本,也就是每个分区备份 3 份
主题在 Zookeeper 中是如何体现的呢,我们进入 Zookeeper 看一下。
使用命令 ls /brokers/topics
,可以看到刚才创建的 test01 主题在 Zookeeper 中已经创建了。
使用命令 /brokers/topics/test01/partitions
,可以看到分区也创建了。
主题只是一个逻辑的概念,在物理上并不存在,消息也不是存在主题里的,而是存在分区里的。分区是 Kafka 消息存储的最小单元,在物理上对应着一个个真实的文件。
我们上面新建了一个 test01 主题,分配了 3 个分区,这 3 个分区其实就是一个个的文件夹。我们可以进入 log.dirs 目录
cd /usr/local/kafka_2.13-2.5.0/logs/
可以看到创建了 3 个文件夹,分别对应 3 个分区,命名规则是 topic名称-N
,N 就是分区的个数。后面生产者发送给 test01 主题的数据都会存在这 3 个文件夹里。
进入 test01 主题的第一个分区看一下
可以看到每个分区文件夹,都包含以下 4 类文件:
我们重点看 .index .log .timeindex 这 3 个文件。
这 3 个文件都是成对出现的,每一对都叫做 segment。也就是说分区中的数据是分段存储,一个分区(partition)被切割成多个相同大小的段(segment)(这个是由 log.segment.bytes 决定,控制每个 segment 的大小)。
segment 文件命名规则:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
如下图所示:
通过上面的内容,我们可以这样总结:
在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,目录的命名规则为 topic名称+有序序号
。第一个序号从 0 开始计,最大的序号为 partition 数量减 1。partition 是实际物理上的概念,而 topic 是逻辑上的概念,topic 更像是一个消息的类别。由于生产者生产的消息会不断追加到 log 文件的末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 将 partition 细分为 segment,一个 partition 物理上由多个 segment 组成。每个 segment 由 3 个文件组成,分别是 index 文件、log 文件和 timeindex 文件,这 3 个文件的命名规则是:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。
分区中的每一条记录都会分配一个 id 号来表示顺序,我们称之为 offset,offset 用来唯一的标识分区中每一条记录。
每一个消费者中唯一保存的元数据是 offset(偏移量)即消费在 log 中的位置。偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据。也可以跳过最近的记录,从 “现在” 开始消费。
特别说明的是,老版本的消费者是依赖 Zookeeper 的,当启动一个消费者时向 Zookeeper 注册。新版本消费者去掉了对 Zookeeper 的依赖,而是由消费组协调器统一管理,已消费的消息偏移量提交后会保存到名为 “__consumer_offsets” 文件夹中。
Kafka 的 offset 是分区内有序的,但是在不同分区中是无序的,Kafka 不保证数据的全局有序。
关于 offset,可以通过以下几个问题来理解:
1.为啥 offset 在分区内是有序的?
因为分区的结构是队列,先进先出,所以保证了有序。
2.为啥在不同分区中是无序的?
因为消息写入哪一个分区不是固定的,有可能第一条消息写入第一个分区,第二条消息写入第二个区分。。。
3.如果有业务场景需要保证数据的有序呢?该如何做?
可以这样做:创建主题的时候只分配一个分区,这样就能保证数据的顺序性了。
但是只分配一个分区,又跟 Kafka 的分布式冲突,所以还可以这样做:
创建主题的时候分配多个分区,读取消息时,在代码层面处理消息的顺序性。
Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。一个 broker 有多个 topic,一个 topic 有多个 partition,一个 partition 有多个 segment。
Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为 message(消息)发送到 Kafka 集群。默认情况下,每行将作为单独的 message 发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。
[root@localhost kafka_2.13-2.5.0]# /usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.197.6:9092 --topic test01
>This is a message
>This is another message
>
关于这条命令的解释:
–broker-list:设置 kafka 集群地址
–topic:设置生产消息的主题
Kafka 还有一个命令行 consumer(消费者),将消息转储到标准输出。
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.12:9092 --topic test01 --from-beginning
This is a message
This is another message
关于这条命令的解释:
–bootstrap-server:设置 kafka 集群地址
–topic:设置消费消息的主题
–from-beginning:设置消息起始位置开始消费。默认是从新位置 latest 开始消费。
消费者使用一个 “消费组” 名称来进行标识,发布到 topic 中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。
通常情况下,每个 topic 都会有一些消费组,一个消费组对应一个”逻辑订阅者”。一个消费组由许多消费者实例组成,便于扩展和容错。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的进程。
在 Kafka 中实现消费的方式是将分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由 Kafka 协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区。如果一个实例消失或者故障,拥有的分区将被分发到剩余的实例。这被称为重新平衡分组。
Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。
创建消费者时如果不指定消费组,那么就会默认分配一个,可以通过这个命令查看有哪些消费组。
/usr/local/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.197.6:9092 --list
如图,因为我启动了 4 个消费者,所以有 4 个消费组。由于这 4 个消费者都属于不同的消费组,所以他们都能收到 test01 主题的所有消息。
打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。
消费组 my-consumer-group-01 有 3 个消费者,分别是 my-consumer-01,my-consumer-02,my-consumer-03。
那么,Kafka 会负载均衡的将分区划分给同一消费组里的消费者。
如上图所示,此时,test01-0 分区已经被第一个消费者消费了,那么它就不能被这个组里的其它消费者消费。除非第一个消费者挂了,那么它就会被重新分配给其它的消费者。
我们启动 3 个消费者来亲自感受一下。
使用这个命令可以指定消费者的组。
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.6:9092 --consumer-property group.id=my-consumer-group-01 --consumer-property client.id=my-consumer-01 --topic test01
--consumer-property group.id=my-consumer-group-01
指定消费者的组是 my-consumer-group-01。
--consumer-property client.id=my-consumer-01
指定消费者的 ID 是 my-consumer-01。
如上图所示,我启动了 3 个消费者,这 3 个消费者都位于同一个组里。然后生产者往 test01 这个主题里发送消息,可以看到:对于同一个消费组而言,同一个时刻,只有一个消费者在消费消息,而且每个消费者消费唯一的分区,不可以重复消费。
打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。
消费组 my-consumer-group-01 有 2 个消费者,分别是 my-consumer-01 和 my-consumer-02。
消费组 my-consumer-group-02 有 1 个消费者,是 my-consumer-03。
如上图所示,my-consumer-01 和 my-consumer-02 位于同一个组,那么它们无法重复消费同一个分区,并且同一个时刻,只能有一个消费者消费。
虽然 test01-0,test01-1,test01-2 这 3 个分区已经被 my-consumer-01 和 my-consumer-02 消费过了,但是 my-consumer-03 位于不同的组,所以其它组对它是不影响的,它是独立的,可以消费其它组消费过的分区。
如上图所示,我启动了 3 个消费者,前两个消费者同一个组,第三个消费者另一个组。
由于 my-consumer-01 和 my-consumer-02 位于同一个组,所以它们只能消费分配给自己的分区,不能重复消费。而且同一个时刻只能有一个消费者消费,所以看起来就像是轮流消费。而 my-consumer-03 位于不同的组,即使分区已经被其它组的消费者消费过了,但是它还是能消费的。而且这个组只有它一个消费者,所以它消费了所有分区的消息。
请准备 3 台虚拟机,以下是我准备的:
Zookeeper 和 Kafka 都依赖 JDK,所以安装 JDK 是必须的。
到官网 https://www.oracle.com/java/technologies/javase-downloads.html 下载 JDK1.8。
下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。
我是用 Git 上传的。
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.6:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.10:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.12:/usr/local/src
接着解压 jdk-8u251-linux-x64.tar.gz 压缩包到 /usr/local 目录。
tar zxvf /usr/local/src/jdk-8u251-linux-x64.tar.gz -C /usr/local/
然后配置每一台虚拟机的环境变量,编辑 profile 文件
vi /etc/profile
在最末尾添加以下内容
# set jdk environmentexport
export JAVA_HOME=/usr/local/jdk1.8.0_251
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
保存 profile 文件,执行生效命令:
source /etc/profile
最后执行 java -version 命令,验证是否安装成功。输出以下内容则说明安装成功:
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)
由于 Kafka 的 broker 的状态、消费者的消费状态、偏移量以及消费者组等是交给 Zookeeper 维护的,所以安装 Kafka 之前必须先安装 Zookeeper。
到官网 https://zookeeper.apache.org/releases.html 下载 Zookeeper,我们下载 3.4.14 版本的。
下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。
我是用 Git 上传的。
scp zookeeper-3.4.14.tar.gz root@192.168.197.6:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.10:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.12:/usr/local/src
接着解压 zookeeper-3.4.14.tar.gz 压缩包到 /usr/local 目录。
tar zxvf /usr/local/src/zookeeper-3.4.14.tar.gz -C /usr/local/
然后配置每一台虚拟机的环境变量,编辑 profile 文件
vi /etc/profile
在最末尾添加以下内容
# set zookeeper environmentexport
export ZK_HOME=/usr/local/zookeeper-3.4.14
export PATH=$ZK_HOME/bin:$PATH
保存 profile 文件,执行生效命令:
source /etc/profile
将 zoo_sample.cfg 文件备份并重命名为 zoo.cfg
cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.14/conf/zoo.cfg
编辑 zoo.cfg 文件
vi zoo.cfg
修改数据文件夹路径
dataDir=/usr/local/zookeeper-3.4.14/data
添加日志文件夹路径
dataLogDir=/usr/local/zookeeper-3.4.14/logs
在文件末尾添加集群信息
server.1=192.168.197.6:2888:3888
server.2=192.168.197.10:2888:3888
server.3=192.168.197.12:2888:3888
创建数据文件夹
mkdir /usr/local/zookeeper-3.4.14/data
创建日志文件夹
mkdir /usr/local/zookeeper-3.4.14/logs
创建 myid 文件,在 myid 文件中添加本机的 server ID
echo "1" > /usr/local/zookeeper-3.4.14/data/myid
请注意,每台虚拟机的 myid 要跟 server ID 对应。
每台虚拟机都开放以下端口,如果不开放的话,Zookeeper 集群无法与其它节点通信,启动会报错。
# 2181 端口对 client 端提供服务
firewall-cmd --zone=public --add-port=2181/tcp --permanent
# 2888 端口是集群内机器相互通信使用(由 Leader 负责监听此端口)
firewall-cmd --zone=public --add-port=2888/tcp --permanent
# 3888 端口是选举 leader 使用的
firewall-cmd --zone=public --add-port=3888/tcp --permanent
使端口生效
firewall-cmd --reload
使用以下命令分别启动每台虚拟机上的 Zookeeper。
/usr/local/zookeeper-3.4.14/bin/zkServer.sh start
/usr/local/zookeeper-3.4.14/bin/zkServer.sh status
如果输出以下信息,则说明 Zookeeper 集群搭建成功
192.168.197.6:
192.168.197.10:
192.168.197.12:
通过以上图片可以知道,3 台虚拟机上的 Zookeeper 已经成功启起来了。其中 192.168.197.10 节点被选举为 leader 节点,其它两个节点是 follower 节点。每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。
2888 端口是集群内机器相互通信使用的,由 Leader 负责监听此端口。
到官网 http://kafka.apache.org/downloads 下载 Kafka,我们下载 2.5.0 版本的。
下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。
我是用 Git 上传的。
scp kafka_2.13-2.5.0.tgz root@192.168.197.6:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.10:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.12:/usr/local/src
接着解压 kafka_2.13-2.5.0.tgz 压缩包到 /usr/local 目录。
tar zxvf kafka_2.13-2.5.0.tgz -C /usr/local/
然后配置每一台虚拟机的环境变量,编辑 profile 文件
vi /etc/profile
在最末尾添加以下内容
# set kafka environment
export KAFKA_HOME=/usr/local/kafka_2.13-2.5.0
export PATH=$KAFKA_HOME/bin:$PATH
保存 profile 文件,执行生效命令:
source /etc/profile
编辑配置文件
vi /usr/local/kafka_2.13-2.5.0/config/server.properties
修改配置如下(IP 地址应该根据实际情况填写)
# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=0
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.6:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000
编辑配置文件
vi /usr/local/kafka_2.13-2.5.0/config/server.properties
修改配置如下(IP 地址应该根据实际情况填写)
# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=1
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.10:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000
编辑配置文件
vi /usr/local/kafka_2.13-2.5.0/config/server.properties
修改配置如下(IP 地址应该根据实际情况填写)
# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=2
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.12:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000
然后每个节点都创建 logs 文件夹
mkdir /usr/local/kafka_2.13-2.5.0/logs
每台虚拟机都开放以下端口
# 9092 端口是 Kafka 对 client 端提供服务的端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
使端口生效
firewall-cmd --reload
使用以下命令分别启动每台虚拟机上的 Kafka。
/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.5.0/config/server.properties &
上面那个命令是前台启动,会一直占用窗口,你也可以使用后台启动,命令如下:
/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/server.properties
如果输出以下信息,则说明启动成功
最后来总结以下每台虚拟机安装的内容。
使用这个命令查看服务启动情况:
jps
从上图可以看到安装了 Kafka 和 Zookeeper,并且都启动了。
使用这个命令查看端口占用情况:
netstat -tunlp|egrep "(2181|2888|3888|9092)"
随便在一个节点上,使用以下命名,查看 Zookeeper 的目录情况。
使用客户端进入 Zookeeper
/usr/local/zookeeper-3.4.14/bin/zkCli.sh
使用 ls /
命令查看 Zookeeper 的目录情况。
上面的显示结果中:只有 zookeeper 目录是 Zookeeper 原生的,其它都是 Kafka 创建的。
关于各个目录的作用,后面再说。
至此,Kafka 的集群环境就搭建成功了。
新建一个 Maven 工程,我的工程叫 springboot-activemq-example。
pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/>
</parent>
<groupId>wang.miansen</groupId>
<artifactId>springboot-activemq-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-activemq-example</name>
<description>activemq project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
server.port=8080
# 配置消息的类型,如果是true则表示为topic消息,如果为false表示Queue消息
spring.jms.pub-sub-domain=true
# 连接用户名
spring.activemq.user=admin
# 连接密码
spring.activemq.password=admin
# 连接主机信息
spring.activemq.brokerUrl=tcp://localhost:61616
# 连接池的最大连接数
jms.pool.maxConnections=10
@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {
// 队列1
@Bean(name = "springboot-activemq-queue-1")
public Queue queue1() {
return new ActiveMQQueue("springboot-activemq-queue-1");
}
// 队列2
@Bean(name = "springboot-activemq-queue-2")
public Queue queue2() {
return new ActiveMQQueue("springboot-activemq-queue-2");
}
// 主题1
@Bean(name = "springboot-activemq-topic-1")
public Topic topic1() {
return new ActiveMQTopic("springboot-activemq-topic-1");
}
// 主题2
@Bean(name = "springboot-activemq-topic-2")
public Topic topic2() {
return new ActiveMQTopic("springboot-activemq-topic-2");
}
}
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Resource(name = "springboot-activemq-queue-1")
private Queue queue1;
@Resource(name = "springboot-activemq-queue-2")
private Queue queue2;
// 发送消息到队列1
public void sendQueueMessage1(final Map<String, Object> mapMessage) {
jmsMessagingTemplate.convertAndSend(queue1, mapMessage);
}
// 发送消息到队列2
public void sendQueueMessage2(final Map<String, Object> mapMessage) {
jmsMessagingTemplate.convertAndSend(queue2, mapMessage);
}
}
@Component
public class QueueConsumer {
// 监听队列1的消息
@JmsListener(destination = "springboot-activemq-queue-1")
public void receiveQueueMessage1(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听队列2的消息
@JmsListener(destination = "springboot-activemq-queue-2")
public void receiveQueueMessage2(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听队列2的消息
@JmsListener(destination = "springboot-activemq-queue-2")
public void receiveQueueMessage3(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
@Component
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Resource(name = "springboot-activemq-topic-1")
private Topic topic1;
@Resource(name = "springboot-activemq-topic-2")
private Topic topic2;
// 发送消息到主题1
public void sendTopicMessage1(final Map<String, Object> mapMessage) {
jmsMessagingTemplate.convertAndSend(topic1, mapMessage);
}
// 发送消息到主题2
public void sendTopicMessage2(final Map<String, Object> mapMessage) {
jmsMessagingTemplate.convertAndSend(topic2, mapMessage);
}
}
@Component
public class TopicConsumer {
// 监听主题1的消息
@JmsListener(destination = "springboot-activemq-topic-1")
public void receiveQueueMessage1(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听主题2的消息
@JmsListener(destination = "springboot-activemq-topic-2")
public void receiveQueueMessage2(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听主题2的消息
@JmsListener(destination = "springboot-activemq-topic-2")
public void receiveQueueMessage3(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
如果要测试主题,那么需要修改配置 spring.jms.pub-sub-domain=true
。
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
@Autowired
private QueueProducer queueProducer;
@Autowired
private TopicProducer topicProducer;
@Test
public void sendQueueMessage1() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("message", "Hello ActiveMQ,我是队列消息1");
queueProducer.sendQueueMessage1(map);
}
@Test
public void sendQueueMessage2() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("message", "Hello ActiveMQ,我是队列消息2");
queueProducer.sendQueueMessage2(map);
queueProducer.sendQueueMessage2(map);
queueProducer.sendQueueMessage2(map);
}
@Test
public void sendTopicMessage1() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("message", "Hello ActiveMQ,我是主题消息1");
topicProducer.sendTopicMessage1(map);
}
@Test
public void sendTopicMessage2() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("message", "Hello ActiveMQ,我是主题消息2");
topicProducer.sendTopicMessage2(map);
}
}
上面说过,如果想要使用主题,那么需要修改配置文件,这样不方便。我们可以在 ActiveMQConfig 类里同时配置队列和主题。
@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {
@Value("${spring.activemq.brokerUrl}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
@Value("${jms.pool.maxConnections}")
private int maxConnections;
// 队列1
@Bean(name = "springboot-activemq-queue-1")
public Queue queue1() {
return new ActiveMQQueue("springboot-activemq-queue-1");
}
// 队列2
@Bean(name = "springboot-activemq-queue-2")
public Queue queue2() {
return new ActiveMQQueue("springboot-activemq-queue-2");
}
// 主题1
@Bean(name = "springboot-activemq-topic-1")
public Topic topic1() {
return new ActiveMQTopic("springboot-activemq-topic-1");
}
// 主题2
@Bean(name = "springboot-activemq-topic-2")
public Topic topic2() {
return new ActiveMQTopic("springboot-activemq-topic-2");
}
// 真正可以产生连接的工厂
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(user, password, brokerUrl);
}
// ActiveMQ 提供的连接池
@Bean
public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
PooledConnectionFactory pool = new PooledConnectionFactory();
pool.setConnectionFactory(connectionFactory);
pool.setMaxConnections(maxConnections);
return pool;
}
// 队列监听容器
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(PooledConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
return factory;
}
// 主题监听容器
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(PooledConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
// 主题监听容器(持久化订阅)
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable1(
PooledConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
// 持久化订阅
factory.setSubscriptionDurable(true);
// 持久订阅必须指定一个 clientId
factory.setClientId("springboot-activemq-topic-clientId-1");
return factory;
}
// 主题监听容器(持久化订阅)
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable2(
PooledConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
// 持久化订阅
factory.setSubscriptionDurable(true);
// 持久订阅必须指定一个 clientId
factory.setClientId("springboot-activemq-topic-clientId-2");
return factory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(PooledConnectionFactory connectionFactory){
return new JmsMessagingTemplate(connectionFactory);
}
}
@Component
public class QueueConsumer {
// 监听队列1的消息
@JmsListener(destination = "springboot-activemq-queue-1", containerFactory = "jmsListenerContainerQueue")
public void receiveQueueMessage1(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听队列2的消息
@JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
public void receiveQueueMessage2(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听队列2的消息
@JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
public void receiveQueueMessage3(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
@Component
public class TopicConsumer {
// 监听主题1的消息
@JmsListener(destination = "springboot-activemq-topic-1", containerFactory = "jmsListenerContainerTopic")
public void receiveQueueMessage1(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听主题2的消息
@JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable1")
public void receiveQueueMessage2(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 监听主题2的消息
@JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable2")
public void receiveQueueMessage3(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
新建一个 Maven 工程,我的工程叫 springmvc-activemq-example。
pom.xml 文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>wang.miansen</groupId>
<artifactId>springmvc-activemq-example</artifactId>
<packaging>war</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>springmvc-activemq-example Maven Webapp</name>
<url>http://maven.apache.org</url>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<!-- 1)Spring核心 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<!-- 2)Spring DAO层 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<!-- 3)Spring web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<!-- 4)Spring test -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<!-- 1)ActiveMQ 核心 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<!-- 2)ActiveMQ与 Spring 集成 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!--3)ActiveMQ所需要的pool包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
</dependencies>
<build>
<finalName>springmvc-activemq-example</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<!-- web.xml版本 -->
<version>3.1</version>
<configuration>
<!-- jdk版本 -->
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
新建队列生产者 QueueProducer 类:
public class QueueProducer {
private JmsTemplate jmsTemplate;
public void sendMessage(final Map<String, Object> mapMessage) {
jmsTemplate.convertAndSend(mapMessage);
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
新建 3 个队列消费者,这是其中一个,其它两个都一样的,就不贴出来了。
public class QueueConsumer1 implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
新建主题生产者 TopicProducer 类:
public class TopicProducer {
private JmsTemplate jmsTemplate;
public void sendMessage(final Topic topic, final Map<String, Object> mapMessage) {
jmsTemplate.convertAndSend(topic, mapMessage);
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
新建 3 个主题消费者,这是其中一个,其它两个都一样的,就不贴出来了。
public class TopicConsumer1 implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
新建 application.properties 文件,配置 ActiveMQ 的相关信息。
# jms服务器地址
jms.broker.url=tcp://localhost:61616
# 连接用户名
jms.broker.userName=admin
# 连接密码
jms.broker.password=admin
# 使用异步发送
jms.broker.useAsyncSend=true
# 连接池的最大连接数
jms.pool.maxConnections=10
新建 application-context.xml 文件,配置各个 Bean。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- 读取配置文件 -->
<bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:application.properties</value>
</list>
</property>
</bean>
<!-- 连接工厂 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.broker.url}"/>
<property name="userName" value="${jms.broker.userName}"></property>
<property name="password" value="${jms.broker.password}"></property>
<property name="useAsyncSend" value="${jms.broker.useAsyncSend}" />
</bean>
<!-- 连接池 -->
<!-- ActiveMQ 为我们提供了一个 PooledConnectionFactory,通过往里面注入一个 ActiveMQConnectionFactory
可以用来将 Connection、Session 和 MessageProducer 池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool 包 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<!--真正可以产生连接的连接工厂,由对应的 JMS 服务厂商提供-->
<property name="connectionFactory" ref="activeMQConnectionFactory" />
<!--连接池的最大连接数-->
<property name="maxConnections" value="${jms.pool.maxConnections}"/>
</bean>
<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
<!-- 队列和非持久化订阅使用 -->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
<!-- 持久化订阅使用 -->
<bean id="subscriptionDurableSingleConnectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
<!--持久订阅ID -->
<property name="clientId" value="springmvc-activemq-topic-clientId-2" />
</bean>
<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
<!-- 持久化订阅使用 -->
<bean id="subscriptionDurableSingleConnectionFactory3" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
<!--持久订阅ID -->
<property name="clientId" value="springmvc-activemq-topic-clientId-3" />
</bean>
<!--队列1-->
<bean id="activeMQQueue1" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 队列的名称 -->
<constructor-arg index="0" value="springmvc-activemq-queue-1" />
</bean>
<!--队列2-->
<bean id="activeMQQueue2" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 队列的名称 -->
<constructor-arg index="0" value="springmvc-activemq-queue-2" />
</bean>
<!--主题1-->
<bean id="activeMQTopic1" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 主题的名称 -->
<constructor-arg index="0" value="springmvc-activemq-topic-1"/>
</bean>
<!--主题2-->
<bean id="activeMQTopic2" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 主题的名称 -->
<constructor-arg index="0" value="springmvc-activemq-topic-2"/>
</bean>
<!-- 队列 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
<bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
<!-- 连接池 -->
<property name="connectionFactory" ref="singleConnectionFactory"/>
<!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
<property name="defaultDestination" ref="activeMQQueue1"/>
<!-- 消息转换器 -->
<property name="messageConverter" ref="simpleMessageConverter" />
<!-- 发送模式,1: 非持久化,2: 持久化 -->
<property name="deliveryMode" value="2" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="false"/>
</bean>
<!-- 主题 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
<bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
<!-- 连接池 -->
<property name="connectionFactory" ref="singleConnectionFactory"/>
<!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
<property name="defaultDestination" ref="activeMQTopic1"/>
<!-- 消息转换器 -->
<property name="messageConverter" ref="simpleMessageConverter" />
<!-- 发送模式,1: 非持久化,2: 持久化 -->
<property name="deliveryMode" value="2" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="true"/>
</bean>
<!--队列监听容器1,一经注册,自动监听-->
<bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!-- 连接池 -->
<property name="connectionFactory" ref="singleConnectionFactory" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="false" />
<!-- 监听目的地 -->
<property name="destination" ref="activeMQQueue1" />
<!-- 监听类 -->
<property name="messageListener" ref="queueConsumer1" />
</bean>
<!--队列监听容器2,一经注册,自动监听-->
<bean id="queueListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!-- 连接池 -->
<property name="connectionFactory" ref="singleConnectionFactory" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="false" />
<!-- 监听目的地 -->
<property name="destination" ref="activeMQQueue2" />
<!-- 监听类 -->
<property name="messageListener" ref="queueConsumer2" />
</bean>
<!--队列监听容器3,注意目的地跟容器2一样-->
<bean id="queueListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!-- 连接池 -->
<property name="connectionFactory" ref="singleConnectionFactory" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="false" />
<!-- 监听目的地 -->
<property name="destination" ref="activeMQQueue2" />
<!-- 监听类 -->
<property name="messageListener" ref="queueConsumer3" />
</bean>
<!--主题监听容器1(非持久化订阅),一经注册,自动监听-->
<bean id="topicListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!-- 连接池 -->
<property name="connectionFactory" ref="singleConnectionFactory" />
<!-- 监听目的地 -->
<property name="destination" ref="activeMQTopic1" />
<!-- 持久化订阅 -->
<property name="subscriptionDurable" value="false" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="true" />
<!-- 监听类 -->
<property name="messageListener" ref="topicConsumer1" />
</bean>
<!--主题监听容器2(持久化订阅),一经注册,自动监听-->
<bean id="topicListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!-- 连接池 -->
<property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory2" />
<!-- 监听目的地 -->
<property name="destination" ref="activeMQTopic2" />
<!-- 持久化订阅 -->
<property name="subscriptionDurable" value="true" />
<!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
<property name="clientId" value="springmvc-activemq-topic-clientId-2" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="true" />
<!-- 监听类 -->
<property name="messageListener" ref="topicConsumer2" />
</bean>
<!-- 主题监听容器3(持久化订阅),注意目的地跟容器2一样 -->
<bean id="topicListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!-- 连接池 -->
<property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory3" />
<!-- 监听目的地 -->
<property name="destination" ref="activeMQTopic2" />
<!-- 持久化订阅 -->
<property name="subscriptionDurable" value="true" />
<!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
<property name="clientId" value="springmvc-activemq-topic-clientId-3" />
<!-- 是否为发布订阅模式,队列是 false,主题是 true -->
<property name="pubSubDomain" value="true" />
<!-- 监听类 -->
<property name="messageListener" ref="topicConsumer3" />
</bean>
<!-- 队列生产者 -->
<bean id="queueProducer" class="com.example.activemq.QueueProducer">
<property name="jmsTemplate" ref="jmsTemplateQueue" />
</bean>
<!-- 主题生产者 -->
<bean id="topicProducer" class="com.example.activemq.TopicProducer">
<property name="jmsTemplate" ref="jmsTemplateTopic" />
</bean>
<!-- 队列消费者1 -->
<bean id="queueConsumer1" class="com.example.activemq.QueueConsumer1" />
<!-- 队列消费者2 -->
<bean id="queueConsumer2" class="com.example.activemq.QueueConsumer2" />
<!-- 队列消费者3 -->
<bean id="queueConsumer3" class="com.example.activemq.QueueConsumer3" />
<!-- 主题消费者1 -->
<bean id="topicConsumer1" class="com.example.activemq.TopicConsumer1" />
<!-- 主题消费者2 -->
<bean id="topicConsumer2" class="com.example.activemq.TopicConsumer2" />
<!-- 主题消费者3 -->
<bean id="topicConsumer3" class="com.example.activemq.TopicConsumer3" />
<!-- 消息转换器 -->
<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</beans>
最后的项目结构如下:
新建测试类 ApplicationTest
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:application-context.xml"})
public class ApplicationTest {
@Resource(name = "activeMQQueue1")
private Queue queue1;
@Resource(name = "activeMQQueue2")
private Queue queue2;
@Resource(name = "activeMQTopic1")
private Topic topic1;
@Resource(name = "activeMQTopic2")
private Topic topic2;
@Autowired
private QueueProducer queueProducer;
@Autowired
private TopicProducer topicProducer;
private Map<String, Object> mapMessage;
@Before
public void initMessage() throws Exception {
mapMessage = new HashMap<>();
mapMessage.put("message", "Hello ActiveMQ - " + new Date().getTime());
}
// 发送消息到队列1
@Test
public void queue1() throws Exception {
queueProducer.sendMessage(queue1, mapMessage);
}
// 发送消息到队列2
@Test
public void queue2() throws Exception {
queueProducer.sendMessage(queue2, mapMessage);
}
// 发送消息到主题1
@Test
public void topic1() throws Exception {
topicProducer.sendMessage(topic1, mapMessage);
}
// 发送消息到主题2
@Test
public void topic2() throws Exception {
topicProducer.sendMessage(topic2, mapMessage);
}
}
运行 queue1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。
由于队列2被两个消费者监听,又由于 ActiveMQ 队列中的消息只能被一个消费者消费,所以这两个消费者应该是通过负载均衡的方式消费。
运行 queue2() 方法,可以看到有时是消费者2消费,有时又是消费者3消费。
运行 topic1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。
运行 topic2() 方法,生产者发送消息后,由于是发布订阅的模式,所以消费者2和消费者3都能收到消息。
查看控制台,可以看到有两个持久化订阅的消费者。
主题的持久化跟队列有点不一样,我们先看生产者。
生产者改动不大,只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。
以下是代码:
public class DurableTopicProducer {
public static void main(String[] args) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.使用连接对象创建会话对象。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用会话对象创建目标对象,这里我们创建的是 topic,也就是一对多模式
Destination destination = session.createTopic("topic-test-01");
// 6.使用会话对象创建生产者对象
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7.使用会话对象创建一个文本消息对象
TextMessage textMessage = session.createTextMessage();
String text = "Hello ActiveMQ - " + new Date().getTime();
// 8.设置消息内容
textMessage.setText(text);
// 9.发送消息
producer.send(textMessage);
System.out.println("已生产消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 10.关闭资源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
然后是消费者,消费者改动比较多,以下是代码:
public class DurableTopicConsumer {
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 3.设置ClientID
connection.setClientID("client1");
// 4.开启连接
connection.start();
// 5.使用连接对象创建会话对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 6.使用会话对象创建目的地对象,生产者与消费者的名称要保持一致。
Topic topic = session.createTopic("topic-test-01");
// 7.使用会话对象创建主题订阅者
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscriber-1");
// 8.通过监听器的方式来接收消息
subscriber.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
System.out.println("已接收消息:" + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
启动消费者,打开控制台页面,点击 “Subscribers”,可以看到以下页面。
在Active Durable Topic Subscribers里可以看到有一个消费者的信息,说明订阅成功了。
此时,我们将这个消费者线程关闭,再来看看,可以看到这个消费者跑到了Offline Durable Topic Subscribers里面,如下图所示。表明这个消费者离线了,但是依旧保持订阅关系。
下面做一个试验,此时消费者已经离线,我们先启动生产者,后启动消费者,此时消费者还能接收到刚才生产者发送的消息吗?
答案是可以的。
此时再刷新控制台,可以看到刚才在Offline的订阅者跑到了Active里面。
上述内容可以联系微信公众号理解。我们可以想象一个场景,我订阅了某公众号,后来我手机没电关机了,当我意识到后,充电再开机,我依旧能收到公众号推送给我的消息,这里的持久化,可以认为,我和公众号的联系被持久化了,所以我上线后可以及时收到消息。
持久化队列只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。默认是持久化的。
我们可以做个试验,使用 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 设置为持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者依然可以拿到消息,也就是消息在宕机之前已经被存储了下来,后面再启动的时候,会读取到存储的数据。
非持久化队列也很简单,只需要设置 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 就可以了。
我们再做个试验,使用 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 设置为非持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者什么都拿不到,也就是消息已经丢了,我们在MQ的控制台页面看到,MQ里的消息数量是0。
ActiveMQ 的持久化可以这样理解:生产者生产消息之后,消息首先会到达 broker,broker 就相当于一个调度中心。broker 首先将消息存储在本地的数据文件(或者是远程数据库),再将消息发送给消费者。如果消费者接收到了消息,则将消息从存储中删除,否则继续尝试重发。
ActiveMQ 启动后,首先检查存储位置,如果有未发送成功的消息,需要先把消息发送出去。
ActiveMQ 默认的持久化数据库是 KahaDB,我们也可以改成 MySQL。
第一步,首先将 mysql-connector-java 的jar 扔到 ActiveMQ的lib目录下。
然后修改 conf 目录下的 activmeq.xml 中的 persistenceAdapter 结点为 JDBC。
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-datasource"/>
</persistenceAdapter>
再添加 mysql-datasource,这样上面的配置文件才能拿到 DataSource。将下面这段内容放在 broker 结点和 import 结点之间。
<bean id="mysql-datasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=UTF-8"/>
<property name="username" value="root"/>
<property name="password" value="123"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
上面的用户名和密码要改成你自己的,还有要注意 url 参数,这里的&
要改成&
,也就是&的转义,否则启动会报错。
启动 ActiveMQ,打开 activemq 库,就会发现自动生成了 3 张表。
每张表的作用如下:
activemq_msgs(消息表):
activemq_acks(订阅关系表,针对于持久化的Topic,订阅者和服务器的订阅关系会保存在这个表中):
activemq_lock:
在集群环境中才有用,只有一个 Broker 可以获取消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用来记录哪个 Broker 是当前的 Master Broker。
运行队列生产者生产 3 条数据,然后查看 activemq_msgs 表,可以看到这 3 条消息已经存进 activemq_msgs 表了。
然后启动队列消费者消费消息,在刷新 activemq_msgs 表,可以看到数据没了,说明消息的消费操作也同步更新了数据库。
启动消费者 DurableTopicConsumer 类,查看数据库的 activemq_acks 表,可以发现订阅关系已经持久化到数据库里了。
启动生产者 DurableTopicProducer 类,这时候消息已经被消费了。但是再查看数据库的 activemq_msgs 表,里面的数据依然存在。
上述内容可以联系微信公众号来理解,微信公众号发布过的文章,不能因为订阅者已经收到了或者是服务器关闭了,就把发布过的文章和订阅者清空,这是不合理的,从这个角度来理解就能明白了。
1.生产者将消息发布到主题中,每个消息可以被多个消费者消费,属于一对多的关系。
2.生产者和消费者存在时间上的相关性,必须先启动消费者。
3.生产者生产时,主题模式不会持久化消息,所以先启动消费者,再启动生产者,这样消息才不会丢弃。
1.先生产,然后启动1号消费者,1号消费者能消费到消息吗?
答:不可以。
2.先生产,然后启动1号消费者,再启动2号消费者,1号消费者能消费吗?2号消费者能消费吗?
答:都不可以。
3.先启动2个消费者,然后再生产消息,消费情况如何?
答:都可以消费到消息。
之前我们说过,在创建 session 的时候,我们传了两个参数,第一个是事务,第二个是签收,在这篇博客中,我们来说说事务。
connection.createSession()方法的第一个参数就是事务,它的值可以是true或false,代表session的提交是事务提交还是非事务提交。
当事务的值是false时,只要执行了producer.send()方法,消息就到了队列中,也就是自动提交了。
当事务的值是true时,在执行完roducer.send()方法后,在session关闭之前需要多加一个session.commit()方法提交事务。
第一个参数传入 true,这样就开启事务了。
session关闭之前提交事务,否则消息不会进入队列中。
事务的提交,用于实际复杂的业务场景,可能有多个消息需要入队列,假设有一条入队列报错了,我们希望这一批次的都要回滚,这时候就要用到session.rollback()方法了,可以将session.rollback()方法放在catch语句块中来执行。
事务主要是针对生产者而言的,但是对于消费者,也有事务。
将connection.createSession()的第一个参数改为true,表示消费者采用事务的方式来消费消息。
通过前面的介绍,我们知道,如果开启了事务,就需要手动执行session.commit()来提交事务,假设这时候,我们不提交事务,看看会出现什么情况。
我们先启动生产者。
可以看到生产者生产了一条消息。
接着启动消费者。
这时候消费者是可以消费到消息的。
但是注意看控制台,这时候队列中还存在1条消息等待消息,有0条消息已消费。
这时候我们如果再启动一个消费者,发现又做了一次消费,也就是重复消费了。
也就是说在事务模式下,消费者虽然消费到了消息,但是它没有提交事务,也就没有通知到MQ,所以MQ并不知道消息是否被消费了,所以就会出现重复消费的情况。
所以消费者如果开启了事务,那么也要提交事务。否则MQ并不知道消息是否被消费了,最后就会出现重复消费的情况。