Java 并发之 synchronized

2020-06-04   龙德   Java   并发 synchronized  

Java 提供了多线程之间同步的机制,其中最基本的就是使用 synchronization 关键字。这个关键字大家用得比较多,但是可能没仔细梳理过它,这篇博客将仔细梳理 synchronization 关键字,让大家对它有一个更全面和深刻的认识。

想要全面了解 synchronized,得先了解 3 个知识点:Java 内存模型(JMM)、对象的内存布局和 Monitor 对象。

Java 内存模型(JMM)

现代多核 CPU 中每个核心拥有自己的一级缓存或一级缓存加上二级缓存等,问题就发生在每个核心的占缓存上。每个核心都会将自己需要的数据读到独占缓存中,数据修改后也是写入到缓存中,然后等待刷入到主存中。所以会导致有些核心读取的值是一个过期的值。

Java 作为高级语言,屏蔽了这些底层细节,用 JMM 定义了一套读写内存数据的规范,虽然我们不再需要关心一级缓存和二级缓存的问题,但是,JMM 抽象了主内存和本地内存的概念。

所有的共享变量存在于主内存中,每个线程有自己的本地内存,线程读写共享数据也是通过本地内存交换的,所以可见性问题依然是存在的。这里说的本地内存并不是真的是一块给每个线程分配的内存,而是 JMM 的一个抽象,是对于寄存器、一级缓存、二级缓存等的抽象。

image

总而言之,在 Java 内存模型下,所有的共享变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存保存的是用到的共享变量的主内存副本的拷贝。线程对共享变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。这就可能造成一个线程修改了一个变量的值,而另外一个线程还继续使用它在工作内存中的变量值的拷贝,造成数据的不一致。

那 JMM 跟 synchronized 有什么关系呢?

一个线程在获取到监视器锁以后才能进入 synchronized 控制的代码块,一旦进入代码块,首先,该线程对于共享变量的缓存就会失效,因此 synchronized 代码块中对于共享变量的读取需要从主内存中重新获取,也就能获取到最新的值。

退出代码块的时候的,会将该线程写在缓冲区中的数据刷到主内存中,所以在 synchronized 代码块之前或 synchronized 代码块中对于共享变量的操作随着该线程退出 synchronized 块,会立即对其他线程可见(这句话的前提是其他读取共享变量的线程会从主内存读取最新值)。

也就是说,当线程进入 synchronized 控制的代码块中,这个线程对共享变量的读取从主内存中获取,退出 synchronized 块,会将修改后的数据刷到主内存中。

对象的内存布局

image

一个 Java 对象在堆内存中包括对象头、实例数据和补齐填充 3 个部分。

  • 对象头包括 Mark Words(存储哈希码、GC分代年龄、锁标志位等)和 Klass Words(指向当前对象所属的类的地址,也就是 Class 对象),如果是数组对象,还有一个保存数组长度的空间。

  • 实例数据是对象真正存储的有效信息,包括了对象的所有成员变量,其大小由各个成员变量的大小共同决定。

  • 对齐填充不是必然存在的,仅仅起占位符的作用。

对象的内存布局跟 synchronized 有什么关系呢?

别着急,下面介绍 synchronized 的使用的时候,会根据对象的内存布局来画图,更加直观的了解 synchronized。现在只需要记住对象头就可以了,对象头的 Mark Words 有一块空间记录了指向 Monitor 对象的指针,通过它可以关联到 Monitor 对象。

image

Monitor 对象

Monitor 其实是对 Java 对象的锁的一种抽象,称为监视器锁。每个对象都有一个 Monitor 相关联,它和 Java 对象是一对一的关系的。为了便于理解,我们可以简单的把它想象成一个对象。

Monitor 对象记录了持有锁的线程信息、阻塞队列、等待队列等。既然是对象就会有字段,Monitor 对象主要包含以下三个字段:

  • _Owner:记录当前持有锁的线程
  • _EntryList:阻塞队列,记录所有阻塞等待锁的线程
  • _WaitSet:等待队列,记录调用 wait() 方法并还未被通知的线程

当线程获得对象的监视器锁的时候,线程 id 等信息会拷贝进 _Owner 字段,其余线程会进入阻塞队列 _Entrylist,当持有锁的线程执行 wait() 方法,会立即释放锁进入 _Waitset 队列。当线程释放锁的时候,_Owner 会被置空,然后 _Entrylist 中的线程会竞争锁,竞争成功的线程 id 会写入 _Owner,其余线程继续在 _Entrylist 中等待。

Java 对象如何跟 Monitor 关联?

前面说过,对象头的 Mark Words 有一块空间记录了指向 Monitor 对象的指针,通过它可以关联到 Monitor 对象。请看下图:

image

当锁状态变成重量级锁时,对象头有一块空间指向重量级锁的指针,通过它可以关联到 Monitor 对象。

关于锁的 3 种状态:

JDK1.6 对 synchronized 的实现引入了大量的优化,如偏向锁、轻量级锁、重量级锁等技术来减少 synchronized 操作的开销。,这 3 种锁的区别如下:

  • 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。

  • 轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

  • 重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

synchronized 的使用

synchronized 关键字主要有两种使用方式:

  • synchronized 作用于方法,称为同步方法。同步方法被调用时,会自动执行加锁操作,只有加锁成功,方法体才会得到执行。如果被 synchronized 修饰的方法是实例方法,那么这个实例的监视器会被锁定。如果是 static 方法,线程会锁住相应的 Class 对象的监视器。方法体执行完成或者异常退出后,会自动执行解锁操作。

  • synchronized 代码块。synchronized(object) 在对某个对象上执行加锁时,会尝试在该对象的监视器上进行加锁操作,只有成功获取锁之后,线程才会继续往下执行。线程获取到了监视器锁后,将继续执行 synchronized 代码块中的代码,如果代码块执行完成,或者抛出了异常,线程将会自动对该对象上的监视器执行解锁操作。

synchronized 修饰实例方法

synchronized 修饰实例方法,锁住的是实例对象的监视器,请看下面的例子。

public class Person {

	private String name;
	
	private Integer age;
	
	private Dog dog;

	public Person(String name, Integer age, Dog dog) {
		this.name = name;
		this.age = age;
		this.dog = dog;
	}

	public synchronized void setA() {
		System.out.println(Thread.currentThread().getName() + " 线程进入了 setA() 方法");
	}
	
	public synchronized void setB() {
		System.out.println(Thread.currentThread().getName() + " 线程进入了 setB() 方法");
	}
	
	// 省略 Getters 和 Setters...
}
public class Dog {

	private String name;
	
	private Integer age;

	public Dog(String name, Integer age) {
		this.name = name;
		this.age = age;
	}
	
	// 省略 Getters 和 Setters...
}
public static void main(String[] args) {
	Dog dog = new Dog("汪汪", 1);
	Person zhangsan = new Person("张三", 15, dog);
	Thread t1 = new Thread(() -> zhangsan.setA());
	Thread t2 = new Thread(() -> zhangsan.setA());
	t1.start();
	t2.start();
}

启动 2 个线程,共同访问张三的 setA() 方法,由于 setA() 方法被 synchronized 修饰,所以只会有一个线程获得张三的监视器锁,获得锁之后才有资格执行 setA() 方法,其它获取不到锁的线程都会阻塞。

image

如果 t2 线程访问的是 setB() 方法呢?

Thread t2 = new Thread(() -> zhangsan.setB());

也会发生多个线程争同一把锁的问题,因为 setA() 和 setB() 都是实例方法,synchronized 锁住的都是张三这个实例的监视器。

image

这时候的内存结构是这样的:

image

我画的不是很严谨,仅供参考,大家能理解就好。

如果 t2 线程访问的是李四的 setA() 方法呢?

Person lisi = new Person("李四", 16, dog);
Thread t2 = new Thread(() -> lisi.setA());

