3.学习 Kafka - Java 客户端

2020-05-21   消息队列   Kafka  

引入客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

生产者

阻塞的生产者

生产者代码如下:

public class CustomProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.配置生产者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#producerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2.实例化一个生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3.实例化一条消息,第一个参数是主题的名称,第二个参数是消息的内容
            ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
            // 4.发送消息
            Future<RecordMetadata> future = producer.send(record);
            // 5.获取发送的结果,注意这个方法会阻塞当前线程
            RecordMetadata metadata = future.get();
            System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }
        // 6.关闭资源
        producer.close();
    }
}

带有回调的生产者

前面说过,future.get() 方法会阻塞当前线程,这样会影响到后面代码的执行,好在 Kafka 客户端提供了回调的方法。

代码如下:

public class CallbackProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.配置生产者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#producerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2.实例化一个生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3.实例化一条消息,第一个参数是主题的名称,第二个参数是消息的内容
            ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
            // 4.发送消息
            producer.send(record, new Callback() {
                // 5.在回调函数里处理结果,这样就不会阻塞了
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
        }
        // 6.关闭资源
        producer.close();
    }
}

消息发送到哪个分区?

image

如上图所示,ProducerRecord 类的构造函数有很多重载,根据构造函数来决定消息的分区,规则如下:

  • 如果不指定分区和 key,那么消息会轮询负载均衡到每个分区上。
// 轮询负载均衡到每个分区上
ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
  • 如果不指定分区,但是指定了 key,那么会将 key hash,然后取模,根据模数决定分区。
// 消息发送到哪个分区取决于 key
ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i), Integer.toString(i));
  • 如果指定分区,那么直接发送到指定的分区。
// 指定消息发送到第 2 个分区
ProducerRecord<String,String> record = new ProducerRecord<>("test01", 2, Integer.toString(i), Integer.toString(i));

消费者

自动提交偏移量

public class AutoCommitConsumer {

    public static void main(String[] args) {
        // 1.配置消费者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#consumerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("group.id", "my-consumer-group-01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 2.实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3.订阅主题(可以订阅多个)
        consumer.subscribe(Arrays.asList("test01"));
        // 4.循环消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            }
        }
    }
}

通过配置 enable.auto.commit,表明这是个自动提交偏移量,偏移量由 auto.commit.interval.ms 控制自动提交的频率。

通过配置 bootstrap.servers 指定集群的一个或多个 broker。不用指定全部的 broker,它将自动发现集群中的其余的 borker(最好指定多个,万一有服务器故障)。

在这个例子中,客户端订阅了主题 test01。消费者组叫 my-consumer-group-01。

broker 通过心跳机制自动检测 my-consumer-group-01 组中失败的消费者实例,消费者实例会自动 ping 集群,告诉集群它还活着。只要消费者实例能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过 session.timeout.ms 设置的值,那么就会认为是故障的,它的分区将被分配到别的消费者实例。

通过配置 deserializer 如何把 byte 转成 object 类型,例子中,通过指定 string 解析器,我们告诉获取到的消息的 key 和 value 只是简单个 string 类型。

手动提交偏移量

通过设置 props.put(“enable.auto.commit”, “false”); 改为手动提交偏移量。

然后在消费消息后手动提交。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    consumer.commitSync();
}

为什么需要手动提交?

可以想象一个场景:读取消息后,将它们批量插入到数据库中。如果我们设置 offset 自动提交,消费将被认为是已消费的。这样会出现问题,因为消息被插入到数据库之前可能会失败,这样我们就会丢失部分的数据。

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= 100) {
        try {
            // 插入数据库
            insertIntoDb(buffer);
        } catch(Exception e) {
            // 如果插入数据库失败
            // TODO
        }
        // 插入数据库成功再手动提交
        consumer.commitSync();
        buffer.clear();
    }
}

订阅指定的分区

在前面的例子中,我们订阅感兴趣的 topic,让 Kafka 提供给我们平分后的分区。但是,在有些情况下,你可能需要自己来控制分配指定分区。

要使用此模式,你只需调用 assign(Collection partitions) 方法消费指定的分区即可:

// 订阅分区
TopicPartition partition0 = new TopicPartition("test01", 0);
TopicPartition partition1 = new TopicPartition("test01", 1);
consumer.assign(Arrays.asList(partition0, partition1));

注意订阅主题和订阅分区是互斥的。

控制消费的位置

大多数情况下,消费者只是简单的从头到尾的消费消息,周期性的提交 offset(自动或手动)。Kafka 也支持消费者去手动的控制消费的位置,可以消费之前的消息也可以跳过最近的消息。

要使用此模式,你只需调用 seek(TopicPartition partition, long offset) 方法消费指定的位置即可:

TopicPartition partition0 = new TopicPartition("test01", 0);
consumer.seek(partition0, 784);

结果如下图:

image

2.学习 Kafka - 重要概念

2020-05-20   消息队列   Kafka  

由于搭建集群的时候,3 台虚拟机都在前台开启了 Kafka,占用了窗口,所以我这里再开启两个窗口,分别用来操作 Kafka 和 Zookeeper。

image

下面的操作都在这两个窗口上进行。

主题(Topic)

如果把 Kafka 比作 MySQL 的话,那么主题就相当于数据库,所以想用 Kafka,就必须先创建主题,就好比我们想用 MySQL,就必须先创建数据库一样。

使用这个命令创建主题:

/usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.197.6:2181 --replication-factor 3 --partitions 3 --topic test01

关于这条命令的解释:

–zookeeper:指定 zookeeper 集群的 ip 地址和端口

–topic:创建名为 test01 的主题

–partitions:test01 这个主题分配 3 个分区

–replication-factor:每个分区分配 3 个副本,也就是每个分区备份 3 份

主题在 Zookeeper 中是如何体现的呢,我们进入 Zookeeper 看一下。

使用命令 ls /brokers/topics,可以看到刚才创建的 test01 主题在 Zookeeper 中已经创建了。

image

使用命令 /brokers/topics/test01/partitions,可以看到分区也创建了。

image

分区(partition)

主题只是一个逻辑的概念,在物理上并不存在,消息也不是存在主题里的,而是存在分区里的。分区是 Kafka 消息存储的最小单元,在物理上对应着一个个真实的文件。

我们上面新建了一个 test01 主题,分配了 3 个分区,这 3 个分区其实就是一个个的文件夹。我们可以进入 log.dirs 目录

cd /usr/local/kafka_2.13-2.5.0/logs/

image

可以看到创建了 3 个文件夹,分别对应 3 个分区,命名规则是 topic名称-N,N 就是分区的个数。后面生产者发送给 test01 主题的数据都会存在这 3 个文件夹里。

进入 test01 主题的第一个分区看一下

image

可以看到每个分区文件夹,都包含以下 4 类文件:

  • xxxxxxxxx.index:索引文件
  • xxxxxxxxx.log:数据文件
  • xxxxxxxxx.timeindex:时间戳索引文件
  • leader-epoch-checkpoint:保存了每一任 leader 开始写入消息时的 offset,会定时更新,当 follower 被选为 leader 时会根据这个确定哪些消息可用

我们重点看 .index .log .timeindex 这 3 个文件。

这 3 个文件都是成对出现的,每一对都叫做 segment。也就是说分区中的数据是分段存储,一个分区(partition)被切割成多个相同大小的段(segment)(这个是由 log.segment.bytes 决定,控制每个 segment 的大小)。

segment 文件命名规则:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

如下图所示:

image

通过上面的内容,我们可以这样总结:

在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,目录的命名规则为 topic名称+有序序号。第一个序号从 0 开始计,最大的序号为 partition 数量减 1。partition 是实际物理上的概念,而 topic 是逻辑上的概念,topic 更像是一个消息的类别。由于生产者生产的消息会不断追加到 log 文件的末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 将 partition 细分为 segment,一个 partition 物理上由多个 segment 组成。每个 segment 由 3 个文件组成,分别是 index 文件、log 文件和 timeindex 文件,这 3 个文件的命名规则是:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。

偏移量(offset)

分区中的每一条记录都会分配一个 id 号来表示顺序,我们称之为 offset,offset 用来唯一的标识分区中每一条记录。

每一个消费者中唯一保存的元数据是 offset(偏移量)即消费在 log 中的位置。偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据。也可以跳过最近的记录,从 “现在” 开始消费。

特别说明的是,老版本的消费者是依赖 Zookeeper 的,当启动一个消费者时向 Zookeeper 注册。新版本消费者去掉了对 Zookeeper 的依赖,而是由消费组协调器统一管理,已消费的消息偏移量提交后会保存到名为 “__consumer_offsets” 文件夹中。

image

Kafka 的 offset 是分区内有序的,但是在不同分区中是无序的,Kafka 不保证数据的全局有序。

关于 offset,可以通过以下几个问题来理解:

1.为啥 offset 在分区内是有序的?

因为分区的结构是队列,先进先出,所以保证了有序。

2.为啥在不同分区中是无序的?

因为消息写入哪一个分区不是固定的,有可能第一条消息写入第一个分区,第二条消息写入第二个区分。。。

3.如果有业务场景需要保证数据的有序呢?该如何做?

可以这样做:创建主题的时候只分配一个分区,这样就能保证数据的顺序性了。

但是只分配一个分区,又跟 Kafka 的分布式冲突,所以还可以这样做:

创建主题的时候分配多个分区,读取消息时,在代码层面处理消息的顺序性。