这时候这两个线程都能执行各自对象的 setA() 方法,因为 t1 线程竞争的是张三的监视器锁,t2 线程竞争的是李四的监视器锁。这是两个不同的锁了,自然不会发生多个线程争同一把锁的问题。

image

但是这样是有安全隐患的,别忘了张三和李四养的都是同一条狗,如果在 setA() 方法里对这条狗做了增删改操作,那么势必会引起线程安全问题。如果是这种情况,那么必须在狗对象里思考可能引起的线程安全问题,并作出相应的同步操作。

这时候的内存结构大概是这样的:

image

synchronized 修饰静态方法

Person 类新增两个静态方法

public synchronized static void setC() {
	System.out.println(Thread.currentThread().getName() + " 线程进入了 setC() 方法");
}

public synchronized static void setD() {
	System.out.println(Thread.currentThread().getName() + " 线程进入了 setD() 方法");
}

t1 和 t2 线程都访问 setC() 方法

Thread t1 = new Thread(() -> zhangsan.setC());
Thread t2 = new Thread(() -> zhangsan.setC());

前面说过,synchronized 修饰静态方法,锁住的是相应的 Class 对象的监视器。Class 对象是啥对象,干啥用的?怎么生成的?下面我就简单的说一下。

image

个人觉得学习 Java 有几座难以跨越的大山,除了并发这座大山之外,Class 对象也算得上一座大山了。Class 对象其实不是很好理解,只有理解了 Class 对象,才能理解类加载机制、类型机制、反射以及 synchronized 修饰静态方法等等重要的知识点。

简单的来说,Class 对象也就是 Class 类的对象,我们每写的一个 Java 类,都会编译成一个 .class 文件,这个文件包含了类的字段、方法、注解等等元信息。既然在 Java 里万物都是对象,那么 .class 文件就可以看成一个个的对象,哪个类的对象呢?就是 Class 类的对象。JVM 加载 .class 文件的时候,会在方法区生成一个个 Class 对象。然后我们每 new 一个实例对象,其实都是根据这个类对应的 Class 对象 new 出来的实例对象。不管你 new 多少个,这些实例对象都是根据 Class 对象这个模板生成的,这些实例对象都指向同一个 Class 对象。前面说对象头的时候,有个 Klass Words 空间,里面存的就是指向 Class 对象的指针,通过它就可以关联到 Class 对象。

image

所以 t1 和 t2 线程都访问 setC() 方法,由于 setC() 方法是静态方法,并且被 synchronized 修饰,锁的是张三对应的 Class 对象的监视器,所以会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是 setD() 方法呢?

Thread t2 = new Thread(() -> zhangsan.setD());

答案大家都想到了,也会发生多个线程争同一把锁的问题。因为 setC() 和 setD() 都是静态方法,synchronized 锁住的都是张三这个实例对应的 Class 对象的监视器,所以会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是 setA() 方法呢?

那这时候不会发生多个线程争同一把锁的问题。因为 t1 线程竞争的是张三的 Class 对象的监视器锁,t2 线程竞争的是张三这个实例对象的监视器锁。这是两个不同的锁了,自然不会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是李四的 setC() 方法呢?

Thread t2 = new Thread(() -> lisi.setC());

由于张三和李四的 Class 对象都是同一个,所以张三和李四的 setC() 方法锁的都是同一个 Class 对象的监视器,所以也会发生多个线程争同一把锁的问题。

image

这时候的内存结构图大概是这样的:

image

synchronized 代码块

Person 类新增 setE() 方法

public void setE() {
	synchronized (dog) {
		System.out.println(Thread.currentThread().getName() + " 线程进入了同步代码块");
	}
}

可以看到 setE() 方法有个同步代码块,锁住的是 dog 对象的监视器。

t1 和 t2 线程都访问张三的 setE() 方法

Thread t1 = new Thread(() -> zhangsan.setE());
Thread t2 = new Thread(() -> zhangsan.setE());

结果大家肯定想到了,肯定会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是李四的 setE() 方法呢?

Thread t2 = new Thread(() -> lisi.setE());

也会发生多个线程争同一把锁的问题。因为张三和李四养的都是同一条狗。

image

这时候的内存结构图大概是这样的:

image

如果李四养的是另一条狗呢?

Dog haha = new Dog("哈哈", 2);
Person lisi = new Person("李四", 16, haha);
Thread t2 = new Thread(() -> lisi.setE());

这时候就不会发生多个线程争同一把锁的问题。因为张三和李四的狗不是同一只,这两个线程抢的不是同一把锁。

image

这时候的内存结构图大概是这样的:

image

总结

总的来说就两点:

1.为什么需要 synchronized

因为在 Java 内存模型下,所有的共享变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存保存的是用到的共享变量的主内存副本的拷贝。线程对共享变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。这就可能造成一个线程修改了一个变量的值,而另外一个线程还继续使用它在工作内存中的变量值的拷贝,造成数据的不一致。所以在多线程的环境中,必须要考虑多个线程访问临界资源的安全问题。可以使用 Java 语言提供的 synchronized 关键字或者 java.util.concurrent.locks 包下的各种锁(比如 ReentrantLock) 来保证线程操作的原子性。

对于 synchronized 来说,当线程进入 synchronized 控制的代码块中,这个线程对共享变量的读取从主内存中获取,退出 synchronized 块,会将修改后的数据刷到主内存中。

2.如何使用 synchronized

要分清楚 synchronized 锁的是哪个对象的监视器:

  • synchronized 修饰实例方法,那么这个实例对象的监视器会被锁定。

  • synchronized 修饰静态方法,那么这个实例对象对应的 Class 对象的监视器会被锁定。

  • synchronized 作用于代码块,那么括号里的对象的监视器会被锁定。

参考资料

https://javadoop.com/post/java-memory-model

https://www.cnblogs.com/ZoHy/p/11313155.html

https://zhuanlan.zhihu.com/p/138427106

4.学习 Kafka - 在 Spring MVC 中使用 Kafka

2020-05-22   龙德   Kafka   Kafka SpringMVC  

源码地址:https://github.com/miansen/kafka-example/tree/master/kafka-example-springmvc

引入依赖

Spring MVC 的依赖就不贴了,可以去源码里看。

既然是在 Spring MVC 应用中使用 Kafka,那肯定会有一个 spring-kafka 的玩意,一般 Spring 应用都是通过这种方式将第三方应用集成进来,简化开发,开箱即用。

<!-- 1)Kafka与 Spring 集成 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.5.RELEASE</version>
</dependency>

要注意 spring-kafka 与 Spring MVC 的版本,太高了可能会不兼容。

application.properties

application.properties 文件配置 kafka 用到的各个属性,这样就不用再代码里或者是 xml 配置文件里写死了,更灵活一点。

bootstrap.servers=192.168.197.6:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=my-consumer-group-04
session.timeout.ms=15000
enable.auto.commit=true
auto.commit.interval.ms=1000

各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html

配置文件

既然是 Spring MVC 应用,那么 xml 配置文件也是必不可少的。虽然可以通过 Java Config 的形式配置,但是还是用 xml 配置比较好,这样直观一点,可以更明显的看出配置了哪个 bean,哪个 bean 跟 哪个 bean 关联。

我分两个配置文件,一个是 kafka-producer.xml,这个是配置生产者的。还有一个是 kafka-consumer.xml,这个是配置消费者的。

kafka-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

     <!-- 配置生产者的属性 -->  
     <bean id="kafkaProducerProperties" class="java.util.HashMap">  
        <constructor-arg>  
            <map>  
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="acks" value="${acks}"/>
                <entry key="retries" value="${retries}"/>  
                <entry key="batch.size" value="${batch.size}"/>  
                <entry key="linger.ms" value="${linger.ms}"/>  
                <entry key="buffer.memory" value="${buffer.memory}"/>  
                <entry key="key.serializer" value="${key.serializer}"/>  
                <entry key="value.serializer" value="${value.serializer}"/>
                <entry key="key.deserializer" value="${key.deserializer}"/>  
                <entry key="value.deserializer" value="${value.deserializer}"/> 
            </map>  
        </constructor-arg>  
     </bean>  
    
    <!-- 创建 kafkaProducerTemplate 需要使用的 kafkaProducerFactory bean -->
     <bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">  
        <constructor-arg ref="kafkaProducerProperties" /> 
     </bean>
     
     <!-- 创建 kafkaTemplate bean,使用的时候,只需要注入这个 bean,即可使用 template 的 send 消息方法 -->
     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">  
        <constructor-arg ref="kafkaProducerFactory"/>
        <!--设置对应 topic-->
        <property name="defaultTopic" value="test01"/>
     </bean>
     
    <!-- 生产者 -->
    <bean id="kafkaProducerService" class="wang.miansen.kafka.springmvc.KafkaProducerService" />
</beans>

kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

    <!-- 配置消费者的属性 -->
    <bean id="kafkaConsumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="${group.id}" />
                <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${session.timeout.ms}" />
                <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}" />
                <entry key="key.serializer" value="${key.serializer}" />
                <entry key="value.serializer" value="${value.serializer}" />
                <entry key="key.deserializer" value="${key.deserializer}" />
                <entry key="value.deserializer" value="${value.deserializer}" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建 kafkaConsumerFactory bean -->
    <bean id="kafkaConsumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="kafkaConsumerProperties" />
    </bean>

    <!--具体监听的 bean -->
    <bean id="kafkaConsumerListener1" class="wang.miansen.kafka.springmvc.KafkaConsumerListener1" />
    <bean id="kafkaConsumerListener2" class="wang.miansen.kafka.springmvc.KafkaConsumerListener2" />

    <!-- 消费者容器配置信息 -->
    <bean id="kafkaMessageListenerContainerProperties1"
        class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- 订阅主题 -->
        <constructor-arg value="test01" />
        <property name="messageListener" ref="kafkaConsumerListener1" />
    </bean>
    <bean id="kafkaMessageListenerContainerProperties2"
        class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- 订阅主题 -->
        <constructor-arg value="test01" />
        <property name="messageListener" ref="kafkaConsumerListener2" />
    </bean>

    <!-- 配置消费者容器 -->
    <bean id="kafkaMessageListenerContainer1"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="kafkaConsumerFactory" />
        <constructor-arg ref="kafkaMessageListenerContainerProperties1" />
    </bean>
    <bean id="kafkaMessageListenerContainer2"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="kafkaConsumerFactory" />
        <constructor-arg ref="kafkaMessageListenerContainerProperties2" />
    </bean>

</beans>

业务代码

KafkaProducerService

public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String value) {
        sendMessage(null, value);
    }
    
    public void sendMessage(String key, String value) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.sendDefault(value);
        future.addCallback(success -> {
            RecordMetadata metadata = success.getRecordMetadata();
            System.out.println("生产者发送消息成功。topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }, failure -> {
            System.out.println("生产者发送消息失败,原因:" + failure.getMessage());
        });
    }
}

KafkaConsumerListener1

public class KafkaConsumerListener1 implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(String.format("消费者1收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                record.partition(), record.offset(), record.key(), record.value()));
    }

}

KafkaConsumerListener2

public class KafkaConsumerListener2 implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(String.format("消费者2收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                record.partition(), record.offset(), record.key(), record.value()));
    }

}

各个 bean 的作用都写上了注释,看一眼就明白是干啥用的了。如果你仔细看就会发现,在 Spring 应用中使用 Kafka,其实跟你用 kafka-clients 的 API 差不多的,只不过是 Spring 封一层,一般都会提供一个 xxxFactory 和 xxxTemplate 的 Bean,将第三方应用融合进 Spring 应用,以便开发者更容易上手。

在生产者的配置中,我配置了一个 kafkaProducerService bean,这个 bean 是生产者服务的接口,用来处理业务请求,这个 bean 注入了 Spring 提供的操作模板类 kafkaTemplate,这个类有很多操作 Kafka 的方法,通过它可以很方便的将消息发送到 Kafka。

kafkaTemplate 类的 send 方法会返回一个 Future 对象,这个对象接收两个对象,一个用来处理发送成功,一个用来处理发送失败。

在消费者的配置中,我配置了两个监听 bean:kafkaConsumerListener1 和 kafkaConsumerListener2。既然是监听的方式,那么说明消费者是异步消费的,当有消息进来时,会另起线程去处理消费逻辑。

测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:kafka-producer.xml", "classpath:kafka-consumer.xml"})
public class ApplicationTest {

    @Autowired
    private KafkaProducerService producerService;
    
    @Test
    public void sendMessageTest() throws Exception {
        for (int i = 0; i < 10; i++) {
            producerService.sendMessage(Integer.toString(i), "hello kafka-" + i);
        }
        System.in.read(); // 让 main 线程阻塞一下,否则消费者线程可能会来不及消费就死了
    }
}

生产者向 Kafka 发送数据,我这里指定了 key。前面说过,如果不指定分区,但是指定了 key,那么 Kafka 会将 key hash,然后取模,根据模数决定消息分配到哪个分区。所以这 10 条数据会分散到各个分区存储。

控制台输出信息如下,为了方便查看,我截成了两部分。

生产者发送消息成功。topic: test01, partition: 0, offset: 880
生产者发送消息成功。topic: test01, partition: 0, offset: 881
生产者发送消息成功。topic: test01, partition: 0, offset: 882
生产者发送消息成功。topic: test01, partition: 1, offset: 714
生产者发送消息成功。topic: test01, partition: 1, offset: 715
生产者发送消息成功。topic: test01, partition: 1, offset: 716
生产者发送消息成功。topic: test01, partition: 2, offset: 731
生产者发送消息成功。topic: test01, partition: 2, offset: 732
生产者发送消息成功。topic: test01, partition: 2, offset: 733
生产者发送消息成功。topic: test01, partition: 2, offset: 734

可以看到这 10 条消息均匀的分布在 3 个分区里。

我配置了两个消费者,这两个消费者都位于一个消费者组下,所以这两个消费者会轮流消费,而且只会消费到分配给自己的分区,不会重复消费相同的分区。

image

可以看到有两条线程在处理消息的监听。

消费者1收到了消息。topic: test01, partition: 0, offset: 880, key: null, value: hello kafka-2
消费者1收到了消息。topic: test01, partition: 0, offset: 881, key: null, value: hello kafka-5
消费者1收到了消息。topic: test01, partition: 0, offset: 882, key: null, value: hello kafka-8
消费者2收到了消息。topic: test01, partition: 2, offset: 731, key: null, value: hello kafka-0
消费者2收到了消息。topic: test01, partition: 2, offset: 732, key: null, value: hello kafka-3
消费者2收到了消息。topic: test01, partition: 2, offset: 733, key: null, value: hello kafka-6
消费者2收到了消息。topic: test01, partition: 2, offset: 734, key: null, value: hello kafka-9
消费者1收到了消息。topic: test01, partition: 1, offset: 714, key: null, value: hello kafka-1
消费者1收到了消息。topic: test01, partition: 1, offset: 715, key: null, value: hello kafka-4
消费者1收到了消息。topic: test01, partition: 1, offset: 716, key: null, value: hello kafka-7

可以看到 “消费者1” 消费了 0 号和 1 号分区,”消费者2” 只消费了 2 号分区。再次强调,这两个消费者位于同一个消费者组下,它们只会消费分配给自己的分区,不会消费他人的分区,也就是说不会重复消费相同的分区。

为了更直观的对比,加深印象,我们再配置一个 “消费者3”,不过这个 “消费者3” 位于不同的消费者组下。

具体配置就不贴出来了,可以到源码里看。