broker

Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。一个 broker 有多个 topic,一个 topic 有多个 partition,一个 partition 有多个 segment。

image

生产者(producer)

Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为 message(消息)发送到 Kafka 集群。默认情况下,每行将作为单独的 message 发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

[root@localhost kafka_2.13-2.5.0]# /usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.197.6:9092 --topic test01
>This is a message
>This is another message
>

关于这条命令的解释:

–broker-list:设置 kafka 集群地址

–topic:设置生产消息的主题

消费者(consumer)

Kafka 还有一个命令行 consumer(消费者),将消息转储到标准输出。

/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.12:9092 --topic test01 --from-beginning
This is a message
This is another message

关于这条命令的解释:

–bootstrap-server:设置 kafka 集群地址

–topic:设置消费消息的主题

–from-beginning:设置消息起始位置开始消费。默认是从新位置 latest 开始消费。

消费组(consumer-groups)

消费者使用一个 “消费组” 名称来进行标识,发布到 topic 中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。

如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。

如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。

image

如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。

通常情况下,每个 topic 都会有一些消费组,一个消费组对应一个”逻辑订阅者”。一个消费组由许多消费者实例组成,便于扩展和容错。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的进程。

在 Kafka 中实现消费的方式是将分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由 Kafka 协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区。如果一个实例消失或者故障,拥有的分区将被分发到剩余的实例。这被称为重新平衡分组。

Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。

创建消费者时如果不指定消费组,那么就会默认分配一个,可以通过这个命令查看有哪些消费组。

/usr/local/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.197.6:9092 --list

image

如图,因为我启动了 4 个消费者,所以有 4 个消费组。由于这 4 个消费者都属于不同的消费组,所以他们都能收到 test01 主题的所有消息。

同一个消费组里的消费者消费唯一的分区,不可以重复消费。

打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。

消费组 my-consumer-group-01 有 3 个消费者,分别是 my-consumer-01,my-consumer-02,my-consumer-03。

那么,Kafka 会负载均衡的将分区划分给同一消费组里的消费者。

image

如上图所示,此时,test01-0 分区已经被第一个消费者消费了,那么它就不能被这个组里的其它消费者消费。除非第一个消费者挂了,那么它就会被重新分配给其它的消费者。

我们启动 3 个消费者来亲自感受一下。

使用这个命令可以指定消费者的组。

/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.6:9092 --consumer-property group.id=my-consumer-group-01 --consumer-property client.id=my-consumer-01 --topic test01

--consumer-property group.id=my-consumer-group-01 指定消费者的组是 my-consumer-group-01。

--consumer-property client.id=my-consumer-01 指定消费者的 ID 是 my-consumer-01。

image

如上图所示,我启动了 3 个消费者,这 3 个消费者都位于同一个组里。然后生产者往 test01 这个主题里发送消息,可以看到:对于同一个消费组而言,同一个时刻,只有一个消费者在消费消息,而且每个消费者消费唯一的分区,不可以重复消费。

不同消费组里的消费者可以消费相同的分区,可以重复消费。

打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。

消费组 my-consumer-group-01 有 2 个消费者,分别是 my-consumer-01 和 my-consumer-02。

消费组 my-consumer-group-02 有 1 个消费者,是 my-consumer-03。

image

如上图所示,my-consumer-01 和 my-consumer-02 位于同一个组,那么它们无法重复消费同一个分区,并且同一个时刻,只能有一个消费者消费。

虽然 test01-0,test01-1,test01-2 这 3 个分区已经被 my-consumer-01 和 my-consumer-02 消费过了,但是 my-consumer-03 位于不同的组,所以其它组对它是不影响的,它是独立的,可以消费其它组消费过的分区。

image

如上图所示,我启动了 3 个消费者,前两个消费者同一个组,第三个消费者另一个组。

由于 my-consumer-01 和 my-consumer-02 位于同一个组,所以它们只能消费分配给自己的分区,不能重复消费。而且同一个时刻只能有一个消费者消费,所以看起来就像是轮流消费。而 my-consumer-03 位于不同的组,即使分区已经被其它组的消费者消费过了,但是它还是能消费的。而且这个组只有它一个消费者,所以它消费了所有分区的消息。

1.学习 Kafka - 搭建集群环境

2020-05-18   消息队列   Kafka  

前提条件

请准备 3 台虚拟机,以下是我准备的:

image

安装 JDK1.8

Zookeeper 和 Kafka 都依赖 JDK,所以安装 JDK 是必须的。

到官网 https://www.oracle.com/java/technologies/javase-downloads.html 下载 JDK1.8。

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp jdk-8u251-linux-x64.tar.gz root@192.168.197.6:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.10:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.12:/usr/local/src