我们直接看结果,控制台输出如下,为了方便查看,我截成了两部分。

生产者发送消息成功。topic: test01, partition: 0, offset: 887
生产者发送消息成功。topic: test01, partition: 0, offset: 888
生产者发送消息成功。topic: test01, partition: 0, offset: 889
生产者发送消息成功。topic: test01, partition: 1, offset: 720
生产者发送消息成功。topic: test01, partition: 1, offset: 721
生产者发送消息成功。topic: test01, partition: 1, offset: 722
生产者发送消息成功。topic: test01, partition: 1, offset: 723
生产者发送消息成功。topic: test01, partition: 2, offset: 738
生产者发送消息成功。topic: test01, partition: 2, offset: 739
生产者发送消息成功。topic: test01, partition: 2, offset: 740

可以看到生成了 10 条消息,均匀的分布在 3 个分区里。

消费者1收到了消息。topic: test01, partition: 0, offset: 887, key: null, value: hello kafka-1
消费者1收到了消息。topic: test01, partition: 0, offset: 888, key: null, value: hello kafka-4
消费者1收到了消息。topic: test01, partition: 0, offset: 889, key: null, value: hello kafka-7
消费者3收到了消息。topic: test01, partition: 0, offset: 887, key: null, value: hello kafka-1
消费者3收到了消息。topic: test01, partition: 0, offset: 888, key: null, value: hello kafka-4
消费者3收到了消息。topic: test01, partition: 0, offset: 889, key: null, value: hello kafka-7
消费者2收到了消息。topic: test01, partition: 2, offset: 738, key: null, value: hello kafka-2
消费者2收到了消息。topic: test01, partition: 2, offset: 739, key: null, value: hello kafka-5
消费者2收到了消息。topic: test01, partition: 2, offset: 740, key: null, value: hello kafka-8
消费者3收到了消息。topic: test01, partition: 1, offset: 720, key: null, value: hello kafka-0
消费者3收到了消息。topic: test01, partition: 1, offset: 721, key: null, value: hello kafka-3
消费者3收到了消息。topic: test01, partition: 1, offset: 722, key: null, value: hello kafka-6
消费者3收到了消息。topic: test01, partition: 1, offset: 723, key: null, value: hello kafka-9
消费者3收到了消息。topic: test01, partition: 2, offset: 738, key: null, value: hello kafka-2
消费者3收到了消息。topic: test01, partition: 2, offset: 739, key: null, value: hello kafka-5
消费者3收到了消息。topic: test01, partition: 2, offset: 740, key: null, value: hello kafka-8
消费者1收到了消息。topic: test01, partition: 1, offset: 720, key: null, value: hello kafka-0
消费者1收到了消息。topic: test01, partition: 1, offset: 721, key: null, value: hello kafka-3
消费者1收到了消息。topic: test01, partition: 1, offset: 722, key: null, value: hello kafka-6
消费者1收到了消息。topic: test01, partition: 1, offset: 723, key: null, value: hello kafka-9

可以看到 “消费者1” 消费了 0 号和 1 号分区,”消费者2” 只消费了 2 号分区。这两个消费者位于同一个消费者组下,所以不会重复消费同一个分区。

而 “消费者3” 位于不同的消费者组下,其它组对它不影响,跟它没关系。并且这个组只有它一个消费者,所以 “消费者3” 可以大饱口福,消费所有的分区。

3.学习 Kafka - Java 客户端

2020-05-21   龙德   Kafka   Kafka  

引入客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

生产者

阻塞的生产者

生产者代码如下:

public class CustomProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.配置生产者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#producerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2.实例化一个生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3.实例化一条消息,第一个参数是主题的名称,第二个参数是消息的内容
            ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
            // 4.发送消息
            Future<RecordMetadata> future = producer.send(record);
            // 5.获取发送的结果,注意这个方法会阻塞当前线程
            RecordMetadata metadata = future.get();
            System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }
        // 6.关闭资源
        producer.close();
    }
}

带有回调的生产者

前面说过,future.get() 方法会阻塞当前线程,这样会影响到后面代码的执行,好在 Kafka 客户端提供了回调的方法。

代码如下:

public class CallbackProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.配置生产者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#producerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2.实例化一个生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3.实例化一条消息,第一个参数是主题的名称,第二个参数是消息的内容
            ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
            // 4.发送消息
            producer.send(record, new Callback() {
                // 5.在回调函数里处理结果,这样就不会阻塞了
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
        }
        // 6.关闭资源
        producer.close();
    }
}

消息发送到哪个分区?

image

如上图所示,ProducerRecord 类的构造函数有很多重载,根据构造函数来决定消息的分区,规则如下:

  • 如果不指定分区和 key,那么消息会轮询负载均衡到每个分区上。
// 轮询负载均衡到每个分区上
ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
  • 如果不指定分区,但是指定了 key,那么会将 key hash,然后取模,根据模数决定分区。
// 消息发送到哪个分区取决于 key
ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i), Integer.toString(i));
  • 如果指定分区,那么直接发送到指定的分区。
// 指定消息发送到第 2 个分区
ProducerRecord<String,String> record = new ProducerRecord<>("test01", 2, Integer.toString(i), Integer.toString(i));

消费者

自动提交偏移量

public class AutoCommitConsumer {

    public static void main(String[] args) {
        // 1.配置消费者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#consumerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("group.id", "my-consumer-group-01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 2.实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3.订阅主题(可以订阅多个)
        consumer.subscribe(Arrays.asList("test01"));
        // 4.循环消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            }
        }
    }
}

通过配置 enable.auto.commit,表明这是个自动提交偏移量,偏移量由 auto.commit.interval.ms 控制自动提交的频率。

通过配置 bootstrap.servers 指定集群的一个或多个 broker。不用指定全部的 broker,它将自动发现集群中的其余的 borker(最好指定多个,万一有服务器故障)。

在这个例子中,客户端订阅了主题 test01。消费者组叫 my-consumer-group-01。

broker 通过心跳机制自动检测 my-consumer-group-01 组中失败的消费者实例,消费者实例会自动 ping 集群,告诉集群它还活着。只要消费者实例能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过 session.timeout.ms 设置的值,那么就会认为是故障的,它的分区将被分配到别的消费者实例。

通过配置 deserializer 如何把 byte 转成 object 类型,例子中,通过指定 string 解析器,我们告诉获取到的消息的 key 和 value 只是简单个 string 类型。

手动提交偏移量

通过设置 props.put(“enable.auto.commit”, “false”); 改为手动提交偏移量。

然后在消费消息后手动提交。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    consumer.commitSync();
}

为什么需要手动提交?

可以想象一个场景:读取消息后,将它们批量插入到数据库中。如果我们设置 offset 自动提交,消费将被认为是已消费的。这样会出现问题,因为消息被插入到数据库之前可能会失败,这样我们就会丢失部分的数据。

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= 100) {
        try {
            // 插入数据库
            insertIntoDb(buffer);
        } catch(Exception e) {
            // 如果插入数据库失败
            // TODO
        }
        // 插入数据库成功再手动提交
        consumer.commitSync();
        buffer.clear();
    }
}

订阅指定的分区

在前面的例子中,我们订阅感兴趣的 topic,让 Kafka 提供给我们平分后的分区。但是,在有些情况下,你可能需要自己来控制分配指定分区。

要使用此模式,你只需调用 assign(Collection partitions) 方法消费指定的分区即可:

// 订阅分区
TopicPartition partition0 = new TopicPartition("test01", 0);
TopicPartition partition1 = new TopicPartition("test01", 1);
consumer.assign(Arrays.asList(partition0, partition1));

注意订阅主题和订阅分区是互斥的。

控制消费的位置

大多数情况下,消费者只是简单的从头到尾的消费消息,周期性的提交 offset(自动或手动)。Kafka 也支持消费者去手动的控制消费的位置,可以消费之前的消息也可以跳过最近的消息。

要使用此模式,你只需调用 seek(TopicPartition partition, long offset) 方法消费指定的位置即可:

TopicPartition partition0 = new TopicPartition("test01", 0);
consumer.seek(partition0, 784);

结果如下图:

image

2.学习 Kafka - 重要概念

2020-05-20   龙德   Kafka   Kafka  

由于搭建集群的时候,3 台虚拟机都在前台开启了 Kafka,占用了窗口,所以我这里再开启两个窗口,分别用来操作 Kafka 和 Zookeeper。

image

下面的操作都在这两个窗口上进行。

主题(Topic)

如果把 Kafka 比作 MySQL 的话,那么主题就相当于数据库,所以想用 Kafka,就必须先创建主题,就好比我们想用 MySQL,就必须先创建数据库一样。

使用这个命令创建主题:

/usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.197.6:2181 --replication-factor 3 --partitions 3 --topic test01

关于这条命令的解释:

–zookeeper:指定 zookeeper 集群的 ip 地址和端口

–topic:创建名为 test01 的主题

–partitions:test01 这个主题分配 3 个分区

–replication-factor:每个分区分配 3 个副本,也就是每个分区备份 3 份

主题在 Zookeeper 中是如何体现的呢,我们进入 Zookeeper 看一下。

使用命令 ls /brokers/topics,可以看到刚才创建的 test01 主题在 Zookeeper 中已经创建了。

image

使用命令 /brokers/topics/test01/partitions,可以看到分区也创建了。

image

分区(partition)

主题只是一个逻辑的概念,在物理上并不存在,消息也不是存在主题里的,而是存在分区里的。分区是 Kafka 消息存储的最小单元,在物理上对应着一个个真实的文件。

我们上面新建了一个 test01 主题,分配了 3 个分区,这 3 个分区其实就是一个个的文件夹。我们可以进入 log.dirs 目录

cd /usr/local/kafka_2.13-2.5.0/logs/

image

可以看到创建了 3 个文件夹,分别对应 3 个分区,命名规则是 topic名称-N,N 就是分区的个数。后面生产者发送给 test01 主题的数据都会存在这 3 个文件夹里。

进入 test01 主题的第一个分区看一下

image

可以看到每个分区文件夹,都包含以下 4 类文件:

  • xxxxxxxxx.index:索引文件
  • xxxxxxxxx.log:数据文件
  • xxxxxxxxx.timeindex:时间戳索引文件
  • leader-epoch-checkpoint:保存了每一任 leader 开始写入消息时的 offset,会定时更新,当 follower 被选为 leader 时会根据这个确定哪些消息可用

我们重点看 .index .log .timeindex 这 3 个文件。

这 3 个文件都是成对出现的,每一对都叫做 segment。也就是说分区中的数据是分段存储,一个分区(partition)被切割成多个相同大小的段(segment)(这个是由 log.segment.bytes 决定,控制每个 segment 的大小)。

segment 文件命名规则:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

如下图所示:

image

通过上面的内容,我们可以这样总结:

在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,目录的命名规则为 topic名称+有序序号。第一个序号从 0 开始计,最大的序号为 partition 数量减 1。partition 是实际物理上的概念,而 topic 是逻辑上的概念,topic 更像是一个消息的类别。由于生产者生产的消息会不断追加到 log 文件的末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 将 partition 细分为 segment,一个 partition 物理上由多个 segment 组成。每个 segment 由 3 个文件组成,分别是 index 文件、log 文件和 timeindex 文件,这 3 个文件的命名规则是:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。

偏移量(offset)

分区中的每一条记录都会分配一个 id 号来表示顺序,我们称之为 offset,offset 用来唯一的标识分区中每一条记录。

每一个消费者中唯一保存的元数据是 offset(偏移量)即消费在 log 中的位置。偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据。也可以跳过最近的记录,从 “现在” 开始消费。

特别说明的是,老版本的消费者是依赖 Zookeeper 的,当启动一个消费者时向 Zookeeper 注册。新版本消费者去掉了对 Zookeeper 的依赖,而是由消费组协调器统一管理,已消费的消息偏移量提交后会保存到名为 “__consumer_offsets” 文件夹中。

image

Kafka 的 offset 是分区内有序的,但是在不同分区中是无序的,Kafka 不保证数据的全局有序。

关于 offset,可以通过以下几个问题来理解:

1.为啥 offset 在分区内是有序的?

因为分区的结构是队列,先进先出,所以保证了有序。

2.为啥在不同分区中是无序的?

因为消息写入哪一个分区不是固定的,有可能第一条消息写入第一个分区,第二条消息写入第二个区分。。。

3.如果有业务场景需要保证数据的有序呢?该如何做?

可以这样做:创建主题的时候只分配一个分区,这样就能保证数据的顺序性了。

但是只分配一个分区,又跟 Kafka 的分布式冲突,所以还可以这样做:

创建主题的时候分配多个分区,读取消息时,在代码层面处理消息的顺序性。

broker

Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。一个 broker 有多个 topic,一个 topic 有多个 partition,一个 partition 有多个 segment。

image

生产者(producer)

Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为 message(消息)发送到 Kafka 集群。默认情况下,每行将作为单独的 message 发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

[root@localhost kafka_2.13-2.5.0]# /usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.197.6:9092 --topic test01
>This is a message
>This is another message
>

关于这条命令的解释:

–broker-list:设置 kafka 集群地址

–topic:设置生产消息的主题

消费者(consumer)

Kafka 还有一个命令行 consumer(消费者),将消息转储到标准输出。

/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.12:9092 --topic test01 --from-beginning
This is a message
This is another message

关于这条命令的解释:

–bootstrap-server:设置 kafka 集群地址

–topic:设置消费消息的主题

–from-beginning:设置消息起始位置开始消费。默认是从新位置 latest 开始消费。

消费组(consumer-groups)

消费者使用一个 “消费组” 名称来进行标识,发布到 topic 中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。

如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。

如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。

image

如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。

通常情况下,每个 topic 都会有一些消费组,一个消费组对应一个”逻辑订阅者”。一个消费组由许多消费者实例组成,便于扩展和容错。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的进程。

在 Kafka 中实现消费的方式是将分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由 Kafka 协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区。如果一个实例消失或者故障,拥有的分区将被分发到剩余的实例。这被称为重新平衡分组。

Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。

创建消费者时如果不指定消费组,那么就会默认分配一个,可以通过这个命令查看有哪些消费组。

/usr/local/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.197.6:9092 --list

image

如图,因为我启动了 4 个消费者,所以有 4 个消费组。由于这 4 个消费者都属于不同的消费组,所以他们都能收到 test01 主题的所有消息。

同一个消费组里的消费者消费唯一的分区,不可以重复消费。

打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。

消费组 my-consumer-group-01 有 3 个消费者,分别是 my-consumer-01,my-consumer-02,my-consumer-03。

那么,Kafka 会负载均衡的将分区划分给同一消费组里的消费者。

image

如上图所示,此时,test01-0 分区已经被第一个消费者消费了,那么它就不能被这个组里的其它消费者消费。除非第一个消费者挂了,那么它就会被重新分配给其它的消费者。

我们启动 3 个消费者来亲自感受一下。

使用这个命令可以指定消费者的组。

/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.6:9092 --consumer-property group.id=my-consumer-group-01 --consumer-property client.id=my-consumer-01 --topic test01

--consumer-property group.id=my-consumer-group-01 指定消费者的组是 my-consumer-group-01。

--consumer-property client.id=my-consumer-01 指定消费者的 ID 是 my-consumer-01。

image

如上图所示,我启动了 3 个消费者,这 3 个消费者都位于同一个组里。然后生产者往 test01 这个主题里发送消息,可以看到:对于同一个消费组而言,同一个时刻,只有一个消费者在消费消息,而且每个消费者消费唯一的分区,不可以重复消费。