接着解压 jdk-8u251-linux-x64.tar.gz 压缩包到 /usr/local 目录。

tar zxvf /usr/local/src/jdk-8u251-linux-x64.tar.gz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set jdk environmentexport
export JAVA_HOME=/usr/local/jdk1.8.0_251
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

最后执行 java -version 命令,验证是否安装成功。输出以下内容则说明安装成功:

java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

搭建 Zookeeper 集群

由于 Kafka 的 broker 的状态、消费者的消费状态、偏移量以及消费者组等是交给 Zookeeper 维护的,所以安装 Kafka 之前必须先安装 Zookeeper。

安装 Zookeeper

到官网 https://zookeeper.apache.org/releases.html 下载 Zookeeper,我们下载 3.4.14 版本的。

image

image

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp zookeeper-3.4.14.tar.gz root@192.168.197.6:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.10:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.12:/usr/local/src

接着解压 zookeeper-3.4.14.tar.gz 压缩包到 /usr/local 目录。

tar zxvf /usr/local/src/zookeeper-3.4.14.tar.gz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set zookeeper environmentexport
export ZK_HOME=/usr/local/zookeeper-3.4.14
export PATH=$ZK_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

修改 Zookeeper 的配置文件

将 zoo_sample.cfg 文件备份并重命名为 zoo.cfg

cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.14/conf/zoo.cfg

编辑 zoo.cfg 文件

vi zoo.cfg

修改数据文件夹路径

dataDir=/usr/local/zookeeper-3.4.14/data

添加日志文件夹路径

dataLogDir=/usr/local/zookeeper-3.4.14/logs

在文件末尾添加集群信息

server.1=192.168.197.6:2888:3888
server.2=192.168.197.10:2888:3888
server.3=192.168.197.12:2888:3888

image

创建数据文件夹

mkdir /usr/local/zookeeper-3.4.14/data

创建日志文件夹

mkdir /usr/local/zookeeper-3.4.14/logs

创建 myid 文件,在 myid 文件中添加本机的 server ID

echo "1" > /usr/local/zookeeper-3.4.14/data/myid

请注意,每台虚拟机的 myid 要跟 server ID 对应。

开放端口

每台虚拟机都开放以下端口,如果不开放的话,Zookeeper 集群无法与其它节点通信,启动会报错。

# 2181 端口对 client 端提供服务
firewall-cmd --zone=public --add-port=2181/tcp --permanent
# 2888 端口是集群内机器相互通信使用(由 Leader 负责监听此端口)
firewall-cmd --zone=public --add-port=2888/tcp --permanent
# 3888 端口是选举 leader 使用的
firewall-cmd --zone=public --add-port=3888/tcp --permanent

使端口生效

firewall-cmd --reload

启动 3 台虚拟机的 Zookeeper

使用以下命令分别启动每台虚拟机上的 Zookeeper。

/usr/local/zookeeper-3.4.14/bin/zkServer.sh start

查看 Zookeeper 的状态

/usr/local/zookeeper-3.4.14/bin/zkServer.sh status

如果输出以下信息,则说明 Zookeeper 集群搭建成功

192.168.197.6:

image

192.168.197.10:

image

192.168.197.12:

image

通过以上图片可以知道,3 台虚拟机上的 Zookeeper 已经成功启起来了。其中 192.168.197.10 节点被选举为 leader 节点,其它两个节点是 follower 节点。每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。

2888 端口是集群内机器相互通信使用的,由 Leader 负责监听此端口。

搭建 Kafka 集群

安装 Kafka

到官网 http://kafka.apache.org/downloads 下载 Kafka,我们下载 2.5.0 版本的。

image

image

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp kafka_2.13-2.5.0.tgz root@192.168.197.6:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.10:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.12:/usr/local/src

接着解压 kafka_2.13-2.5.0.tgz 压缩包到 /usr/local 目录。

tar zxvf kafka_2.13-2.5.0.tgz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set kafka environment
export KAFKA_HOME=/usr/local/kafka_2.13-2.5.0
export PATH=$KAFKA_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

修改 Kafka 的配置文件

修改 192.168.197.6 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=0
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.6:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

修改 192.168.197.10 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=1
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.10:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

修改 192.168.197.12 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=2
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.12:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

然后每个节点都创建 logs 文件夹

mkdir /usr/local/kafka_2.13-2.5.0/logs

开放端口

每台虚拟机都开放以下端口

# 9092 端口是 Kafka 对 client 端提供服务的端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent

使端口生效

firewall-cmd --reload

启动 3 台虚拟机的 Kafka

使用以下命令分别启动每台虚拟机上的 Kafka。

/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.5.0/config/server.properties &

上面那个命令是前台启动,会一直占用窗口,你也可以使用后台启动,命令如下:

/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/server.properties