不同消费组里的消费者可以消费相同的分区,可以重复消费。

打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。

消费组 my-consumer-group-01 有 2 个消费者,分别是 my-consumer-01 和 my-consumer-02。

消费组 my-consumer-group-02 有 1 个消费者,是 my-consumer-03。

image

如上图所示,my-consumer-01 和 my-consumer-02 位于同一个组,那么它们无法重复消费同一个分区,并且同一个时刻,只能有一个消费者消费。

虽然 test01-0,test01-1,test01-2 这 3 个分区已经被 my-consumer-01 和 my-consumer-02 消费过了,但是 my-consumer-03 位于不同的组,所以其它组对它是不影响的,它是独立的,可以消费其它组消费过的分区。

image

如上图所示,我启动了 3 个消费者,前两个消费者同一个组,第三个消费者另一个组。

由于 my-consumer-01 和 my-consumer-02 位于同一个组,所以它们只能消费分配给自己的分区,不能重复消费。而且同一个时刻只能有一个消费者消费,所以看起来就像是轮流消费。而 my-consumer-03 位于不同的组,即使分区已经被其它组的消费者消费过了,但是它还是能消费的。而且这个组只有它一个消费者,所以它消费了所有分区的消息。

1.学习 Kafka - 搭建集群环境

2020-05-18   龙德   Kafka   Kafka  

前提条件

请准备 3 台虚拟机,以下是我准备的:

image

安装 JDK1.8

Zookeeper 和 Kafka 都依赖 JDK,所以安装 JDK 是必须的。

到官网 https://www.oracle.com/java/technologies/javase-downloads.html 下载 JDK1.8。

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp jdk-8u251-linux-x64.tar.gz root@192.168.197.6:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.10:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.12:/usr/local/src

接着解压 jdk-8u251-linux-x64.tar.gz 压缩包到 /usr/local 目录。

tar zxvf /usr/local/src/jdk-8u251-linux-x64.tar.gz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set jdk environmentexport
export JAVA_HOME=/usr/local/jdk1.8.0_251
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

最后执行 java -version 命令,验证是否安装成功。输出以下内容则说明安装成功:

java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

搭建 Zookeeper 集群

由于 Kafka 的 broker 的状态、消费者的消费状态、偏移量以及消费者组等是交给 Zookeeper 维护的,所以安装 Kafka 之前必须先安装 Zookeeper。

安装 Zookeeper

到官网 https://zookeeper.apache.org/releases.html 下载 Zookeeper,我们下载 3.4.14 版本的。

image

image

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp zookeeper-3.4.14.tar.gz root@192.168.197.6:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.10:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.12:/usr/local/src

接着解压 zookeeper-3.4.14.tar.gz 压缩包到 /usr/local 目录。

tar zxvf /usr/local/src/zookeeper-3.4.14.tar.gz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set zookeeper environmentexport
export ZK_HOME=/usr/local/zookeeper-3.4.14
export PATH=$ZK_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

修改 Zookeeper 的配置文件

将 zoo_sample.cfg 文件备份并重命名为 zoo.cfg

cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.14/conf/zoo.cfg

编辑 zoo.cfg 文件

vi zoo.cfg

修改数据文件夹路径

dataDir=/usr/local/zookeeper-3.4.14/data

添加日志文件夹路径

dataLogDir=/usr/local/zookeeper-3.4.14/logs

在文件末尾添加集群信息

server.1=192.168.197.6:2888:3888
server.2=192.168.197.10:2888:3888
server.3=192.168.197.12:2888:3888

image

创建数据文件夹

mkdir /usr/local/zookeeper-3.4.14/data

创建日志文件夹

mkdir /usr/local/zookeeper-3.4.14/logs

创建 myid 文件,在 myid 文件中添加本机的 server ID

echo "1" > /usr/local/zookeeper-3.4.14/data/myid

请注意,每台虚拟机的 myid 要跟 server ID 对应。

开放端口

每台虚拟机都开放以下端口,如果不开放的话,Zookeeper 集群无法与其它节点通信,启动会报错。

# 2181 端口对 client 端提供服务
firewall-cmd --zone=public --add-port=2181/tcp --permanent
# 2888 端口是集群内机器相互通信使用(由 Leader 负责监听此端口)
firewall-cmd --zone=public --add-port=2888/tcp --permanent
# 3888 端口是选举 leader 使用的
firewall-cmd --zone=public --add-port=3888/tcp --permanent

使端口生效

firewall-cmd --reload

启动 3 台虚拟机的 Zookeeper

使用以下命令分别启动每台虚拟机上的 Zookeeper。

/usr/local/zookeeper-3.4.14/bin/zkServer.sh start

查看 Zookeeper 的状态

/usr/local/zookeeper-3.4.14/bin/zkServer.sh status

如果输出以下信息,则说明 Zookeeper 集群搭建成功

192.168.197.6:

image

192.168.197.10:

image

192.168.197.12:

image

通过以上图片可以知道,3 台虚拟机上的 Zookeeper 已经成功启起来了。其中 192.168.197.10 节点被选举为 leader 节点,其它两个节点是 follower 节点。每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。

2888 端口是集群内机器相互通信使用的,由 Leader 负责监听此端口。

搭建 Kafka 集群

安装 Kafka

到官网 http://kafka.apache.org/downloads 下载 Kafka,我们下载 2.5.0 版本的。

image

image

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp kafka_2.13-2.5.0.tgz root@192.168.197.6:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.10:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.12:/usr/local/src

接着解压 kafka_2.13-2.5.0.tgz 压缩包到 /usr/local 目录。

tar zxvf kafka_2.13-2.5.0.tgz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set kafka environment
export KAFKA_HOME=/usr/local/kafka_2.13-2.5.0
export PATH=$KAFKA_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

修改 Kafka 的配置文件

修改 192.168.197.6 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=0
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.6:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

修改 192.168.197.10 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=1
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.10:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

修改 192.168.197.12 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=2
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.12:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

然后每个节点都创建 logs 文件夹

mkdir /usr/local/kafka_2.13-2.5.0/logs

开放端口

每台虚拟机都开放以下端口

# 9092 端口是 Kafka 对 client 端提供服务的端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent

使端口生效

firewall-cmd --reload

启动 3 台虚拟机的 Kafka

使用以下命令分别启动每台虚拟机上的 Kafka。

/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.5.0/config/server.properties &

上面那个命令是前台启动,会一直占用窗口,你也可以使用后台启动,命令如下:

/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/server.properties

如果输出以下信息,则说明启动成功

image

总结

最后来总结以下每台虚拟机安装的内容。

查看服务启动情况

使用这个命令查看服务启动情况:

jps

image

从上图可以看到安装了 Kafka 和 Zookeeper,并且都启动了。

查看端口占用情况

使用这个命令查看端口占用情况:

netstat -tunlp|egrep "(2181|2888|3888|9092)"

image

  • 2888 端口:此端口是 Zookeeper 集群内机器相互通信使用的,由 Leader 节点负责监听此端口。
  • 3888 端口:Zookeeper 集群的每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。
  • 2181 端口:Zookeeper 对 client 端提供服务的端口。
  • 9092 端口:Kafka 对 client 端提供服务的端口。

查看 Zookeeper 的目录情况

随便在一个节点上,使用以下命名,查看 Zookeeper 的目录情况。

使用客户端进入 Zookeeper

/usr/local/zookeeper-3.4.14/bin/zkCli.sh

image

使用 ls / 命令查看 Zookeeper 的目录情况。

image

上面的显示结果中:只有 zookeeper 目录是 Zookeeper 原生的,其它都是 Kafka 创建的。

关于各个目录的作用,后面再说。

至此,Kafka 的集群环境就搭建成功了。

17.学习ActiveMQ-在SpringBoot中使用ActiveMQ

2020-05-15   龙德   ActiveMQ   ActiveMQ SpringBoot  

新建工程