如果输出以下信息,则说明启动成功

image

总结

最后来总结以下每台虚拟机安装的内容。

查看服务启动情况

使用这个命令查看服务启动情况:

jps

image

从上图可以看到安装了 Kafka 和 Zookeeper,并且都启动了。

查看端口占用情况

使用这个命令查看端口占用情况:

netstat -tunlp|egrep "(2181|2888|3888|9092)"

image

  • 2888 端口:此端口是 Zookeeper 集群内机器相互通信使用的,由 Leader 节点负责监听此端口。
  • 3888 端口:Zookeeper 集群的每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。
  • 2181 端口:Zookeeper 对 client 端提供服务的端口。
  • 9092 端口:Kafka 对 client 端提供服务的端口。

查看 Zookeeper 的目录情况

随便在一个节点上,使用以下命名,查看 Zookeeper 的目录情况。

使用客户端进入 Zookeeper

/usr/local/zookeeper-3.4.14/bin/zkCli.sh

image

使用 ls / 命令查看 Zookeeper 的目录情况。

image

上面的显示结果中:只有 zookeeper 目录是 Zookeeper 原生的,其它都是 Kafka 创建的。

关于各个目录的作用,后面再说。

至此,Kafka 的集群环境就搭建成功了。

17.学习ActiveMQ-在SpringBoot中使用ActiveMQ

2020-05-15   消息队列   ActiveMQ SpringBoot  

新建工程

新建一个 Maven 工程,我的工程叫 springboot-activemq-example。

pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/>
    </parent>
    
    <groupId>wang.miansen</groupId>
    <artifactId>springboot-activemq-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>springboot-activemq-example</name>
    <description>activemq project for Spring Boot</description>
    
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
        <!--消息队列连接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

server.port=8080
# 配置消息的类型,如果是true则表示为topic消息,如果为false表示Queue消息
spring.jms.pub-sub-domain=true
# 连接用户名
spring.activemq.user=admin
# 连接密码
spring.activemq.password=admin
# 连接主机信息
spring.activemq.brokerUrl=tcp://localhost:61616
# 连接池的最大连接数
jms.pool.maxConnections=10

新建配置类 ActiveMQConfig

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }
    
    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }
    
    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }
    
}

队列生产者

@Component
public class QueueProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-queue-1")
    private Queue queue1;
    
    @Resource(name = "springboot-activemq-queue-2")
    private Queue queue2;
    
    // 发送消息到队列1
    public void sendQueueMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue1, mapMessage);
    }
    
    // 发送消息到队列2
    public void sendQueueMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue2, mapMessage);
    }

}

队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

@Component
public class TopicProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-topic-1")
    private Topic topic1;
    
    @Resource(name = "springboot-activemq-topic-2")
    private Topic topic2;
    
    // 发送消息到主题1
    public void sendTopicMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic1, mapMessage);
    }
    
    // 发送消息到主题2
    public void sendTopicMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic2, mapMessage);
    }

}

主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

测试

如果要测试主题,那么需要修改配置 spring.jms.pub-sub-domain=true

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private QueueProducer queueProducer;
    
    @Autowired
    private TopicProducer topicProducer;
    
    @Test
    public void sendQueueMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息1");
        queueProducer.sendQueueMessage1(map);
    }
    
    @Test
    public void sendQueueMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息2");
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
    }
    
    @Test
    public void sendTopicMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息1");
        topicProducer.sendTopicMessage1(map);
    }
    
    @Test
    public void sendTopicMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息2");
        topicProducer.sendTopicMessage2(map);
    }
    
}

同时配置队列和主题

上面说过,如果想要使用主题,那么需要修改配置文件,这样不方便。我们可以在 ActiveMQConfig 类里同时配置队列和主题。

修改 ActiveMQConfig 类:

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    @Value("${spring.activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String user;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${jms.pool.maxConnections}")
    private int maxConnections;

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }

    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }

    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }

    // 真正可以产生连接的工厂
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(user, password, brokerUrl);
    }

    // ActiveMQ 提供的连接池
    @Bean
    public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(connectionFactory);
        pool.setMaxConnections(maxConnections);
        return pool;
    }

    // 队列监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable1(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-1");
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable2(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-2");
        return factory;
    }
    
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(PooledConnectionFactory connectionFactory){
        return new JmsMessagingTemplate(connectionFactory);
    }

}

修改队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

修改主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1", containerFactory = "jmsListenerContainerTopic")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable1")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

16.学习ActiveMQ-在SpringMVC中使用ActiveMQ

2020-05-13   消息队列   ActiveMQ SpringMVC  

新建工程

新建一个 Maven 工程,我的工程叫 springmvc-activemq-example。

pom.xml 文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wang.miansen</groupId>
    <artifactId>springmvc-activemq-example</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>springmvc-activemq-example Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
    
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        
        <!-- 1)Spring核心 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 2)Spring DAO层 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 3)Spring web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 4)Spring test -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>

        <!-- 1)ActiveMQ 核心 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <!-- 2)ActiveMQ与 Spring 集成 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>

        <!--3)ActiveMQ所需要的pool包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>springmvc-activemq-example</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- web.xml版本 -->
                <version>3.1</version>
                <configuration>
                    <!-- jdk版本 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

队列生产者

新建队列生产者 QueueProducer 类:

public class QueueProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

队列消费者

新建 3 个队列消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class QueueConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

新建主题生产者 TopicProducer 类:

public class TopicProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Topic topic, final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(topic, mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

主题消费者

新建 3 个主题消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class TopicConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

application.properties

新建 application.properties 文件,配置 ActiveMQ 的相关信息。

# jms服务器地址
jms.broker.url=tcp://localhost:61616
# 连接用户名
jms.broker.userName=admin
# 连接密码
jms.broker.password=admin
# 使用异步发送
jms.broker.useAsyncSend=true
# 连接池的最大连接数
jms.pool.maxConnections=10

application-context.xml

新建 application-context.xml 文件,配置各个 Bean。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

    <!-- 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker.url}"/>
        <property name="userName" value="${jms.broker.userName}"></property>  
        <property name="password" value="${jms.broker.password}"></property>  
        <property name="useAsyncSend" value="${jms.broker.useAsyncSend}" />  
    </bean>
    
    <!-- 连接池 -->
    <!-- ActiveMQ 为我们提供了一个 PooledConnectionFactory,通过往里面注入一个 ActiveMQConnectionFactory   
        可以用来将 Connection、Session 和  MessageProducer 池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool 包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <!--真正可以产生连接的连接工厂,由对应的 JMS 服务厂商提供-->
        <property name="connectionFactory" ref="activeMQConnectionFactory" />
        <!--连接池的最大连接数-->
        <property name="maxConnections" value="${jms.pool.maxConnections}"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 队列和非持久化订阅使用 -->
    <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory3" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
    </bean>
    
    <!--队列1-->
    <bean id="activeMQQueue1" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-1" />
    </bean>
    
    <!--队列2-->
    <bean id="activeMQQueue2" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-2" />
    </bean>
    
    <!--主题1-->
    <bean id="activeMQTopic1" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-1"/>
    </bean>
    
    <!--主题2-->
    <bean id="activeMQTopic2" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-2"/>
    </bean>
    
    <!-- 队列 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQQueue1"/>
        <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false"/>
    </bean>
    
    <!-- 主题 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQTopic1"/>
         <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true"/>
    </bean>
    
    <!--队列监听容器1,一经注册,自动监听-->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue1" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer1" />
    </bean>
    
    <!--队列监听容器2,一经注册,自动监听-->
    <bean id="queueListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer2" />
    </bean>
    
    <!--队列监听容器3,注意目的地跟容器2一样-->
    <bean id="queueListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer3" />
    </bean>
    
    <!--主题监听容器1(非持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic1" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="false" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer1" />
    </bean>
    
    <!--主题监听容器2(持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory2" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer2" />
    </bean>
    
    <!-- 主题监听容器3(持久化订阅),注意目的地跟容器2一样 -->
    <bean id="topicListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory3" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer3" />
    </bean>
    
    <!-- 队列生产者 -->
    <bean id="queueProducer" class="com.example.activemq.QueueProducer">
        <property name="jmsTemplate" ref="jmsTemplateQueue" />
    </bean>
    
    <!-- 主题生产者 -->
    <bean id="topicProducer" class="com.example.activemq.TopicProducer">
        <property name="jmsTemplate" ref="jmsTemplateTopic" />
    </bean>
    
    <!-- 队列消费者1 -->
    <bean id="queueConsumer1" class="com.example.activemq.QueueConsumer1" />
    <!-- 队列消费者2 -->
    <bean id="queueConsumer2" class="com.example.activemq.QueueConsumer2" />
    <!-- 队列消费者3 -->
    <bean id="queueConsumer3" class="com.example.activemq.QueueConsumer3" />
    
    <!-- 主题消费者1 -->
    <bean id="topicConsumer1" class="com.example.activemq.TopicConsumer1" />
    <!-- 主题消费者2 -->
    <bean id="topicConsumer2" class="com.example.activemq.TopicConsumer2" />
    <!-- 主题消费者3 -->
    <bean id="topicConsumer3" class="com.example.activemq.TopicConsumer3" />
    
    <!-- 消息转换器 -->
    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</beans>

项目结构

最后的项目结构如下:

image

测试

新建测试类 ApplicationTest

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:application-context.xml"})
public class ApplicationTest {

    @Resource(name = "activeMQQueue1")
    private Queue queue1;

    @Resource(name = "activeMQQueue2")
    private Queue queue2;

    @Resource(name = "activeMQTopic1")
    private Topic topic1;

    @Resource(name = "activeMQTopic2")
    private Topic topic2;

    @Autowired
    private QueueProducer queueProducer;

    @Autowired
    private TopicProducer topicProducer;

    private Map<String, Object> mapMessage;

    @Before
    public void initMessage() throws Exception {
        mapMessage = new HashMap<>();
        mapMessage.put("message", "Hello ActiveMQ - " + new Date().getTime());
    }

    // 发送消息到队列1
    @Test
    public void queue1() throws Exception {
        queueProducer.sendMessage(queue1, mapMessage);
    }

    // 发送消息到队列2
    @Test
    public void queue2() throws Exception {
        queueProducer.sendMessage(queue2, mapMessage);
    }

    // 发送消息到主题1
    @Test
    public void topic1() throws Exception {
        topicProducer.sendMessage(topic1, mapMessage);
    }

    // 发送消息到主题2
    @Test
    public void topic2() throws Exception {
        topicProducer.sendMessage(topic2, mapMessage);
    }

}

测试队列1

运行 queue1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

image

测试队列2

由于队列2被两个消费者监听,又由于 ActiveMQ 队列中的消息只能被一个消费者消费,所以这两个消费者应该是通过负载均衡的方式消费。

运行 queue2() 方法,可以看到有时是消费者2消费,有时又是消费者3消费。

测试主题1

运行 topic1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

测试主题2

运行 topic2() 方法,生产者发送消息后,由于是发布订阅的模式,所以消费者2和消费者3都能收到消息。

查看控制台,可以看到有两个持久化订阅的消费者。

image

14.学习ActiveMQ-主题的持久化

2020-05-12   消息队列   ActiveMQ  

主题的持久化跟队列有点不一样,我们先看生产者。

生产者改动不大,只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。

以下是代码:

public class DurableTopicProducer {

    public static void main(String[] args) {

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

            // 2.使用连接工厂创建一个连接对象
            connection = connectionFactory.createConnection();

            // 3.开启连接
            connection.start();

            // 4.使用连接对象创建会话对象。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5.使用会话对象创建目标对象,这里我们创建的是 topic,也就是一对多模式
            Destination destination = session.createTopic("topic-test-01");

            // 6.使用会话对象创建生产者对象
            producer = session.createProducer(destination);
            
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            
            // 7.使用会话对象创建一个文本消息对象
            TextMessage textMessage = session.createTextMessage();

            String text = "Hello ActiveMQ - " + new Date().getTime();

            // 8.设置消息内容
            textMessage.setText(text);

            // 9.发送消息
            producer.send(textMessage);
            System.out.println("已生产消息:" + text);

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 10.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

然后是消费者,消费者改动比较多,以下是代码:

public class DurableTopicConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        
        // 3.设置ClientID
        
        connection.setClientID("client1");

        // 4.开启连接
        connection.start();

        // 5.使用连接对象创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 6.使用会话对象创建目的地对象,生产者与消费者的名称要保持一致。
        Topic topic = session.createTopic("topic-test-01");

        // 7.使用会话对象创建主题订阅者
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscriber-1");

        // 8.通过监听器的方式来接收消息
        subscriber.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("已接收消息:" + ((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

启动消费者,打开控制台页面,点击 “Subscribers”,可以看到以下页面。

image

在Active Durable Topic Subscribers里可以看到有一个消费者的信息,说明订阅成功了。

此时,我们将这个消费者线程关闭,再来看看,可以看到这个消费者跑到了Offline Durable Topic Subscribers里面,如下图所示。表明这个消费者离线了,但是依旧保持订阅关系。

image

下面做一个试验,此时消费者已经离线,我们先启动生产者,后启动消费者,此时消费者还能接收到刚才生产者发送的消息吗?

答案是可以的。

此时再刷新控制台,可以看到刚才在Offline的订阅者跑到了Active里面。

上述内容可以联系微信公众号理解。我们可以想象一个场景,我订阅了某公众号,后来我手机没电关机了,当我意识到后,充电再开机,我依旧能收到公众号推送给我的消息,这里的持久化,可以认为,我和公众号的联系被持久化了,所以我上线后可以及时收到消息。

13.学习ActiveMQ-队列的持久化

2020-05-12   消息队列   ActiveMQ  

持久化

持久化队列只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。默认是持久化的。

我们可以做个试验,使用 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 设置为持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者依然可以拿到消息,也就是消息在宕机之前已经被存储了下来,后面再启动的时候,会读取到存储的数据。

非持久化

非持久化队列也很简单,只需要设置 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 就可以了。

我们再做个试验,使用 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 设置为非持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者什么都拿不到,也就是消息已经丢了,我们在MQ的控制台页面看到,MQ里的消息数量是0。

15.学习ActiveMQ-持久化数据到MySQL

2020-05-12   消息队列   ActiveMQ  

ActiveMQ 的持久化可以这样理解:生产者生产消息之后,消息首先会到达 broker,broker 就相当于一个调度中心。broker 首先将消息存储在本地的数据文件(或者是远程数据库),再将消息发送给消费者。如果消费者接收到了消息,则将消息从存储中删除,否则继续尝试重发。

ActiveMQ 启动后,首先检查存储位置,如果有未发送成功的消息,需要先把消息发送出去。

ActiveMQ 默认的持久化数据库是 KahaDB,我们也可以改成 MySQL。

第一步,首先将 mysql-connector-java 的jar 扔到 ActiveMQ的lib目录下。

image

然后修改 conf 目录下的 activmeq.xml 中的 persistenceAdapter 结点为 JDBC。

<persistenceAdapter>
    <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
    <jdbcPersistenceAdapter dataSource="#mysql-datasource"/>
</persistenceAdapter>

再添加 mysql-datasource,这样上面的配置文件才能拿到 DataSource。将下面这段内容放在 broker 结点和 import 结点之间。

<bean id="mysql-datasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=UTF-8"/> 
    <property name="username" value="root"/> 
    <property name="password" value="123"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean>

上面的用户名和密码要改成你自己的,还有要注意 url 参数,这里的&要改成&amp;,也就是&的转义,否则启动会报错。

启动 ActiveMQ,打开 activemq 库,就会发现自动生成了 3 张表。

image

每张表的作用如下:

activemq_msgs(消息表):

image

activemq_acks(订阅关系表,针对于持久化的Topic,订阅者和服务器的订阅关系会保存在这个表中):

image

activemq_lock:

在集群环境中才有用,只有一个 Broker 可以获取消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用来记录哪个 Broker 是当前的 Master Broker。

测试队列的数据

运行队列生产者生产 3 条数据,然后查看 activemq_msgs 表,可以看到这 3 条消息已经存进 activemq_msgs 表了。

image

然后启动队列消费者消费消息,在刷新 activemq_msgs 表,可以看到数据没了,说明消息的消费操作也同步更新了数据库。

image

测试主题的数据

启动消费者 DurableTopicConsumer 类,查看数据库的 activemq_acks 表,可以发现订阅关系已经持久化到数据库里了。

image

启动生产者 DurableTopicProducer 类,这时候消息已经被消费了。但是再查看数据库的 activemq_msgs 表,里面的数据依然存在。

image

上述内容可以联系微信公众号来理解,微信公众号发布过的文章,不能因为订阅者已经收到了或者是服务器关闭了,就把发布过的文章和订阅者清空,这是不合理的,从这个角度来理解就能明白了。

8.学习ActiveMQ-主题模式的特点以及消费者消费消息的3种情况

2020-05-11   消息队列   ActiveMQ  

主题模式的特点

1.生产者将消息发布到主题中,每个消息可以被多个消费者消费,属于一对多的关系。

2.生产者和消费者存在时间上的相关性,必须先启动消费者。

3.生产者生产时,主题模式不会持久化消息,所以先启动消费者,再启动生产者,这样消息才不会丢弃。

消费者消费消息的3种情况

1.先生产,然后启动1号消费者,1号消费者能消费到消息吗?

答:不可以。

2.先生产,然后启动1号消费者,再启动2号消费者,1号消费者能消费吗?2号消费者能消费吗?

答:都不可以。

3.先启动2个消费者,然后再生产消息,消费情况如何?

答:都可以消费到消息。

9.学习ActiveMQ-消息生产者的事务

2020-05-11   消息队列   ActiveMQ  

之前我们说过,在创建 session 的时候,我们传了两个参数,第一个是事务,第二个是签收,在这篇博客中,我们来说说事务。

connection.createSession()方法的第一个参数就是事务,它的值可以是true或false,代表session的提交是事务提交还是非事务提交。

当事务的值是false时,只要执行了producer.send()方法,消息就到了队列中,也就是自动提交了。

当事务的值是true时,在执行完roducer.send()方法后,在session关闭之前需要多加一个session.commit()方法提交事务。

第一个参数传入 true,这样就开启事务了。

image

session关闭之前提交事务,否则消息不会进入队列中。

image

事务的提交,用于实际复杂的业务场景,可能有多个消息需要入队列,假设有一条入队列报错了,我们希望这一批次的都要回滚,这时候就要用到session.rollback()方法了,可以将session.rollback()方法放在catch语句块中来执行。

image