新建一个 Maven 工程,我的工程叫 springboot-activemq-example。

pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/>
    </parent>
    
    <groupId>wang.miansen</groupId>
    <artifactId>springboot-activemq-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>springboot-activemq-example</name>
    <description>activemq project for Spring Boot</description>
    
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
        <!--消息队列连接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

server.port=8080
# 配置消息的类型,如果是true则表示为topic消息,如果为false表示Queue消息
spring.jms.pub-sub-domain=true
# 连接用户名
spring.activemq.user=admin
# 连接密码
spring.activemq.password=admin
# 连接主机信息
spring.activemq.brokerUrl=tcp://localhost:61616
# 连接池的最大连接数
jms.pool.maxConnections=10

新建配置类 ActiveMQConfig

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }
    
    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }
    
    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }
    
}

队列生产者

@Component
public class QueueProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-queue-1")
    private Queue queue1;
    
    @Resource(name = "springboot-activemq-queue-2")
    private Queue queue2;
    
    // 发送消息到队列1
    public void sendQueueMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue1, mapMessage);
    }
    
    // 发送消息到队列2
    public void sendQueueMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue2, mapMessage);
    }

}

队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

@Component
public class TopicProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-topic-1")
    private Topic topic1;
    
    @Resource(name = "springboot-activemq-topic-2")
    private Topic topic2;
    
    // 发送消息到主题1
    public void sendTopicMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic1, mapMessage);
    }
    
    // 发送消息到主题2
    public void sendTopicMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic2, mapMessage);
    }

}

主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

测试

如果要测试主题,那么需要修改配置 spring.jms.pub-sub-domain=true

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private QueueProducer queueProducer;
    
    @Autowired
    private TopicProducer topicProducer;
    
    @Test
    public void sendQueueMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息1");
        queueProducer.sendQueueMessage1(map);
    }
    
    @Test
    public void sendQueueMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息2");
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
    }
    
    @Test
    public void sendTopicMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息1");
        topicProducer.sendTopicMessage1(map);
    }
    
    @Test
    public void sendTopicMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息2");
        topicProducer.sendTopicMessage2(map);
    }
    
}

同时配置队列和主题

上面说过,如果想要使用主题,那么需要修改配置文件,这样不方便。我们可以在 ActiveMQConfig 类里同时配置队列和主题。

修改 ActiveMQConfig 类:

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    @Value("${spring.activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String user;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${jms.pool.maxConnections}")
    private int maxConnections;

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }

    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }

    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }

    // 真正可以产生连接的工厂
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(user, password, brokerUrl);
    }

    // ActiveMQ 提供的连接池
    @Bean
    public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(connectionFactory);
        pool.setMaxConnections(maxConnections);
        return pool;
    }

    // 队列监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable1(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-1");
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable2(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-2");
        return factory;
    }
    
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(PooledConnectionFactory connectionFactory){
        return new JmsMessagingTemplate(connectionFactory);
    }

}

修改队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

修改主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1", containerFactory = "jmsListenerContainerTopic")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable1")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

16.学习ActiveMQ-在SpringMVC中使用ActiveMQ

2020-05-13   龙德   ActiveMQ   ActiveMQ SpringMVC  

新建工程

新建一个 Maven 工程,我的工程叫 springmvc-activemq-example。

pom.xml 文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wang.miansen</groupId>
    <artifactId>springmvc-activemq-example</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>springmvc-activemq-example Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
    
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        
        <!-- 1)Spring核心 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 2)Spring DAO层 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 3)Spring web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 4)Spring test -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>

        <!-- 1)ActiveMQ 核心 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <!-- 2)ActiveMQ与 Spring 集成 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>

        <!--3)ActiveMQ所需要的pool包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>springmvc-activemq-example</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- web.xml版本 -->
                <version>3.1</version>
                <configuration>
                    <!-- jdk版本 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

队列生产者

新建队列生产者 QueueProducer 类:

public class QueueProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

队列消费者

新建 3 个队列消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class QueueConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

新建主题生产者 TopicProducer 类:

public class TopicProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Topic topic, final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(topic, mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

主题消费者

新建 3 个主题消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class TopicConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

application.properties

新建 application.properties 文件,配置 ActiveMQ 的相关信息。

# jms服务器地址
jms.broker.url=tcp://localhost:61616
# 连接用户名
jms.broker.userName=admin
# 连接密码
jms.broker.password=admin
# 使用异步发送
jms.broker.useAsyncSend=true
# 连接池的最大连接数
jms.pool.maxConnections=10

application-context.xml

新建 application-context.xml 文件,配置各个 Bean。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

    <!-- 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker.url}"/>
        <property name="userName" value="${jms.broker.userName}"></property>  
        <property name="password" value="${jms.broker.password}"></property>  
        <property name="useAsyncSend" value="${jms.broker.useAsyncSend}" />  
    </bean>
    
    <!-- 连接池 -->
    <!-- ActiveMQ 为我们提供了一个 PooledConnectionFactory,通过往里面注入一个 ActiveMQConnectionFactory   
        可以用来将 Connection、Session 和  MessageProducer 池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool 包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <!--真正可以产生连接的连接工厂,由对应的 JMS 服务厂商提供-->
        <property name="connectionFactory" ref="activeMQConnectionFactory" />
        <!--连接池的最大连接数-->
        <property name="maxConnections" value="${jms.pool.maxConnections}"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 队列和非持久化订阅使用 -->
    <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory3" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
    </bean>
    
    <!--队列1-->
    <bean id="activeMQQueue1" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-1" />
    </bean>
    
    <!--队列2-->
    <bean id="activeMQQueue2" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-2" />
    </bean>
    
    <!--主题1-->
    <bean id="activeMQTopic1" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-1"/>
    </bean>
    
    <!--主题2-->
    <bean id="activeMQTopic2" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-2"/>
    </bean>
    
    <!-- 队列 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQQueue1"/>
        <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false"/>
    </bean>
    
    <!-- 主题 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQTopic1"/>
         <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true"/>
    </bean>
    
    <!--队列监听容器1,一经注册,自动监听-->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue1" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer1" />
    </bean>
    
    <!--队列监听容器2,一经注册,自动监听-->
    <bean id="queueListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer2" />
    </bean>
    
    <!--队列监听容器3,注意目的地跟容器2一样-->
    <bean id="queueListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer3" />
    </bean>
    
    <!--主题监听容器1(非持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic1" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="false" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer1" />
    </bean>
    
    <!--主题监听容器2(持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory2" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer2" />
    </bean>
    
    <!-- 主题监听容器3(持久化订阅),注意目的地跟容器2一样 -->
    <bean id="topicListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory3" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer3" />
    </bean>
    
    <!-- 队列生产者 -->
    <bean id="queueProducer" class="com.example.activemq.QueueProducer">
        <property name="jmsTemplate" ref="jmsTemplateQueue" />
    </bean>
    
    <!-- 主题生产者 -->
    <bean id="topicProducer" class="com.example.activemq.TopicProducer">
        <property name="jmsTemplate" ref="jmsTemplateTopic" />
    </bean>
    
    <!-- 队列消费者1 -->
    <bean id="queueConsumer1" class="com.example.activemq.QueueConsumer1" />
    <!-- 队列消费者2 -->
    <bean id="queueConsumer2" class="com.example.activemq.QueueConsumer2" />
    <!-- 队列消费者3 -->
    <bean id="queueConsumer3" class="com.example.activemq.QueueConsumer3" />
    
    <!-- 主题消费者1 -->
    <bean id="topicConsumer1" class="com.example.activemq.TopicConsumer1" />
    <!-- 主题消费者2 -->
    <bean id="topicConsumer2" class="com.example.activemq.TopicConsumer2" />
    <!-- 主题消费者3 -->
    <bean id="topicConsumer3" class="com.example.activemq.TopicConsumer3" />
    
    <!-- 消息转换器 -->
    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</beans>

项目结构

最后的项目结构如下:

image

测试

新建测试类 ApplicationTest

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:application-context.xml"})
public class ApplicationTest {

    @Resource(name = "activeMQQueue1")
    private Queue queue1;

    @Resource(name = "activeMQQueue2")
    private Queue queue2;

    @Resource(name = "activeMQTopic1")
    private Topic topic1;

    @Resource(name = "activeMQTopic2")
    private Topic topic2;

    @Autowired
    private QueueProducer queueProducer;

    @Autowired
    private TopicProducer topicProducer;

    private Map<String, Object> mapMessage;

    @Before
    public void initMessage() throws Exception {
        mapMessage = new HashMap<>();
        mapMessage.put("message", "Hello ActiveMQ - " + new Date().getTime());
    }

    // 发送消息到队列1
    @Test
    public void queue1() throws Exception {
        queueProducer.sendMessage(queue1, mapMessage);
    }

    // 发送消息到队列2
    @Test
    public void queue2() throws Exception {
        queueProducer.sendMessage(queue2, mapMessage);
    }

    // 发送消息到主题1
    @Test
    public void topic1() throws Exception {
        topicProducer.sendMessage(topic1, mapMessage);
    }

    // 发送消息到主题2
    @Test
    public void topic2() throws Exception {
        topicProducer.sendMessage(topic2, mapMessage);
    }

}

测试队列1

运行 queue1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

image

测试队列2

由于队列2被两个消费者监听,又由于 ActiveMQ 队列中的消息只能被一个消费者消费,所以这两个消费者应该是通过负载均衡的方式消费。

运行 queue2() 方法,可以看到有时是消费者2消费,有时又是消费者3消费。

测试主题1

运行 topic1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

测试主题2

运行 topic2() 方法,生产者发送消息后,由于是发布订阅的模式,所以消费者2和消费者3都能收到消息。

查看控制台,可以看到有两个持久化订阅的消费者。

image

14.学习ActiveMQ-主题的持久化

2020-05-12   龙德   ActiveMQ   ActiveMQ  

主题的持久化跟队列有点不一样,我们先看生产者。

生产者改动不大,只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。

以下是代码:

public class DurableTopicProducer {

    public static void main(String[] args) {

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

            // 2.使用连接工厂创建一个连接对象
            connection = connectionFactory.createConnection();

            // 3.开启连接
            connection.start();

            // 4.使用连接对象创建会话对象。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5.使用会话对象创建目标对象,这里我们创建的是 topic,也就是一对多模式
            Destination destination = session.createTopic("topic-test-01");

            // 6.使用会话对象创建生产者对象
            producer = session.createProducer(destination);
            
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            
            // 7.使用会话对象创建一个文本消息对象
            TextMessage textMessage = session.createTextMessage();

            String text = "Hello ActiveMQ - " + new Date().getTime();

            // 8.设置消息内容
            textMessage.setText(text);

            // 9.发送消息
            producer.send(textMessage);
            System.out.println("已生产消息:" + text);

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 10.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

然后是消费者,消费者改动比较多,以下是代码:

public class DurableTopicConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        
        // 3.设置ClientID
        
        connection.setClientID("client1");

        // 4.开启连接
        connection.start();

        // 5.使用连接对象创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 6.使用会话对象创建目的地对象,生产者与消费者的名称要保持一致。
        Topic topic = session.createTopic("topic-test-01");

        // 7.使用会话对象创建主题订阅者
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscriber-1");

        // 8.通过监听器的方式来接收消息
        subscriber.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("已接收消息:" + ((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

启动消费者,打开控制台页面,点击 “Subscribers”,可以看到以下页面。

image

在Active Durable Topic Subscribers里可以看到有一个消费者的信息,说明订阅成功了。

此时,我们将这个消费者线程关闭,再来看看,可以看到这个消费者跑到了Offline Durable Topic Subscribers里面,如下图所示。表明这个消费者离线了,但是依旧保持订阅关系。

image

下面做一个试验,此时消费者已经离线,我们先启动生产者,后启动消费者,此时消费者还能接收到刚才生产者发送的消息吗?

答案是可以的。

此时再刷新控制台,可以看到刚才在Offline的订阅者跑到了Active里面。

上述内容可以联系微信公众号理解。我们可以想象一个场景,我订阅了某公众号,后来我手机没电关机了,当我意识到后,充电再开机,我依旧能收到公众号推送给我的消息,这里的持久化,可以认为,我和公众号的联系被持久化了,所以我上线后可以及时收到消息。

13.学习ActiveMQ-队列的持久化

2020-05-12   龙德   ActiveMQ   ActiveMQ  

持久化

持久化队列只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。默认是持久化的。

我们可以做个试验,使用 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 设置为持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者依然可以拿到消息,也就是消息在宕机之前已经被存储了下来,后面再启动的时候,会读取到存储的数据。

非持久化

非持久化队列也很简单,只需要设置 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 就可以了。

我们再做个试验,使用 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 设置为非持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者什么都拿不到,也就是消息已经丢了,我们在MQ的控制台页面看到,MQ里的消息数量是0。

15.学习ActiveMQ-持久化数据到MySQL

2020-05-12   龙德   ActiveMQ   ActiveMQ  

ActiveMQ 的持久化可以这样理解:生产者生产消息之后,消息首先会到达 broker,broker 就相当于一个调度中心。broker 首先将消息存储在本地的数据文件(或者是远程数据库),再将消息发送给消费者。如果消费者接收到了消息,则将消息从存储中删除,否则继续尝试重发。

ActiveMQ 启动后,首先检查存储位置,如果有未发送成功的消息,需要先把消息发送出去。

ActiveMQ 默认的持久化数据库是 KahaDB,我们也可以改成 MySQL。

第一步,首先将 mysql-connector-java 的jar 扔到 ActiveMQ的lib目录下。

image

然后修改 conf 目录下的 activmeq.xml 中的 persistenceAdapter 结点为 JDBC。

<persistenceAdapter>
    <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
    <jdbcPersistenceAdapter dataSource="#mysql-datasource"/>
</persistenceAdapter>

再添加 mysql-datasource,这样上面的配置文件才能拿到 DataSource。将下面这段内容放在 broker 结点和 import 结点之间。

<bean id="mysql-datasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=UTF-8"/> 
    <property name="username" value="root"/> 
    <property name="password" value="123"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean>

上面的用户名和密码要改成你自己的,还有要注意 url 参数,这里的&要改成&amp;,也就是&的转义,否则启动会报错。

启动 ActiveMQ,打开 activemq 库,就会发现自动生成了 3 张表。

image

每张表的作用如下:

activemq_msgs(消息表):

image

activemq_acks(订阅关系表,针对于持久化的Topic,订阅者和服务器的订阅关系会保存在这个表中):

image

activemq_lock:

在集群环境中才有用,只有一个 Broker 可以获取消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用来记录哪个 Broker 是当前的 Master Broker。

测试队列的数据

运行队列生产者生产 3 条数据,然后查看 activemq_msgs 表,可以看到这 3 条消息已经存进 activemq_msgs 表了。

image

然后启动队列消费者消费消息,在刷新 activemq_msgs 表,可以看到数据没了,说明消息的消费操作也同步更新了数据库。

image

测试主题的数据

启动消费者 DurableTopicConsumer 类,查看数据库的 activemq_acks 表,可以发现订阅关系已经持久化到数据库里了。

image

启动生产者 DurableTopicProducer 类,这时候消息已经被消费了。但是再查看数据库的 activemq_msgs 表,里面的数据依然存在。

image

上述内容可以联系微信公众号来理解,微信公众号发布过的文章,不能因为订阅者已经收到了或者是服务器关闭了,就把发布过的文章和订阅者清空,这是不合理的,从这个角度来理解就能明白了。