idea debug 报 Method threw 'java.lang.StackOverflowError' exception. Cannot ...

2020-10-22   龙德   杂七杂八   StackOverflowError  

今天用 idea debug 的时候,遇到一个错误:Method threw ‘java.lang.StackOverflowError’ exception. Cannot …

如图所示:

image

类结构如下:

public class Student {

    private Long id;

    private String name;

    private Classroom classroom;
    
    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", classroom=" + classroom +
                '}';
    }
}
public class Classroom {

    private Long id;

    private List<Student> students;
    
    @Override
    public String toString() {
        return "Classroom{" +
                "id=" + id +
                ", students=" + students +
                '}';
    }
}

因为 debug 模式下需要显示变量信息,这个信息就是要调用 toString() 方法得到的。而我采用的结构是 1 对多的关系,对象互相引用,调用 toString() 方法的时候栈溢出了。

解决这个问题也很简单,修改任意一个类的 toString() 方法,不要造成循环调用就行。

public class Classroom {

    private Long id;

    private List<Student> students;

    @Override
    public String toString() {
        return "Classroom{" +
                "id=" + id +
                '}';
    }
    
}

image

dubbo hessian 反序列化报错

2020-10-21   龙德   杂七杂八   dubbo hessian  

今天工作中遇到一个问题,dubbo 服务A调用服务B,服务A报空指针异常,反序列化失败。异常信息如下:

Failed to invoke the method getChainById in the service cn.tanzhou.chain.config.stub.service.ChainAPI. Tried 1 times of the providers [10.0.65.146:20882] (1/2) from the registry 192.168.1.20:2181 on the consumer 192.168.78.1 using the dubbo version 2.6.2. Last error is: Failed to invoke remote method: getChainById, provider: dubbo://10.0.65.146:20882/cn.tanzhou.chain.config.stub.service.ChainAPI?anyhost=true&application=chain-scheduling-dubbo&check=false&default.check=false&default.retries=0&default.service.filter=providerTraceIdFilter&default.timeout=3000&dubbo=2.6.2&generic=false&interface=cn.tanzhou.chain.config.stub.service.ChainAPI&methods=getChainById,page&organization=tz&owner=chenyx&pid=3064&register.ip=192.168.78.1&remote.timestamp=1602745219717&revision=1.0.0&side=consumer&timestamp=1602747456735&version=1.0.0, cause: com.alibaba.com.caucho.hessian.io.HessianFieldException: cn.tanzhou.chain.config.stub.domain.response.rule.AbstractEvent.childrenNodesOfTrue: 'cn.tanzhou.chain.config.stub.domain.response.rule.WaitEvent' could not be instantiated
com.alibaba.com.caucho.hessian.io.HessianFieldException: cn.tanzhou.chain.config.stub.domain.response.rule.AbstractEvent.childrenNodesOfTrue: 'cn.tanzhou.chain.config.stub.domain.response.rule.WaitEvent' could not be instantiated
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.logDeserializeError(JavaDeserializer.java:167)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer$ObjectListFieldDeserializer.deserialize(JavaDeserializer.java:531)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:276)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:203)
    at com.alibaba.com.caucho.hessian.io.SerializerFactory.readObject(SerializerFactory.java:526)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObjectInstance(Hessian2Input.java:2810)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2750)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2279)
    at com.alibaba.com.caucho.hessian.io.CollectionDeserializer.readLengthList(CollectionDeserializer.java:122)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2251)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer$ObjectListFieldDeserializer.deserialize(JavaDeserializer.java:525)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:276)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:203)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObjectInstance(Hessian2Input.java:2808)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2146)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2075)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2119)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2075)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer$ObjectFieldDeserializer.deserialize(JavaDeserializer.java:408)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:276)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:203)
    at com.alibaba.com.caucho.hessian.io.SerializerFactory.readObject(SerializerFactory.java:526)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObjectInstance(Hessian2Input.java:2810)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2750)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2279)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2724)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2279)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2081)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2075)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer$ObjectFieldDeserializer.deserialize(JavaDeserializer.java:408)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:276)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:203)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObjectInstance(Hessian2Input.java:2808)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2146)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2075)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2119)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2075)
    at com.alibaba.dubbo.common.serialize.hessian2.Hessian2ObjectInput.readObject(Hessian2ObjectInput.java:91)
    at com.alibaba.dubbo.common.serialize.hessian2.Hessian2ObjectInput.readObject(Hessian2ObjectInput.java:96)
    at com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(DecodeableRpcResult.java:83)
    at com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult.decode(DecodeableRpcResult.java:113)
    at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(DubboCodec.java:89)
    at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(ExchangeCodec.java:124)
    at com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec.decode(ExchangeCodec.java:84)
    at com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec.decode(DubboCountCodec.java:46)
    at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:133)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:280)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:200)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.alibaba.com.caucho.hessian.io.HessianProtocolException: 'cn.tanzhou.chain.config.stub.domain.response.rule.WaitEvent' could not be instantiated
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.instantiate(JavaDeserializer.java:316)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.readObject(JavaDeserializer.java:201)
    at com.alibaba.com.caucho.hessian.io.SerializerFactory.readObject(SerializerFactory.java:526)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObjectInstance(Hessian2Input.java:2810)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2750)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2279)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2724)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2279)
    at com.alibaba.com.caucho.hessian.io.CollectionDeserializer.readLengthList(CollectionDeserializer.java:122)
    at com.alibaba.com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:2251)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer$ObjectListFieldDeserializer.deserialize(JavaDeserializer.java:525)
    ... 57 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.alibaba.com.caucho.hessian.io.JavaDeserializer.instantiate(JavaDeserializer.java:312)
    ... 67 more
Caused by: java.lang.NullPointerException
    at cn.tanzhou.chain.config.stub.domain.response.rule.WaitEvent.init(WaitEvent.java:63)
    at cn.tanzhou.chain.config.stub.domain.response.rule.WaitEvent.<init>(WaitEvent.java:59)
    ... 72 more

可以看到最根本的异常是:java.lang.NullPointerException。抛出此异常的方法是 init() 方法,构造函数调用了 init() 方法。所以我们先去看一下构造函数。

构造函数如下:

public WaitEvent(Long nodeId, Long parentNodeId, Long chainId, Long eventId, String eventName, String eventCode,
                     Map<String, Object> waitEventProperties) {
        super(nodeId, parentNodeId, chainId, NodeEnum.NODE_2002, eventId, eventName, eventCode);
        init(waitEventProperties);
    }

private void init(Map<String, Object> waitEventProperties) {
    Object waitSecond = waitEventProperties.get("waitSecond");
    Object afterDays = waitEventProperties.get("afterDays");
    Object atTimeOfSecond = waitEventProperties.get("atTimeOfSecond");
    Object waitType = waitEventProperties.get("waitType");
    Object selectType = waitEventProperties.get("selectType");
    Object courseId = waitEventProperties.get("courseId");
    Object courseTimeOfSecond = waitEventProperties.get("courseTimeOfSecond");
    Object courseTimeType = waitEventProperties.get("courseTimeType");
    ......

猜想一下应该是序列化的时候调用了这个构造方法,参数传的都是 null,所以就报了空指针异常。

我 debug 跟了一下,发现参数果然都是 null。

image

image

在 JavaDeserializer 类的构造方法有这么一段:

public JavaDeserializer(Class cl) {
        _type = cl;
        _fieldMap = getFieldMap(cl);

        _readResolve = getReadResolve(cl);

        if (_readResolve != null) {
            _readResolve.setAccessible(true);
        }

        Constructor[] constructors = cl.getDeclaredConstructors();
        long bestCost = Long.MAX_VALUE;

        for (int i = 0; i < constructors.length; i++) {
            Class[] param = constructors[i].getParameterTypes();
            long cost = 0;

            for (int j = 0; j < param.length; j++) {
                cost = 4 * cost;

                if (Object.class.equals(param[j]))
                    cost += 1;
                else if (String.class.equals(param[j]))
                    cost += 2;
                else if (int.class.equals(param[j]))
                    cost += 3;
                else if (long.class.equals(param[j]))
                    cost += 4;
                else if (param[j].isPrimitive())
                    cost += 5;
                else
                    cost += 6;
            }

            if (cost < 0 || cost > (1 << 48))
                cost = 1 << 48;

            cost += (long) param.length << 48;

            if (cost < bestCost) {
                _constructor = constructors[i];
                bestCost = cost;
            }
        }

        if (_constructor != null) {
            _constructor.setAccessible(true);
            Class[] params = _constructor.getParameterTypes();
            _constructorArgs = new Object[params.length];
            for (int i = 0; i < params.length; i++) {
                _constructorArgs[i] = getParamArg(params[i]);
            }
        }
    }

上面这段代码,是 hessian 在反序列化的时候,用于在被反序列化的类里面找一个“得分最低”的构造函数,反序列化时会加以调用。构造函数的“得分”规则大致是:参数越少得分越低;参数个数相同时,参数类型越接近 JDK 内置类得分越低。而我们的调用返回的类只有一个构造函数,当然只有这个构造函数会被选中 但是,hessian 反序列化调用被选中的构造函数时,是这样来创造该构造函数需要的参数的:

protected static Object getParamArg(Class cl) {
    if (!cl.isPrimitive())
        return null;
    else if (boolean.class.equals(cl))
        return Boolean.FALSE;
    else if (byte.class.equals(cl))
        return new Byte((byte) 0);
    else if (short.class.equals(cl))
        return new Short((short) 0);
    else if (char.class.equals(cl))
        return new Character((char) 0);
    else if (int.class.equals(cl))
        return Integer.valueOf(0);
    else if (long.class.equals(cl))
        return Long.valueOf(0);
    else if (float.class.equals(cl))
        return Float.valueOf(0);
    else if (double.class.equals(cl))
        return Double.valueOf(0);
    else
        throw new UnsupportedOperationException();
}

可以看到,如果参数不是 primitive 类型,会被 null 代替。但不巧的是我们的构造函数内部对该参数调用了一个方法,因此抛出了空指针异常。 也就是说这个空指针异常是抛在客户端反序列化的时候而不是服务端内部,因此服务端当然找不到对应的错误日志了。解决的办法也很简单:可以新增一个无参构造函数(无参构造函数肯定“得分”最低一定会被选中),或者修改代码保证任意参数为 null 的时候都不会出问题。

我最后增加了一个空参构造函数解决了这个问题。

总结:通过 dubbo 服务传递的对象,需要保证有无参构造函数,避免出现反序列化失败的问题。

但是还是有一点疑问,既然这样的话,为啥刚开始测试并没有出现这个问题,而是后来才突然出现,按理说这个问题一开始就应该出现才对,至今仍然困惑中。。。

参考资料:hessian反序列化问题

Java 并发之 synchronized

2020-06-04   龙德   Java   并发 synchronized  

Java 提供了多线程之间同步的机制,其中最基本的就是使用 synchronization 关键字。这个关键字大家用得比较多,但是可能没仔细梳理过它,这篇博客将仔细梳理 synchronization 关键字,让大家对它有一个更全面和深刻的认识。

想要全面了解 synchronized,得先了解 3 个知识点:Java 内存模型(JMM)、对象的内存布局和 Monitor 对象。

Java 内存模型(JMM)

现代多核 CPU 中每个核心拥有自己的一级缓存或一级缓存加上二级缓存等,问题就发生在每个核心的占缓存上。每个核心都会将自己需要的数据读到独占缓存中,数据修改后也是写入到缓存中,然后等待刷入到主存中。所以会导致有些核心读取的值是一个过期的值。

Java 作为高级语言,屏蔽了这些底层细节,用 JMM 定义了一套读写内存数据的规范,虽然我们不再需要关心一级缓存和二级缓存的问题,但是,JMM 抽象了主内存和本地内存的概念。

所有的共享变量存在于主内存中,每个线程有自己的本地内存,线程读写共享数据也是通过本地内存交换的,所以可见性问题依然是存在的。这里说的本地内存并不是真的是一块给每个线程分配的内存,而是 JMM 的一个抽象,是对于寄存器、一级缓存、二级缓存等的抽象。

image

总而言之,在 Java 内存模型下,所有的共享变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存保存的是用到的共享变量的主内存副本的拷贝。线程对共享变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。这就可能造成一个线程修改了一个变量的值,而另外一个线程还继续使用它在工作内存中的变量值的拷贝,造成数据的不一致。

那 JMM 跟 synchronized 有什么关系呢?

一个线程在获取到监视器锁以后才能进入 synchronized 控制的代码块,一旦进入代码块,首先,该线程对于共享变量的缓存就会失效,因此 synchronized 代码块中对于共享变量的读取需要从主内存中重新获取,也就能获取到最新的值。

退出代码块的时候的,会将该线程写在缓冲区中的数据刷到主内存中,所以在 synchronized 代码块之前或 synchronized 代码块中对于共享变量的操作随着该线程退出 synchronized 块,会立即对其他线程可见(这句话的前提是其他读取共享变量的线程会从主内存读取最新值)。

也就是说,当线程进入 synchronized 控制的代码块中,这个线程对共享变量的读取从主内存中获取,退出 synchronized 块,会将修改后的数据刷到主内存中。

对象的内存布局

image

一个 Java 对象在堆内存中包括对象头、实例数据和补齐填充 3 个部分。

  • 对象头包括 Mark Words(存储哈希码、GC分代年龄、锁标志位等)和 Klass Words(指向当前对象所属的类的地址,也就是 Class 对象),如果是数组对象,还有一个保存数组长度的空间。

  • 实例数据是对象真正存储的有效信息,包括了对象的所有成员变量,其大小由各个成员变量的大小共同决定。

  • 对齐填充不是必然存在的,仅仅起占位符的作用。

对象的内存布局跟 synchronized 有什么关系呢?

别着急,下面介绍 synchronized 的使用的时候,会根据对象的内存布局来画图,更加直观的了解 synchronized。现在只需要记住对象头就可以了,对象头的 Mark Words 有一块空间记录了指向 Monitor 对象的指针,通过它可以关联到 Monitor 对象。

image

Monitor 对象

Monitor 其实是对 Java 对象的锁的一种抽象,称为监视器锁。每个对象都有一个 Monitor 相关联,它和 Java 对象是一对一的关系的。为了便于理解,我们可以简单的把它想象成一个对象。

Monitor 对象记录了持有锁的线程信息、阻塞队列、等待队列等。既然是对象就会有字段,Monitor 对象主要包含以下三个字段:

  • _Owner:记录当前持有锁的线程
  • _EntryList:阻塞队列,记录所有阻塞等待锁的线程
  • _WaitSet:等待队列,记录调用 wait() 方法并还未被通知的线程

当线程获得对象的监视器锁的时候,线程 id 等信息会拷贝进 _Owner 字段,其余线程会进入阻塞队列 _Entrylist,当持有锁的线程执行 wait() 方法,会立即释放锁进入 _Waitset 队列。当线程释放锁的时候,_Owner 会被置空,然后 _Entrylist 中的线程会竞争锁,竞争成功的线程 id 会写入 _Owner,其余线程继续在 _Entrylist 中等待。

Java 对象如何跟 Monitor 关联?

前面说过,对象头的 Mark Words 有一块空间记录了指向 Monitor 对象的指针,通过它可以关联到 Monitor 对象。请看下图:

image

当锁状态变成重量级锁时,对象头有一块空间指向重量级锁的指针,通过它可以关联到 Monitor 对象。

关于锁的 3 种状态:

JDK1.6 对 synchronized 的实现引入了大量的优化,如偏向锁、轻量级锁、重量级锁等技术来减少 synchronized 操作的开销。,这 3 种锁的区别如下:

  • 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。

  • 轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

  • 重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

synchronized 的使用

synchronized 关键字主要有两种使用方式:

  • synchronized 作用于方法,称为同步方法。同步方法被调用时,会自动执行加锁操作,只有加锁成功,方法体才会得到执行。如果被 synchronized 修饰的方法是实例方法,那么这个实例的监视器会被锁定。如果是 static 方法,线程会锁住相应的 Class 对象的监视器。方法体执行完成或者异常退出后,会自动执行解锁操作。

  • synchronized 代码块。synchronized(object) 在对某个对象上执行加锁时,会尝试在该对象的监视器上进行加锁操作,只有成功获取锁之后,线程才会继续往下执行。线程获取到了监视器锁后,将继续执行 synchronized 代码块中的代码,如果代码块执行完成,或者抛出了异常,线程将会自动对该对象上的监视器执行解锁操作。

synchronized 修饰实例方法

synchronized 修饰实例方法,锁住的是实例对象的监视器,请看下面的例子。

public class Person {

	private String name;
	
	private Integer age;
	
	private Dog dog;

	public Person(String name, Integer age, Dog dog) {
		this.name = name;
		this.age = age;
		this.dog = dog;
	}

	public synchronized void setA() {
		System.out.println(Thread.currentThread().getName() + " 线程进入了 setA() 方法");
	}
	
	public synchronized void setB() {
		System.out.println(Thread.currentThread().getName() + " 线程进入了 setB() 方法");
	}
	
	// 省略 Getters 和 Setters...
}
public class Dog {

	private String name;
	
	private Integer age;

	public Dog(String name, Integer age) {
		this.name = name;
		this.age = age;
	}
	
	// 省略 Getters 和 Setters...
}
public static void main(String[] args) {
	Dog dog = new Dog("汪汪", 1);
	Person zhangsan = new Person("张三", 15, dog);
	Thread t1 = new Thread(() -> zhangsan.setA());
	Thread t2 = new Thread(() -> zhangsan.setA());
	t1.start();
	t2.start();
}

启动 2 个线程,共同访问张三的 setA() 方法,由于 setA() 方法被 synchronized 修饰,所以只会有一个线程获得张三的监视器锁,获得锁之后才有资格执行 setA() 方法,其它获取不到锁的线程都会阻塞。

image

如果 t2 线程访问的是 setB() 方法呢?

Thread t2 = new Thread(() -> zhangsan.setB());

也会发生多个线程争同一把锁的问题,因为 setA() 和 setB() 都是实例方法,synchronized 锁住的都是张三这个实例的监视器。

image

这时候的内存结构是这样的:

image

我画的不是很严谨,仅供参考,大家能理解就好。

如果 t2 线程访问的是李四的 setA() 方法呢?

Person lisi = new Person("李四", 16, dog);
Thread t2 = new Thread(() -> lisi.setA());

这时候这两个线程都能执行各自对象的 setA() 方法,因为 t1 线程竞争的是张三的监视器锁,t2 线程竞争的是李四的监视器锁。这是两个不同的锁了,自然不会发生多个线程争同一把锁的问题。

image

但是这样是有安全隐患的,别忘了张三和李四养的都是同一条狗,如果在 setA() 方法里对这条狗做了增删改操作,那么势必会引起线程安全问题。如果是这种情况,那么必须在狗对象里思考可能引起的线程安全问题,并作出相应的同步操作。

这时候的内存结构大概是这样的:

image

synchronized 修饰静态方法

Person 类新增两个静态方法

public synchronized static void setC() {
	System.out.println(Thread.currentThread().getName() + " 线程进入了 setC() 方法");
}

public synchronized static void setD() {
	System.out.println(Thread.currentThread().getName() + " 线程进入了 setD() 方法");
}

t1 和 t2 线程都访问 setC() 方法

Thread t1 = new Thread(() -> zhangsan.setC());
Thread t2 = new Thread(() -> zhangsan.setC());

前面说过,synchronized 修饰静态方法,锁住的是相应的 Class 对象的监视器。Class 对象是啥对象,干啥用的?怎么生成的?下面我就简单的说一下。

image

个人觉得学习 Java 有几座难以跨越的大山,除了并发这座大山之外,Class 对象也算得上一座大山了。Class 对象其实不是很好理解,只有理解了 Class 对象,才能理解类加载机制、类型机制、反射以及 synchronized 修饰静态方法等等重要的知识点。

简单的来说,Class 对象也就是 Class 类的对象,我们每写的一个 Java 类,都会编译成一个 .class 文件,这个文件包含了类的字段、方法、注解等等元信息。既然在 Java 里万物都是对象,那么 .class 文件就可以看成一个个的对象,哪个类的对象呢?就是 Class 类的对象。JVM 加载 .class 文件的时候,会在方法区生成一个个 Class 对象。然后我们每 new 一个实例对象,其实都是根据这个类对应的 Class 对象 new 出来的实例对象。不管你 new 多少个,这些实例对象都是根据 Class 对象这个模板生成的,这些实例对象都指向同一个 Class 对象。前面说对象头的时候,有个 Klass Words 空间,里面存的就是指向 Class 对象的指针,通过它就可以关联到 Class 对象。

image

所以 t1 和 t2 线程都访问 setC() 方法,由于 setC() 方法是静态方法,并且被 synchronized 修饰,锁的是张三对应的 Class 对象的监视器,所以会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是 setD() 方法呢?

Thread t2 = new Thread(() -> zhangsan.setD());

答案大家都想到了,也会发生多个线程争同一把锁的问题。因为 setC() 和 setD() 都是静态方法,synchronized 锁住的都是张三这个实例对应的 Class 对象的监视器,所以会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是 setA() 方法呢?

那这时候不会发生多个线程争同一把锁的问题。因为 t1 线程竞争的是张三的 Class 对象的监视器锁,t2 线程竞争的是张三这个实例对象的监视器锁。这是两个不同的锁了,自然不会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是李四的 setC() 方法呢?

Thread t2 = new Thread(() -> lisi.setC());

由于张三和李四的 Class 对象都是同一个,所以张三和李四的 setC() 方法锁的都是同一个 Class 对象的监视器,所以也会发生多个线程争同一把锁的问题。

image

这时候的内存结构图大概是这样的:

image

synchronized 代码块

Person 类新增 setE() 方法

public void setE() {
	synchronized (dog) {
		System.out.println(Thread.currentThread().getName() + " 线程进入了同步代码块");
	}
}

可以看到 setE() 方法有个同步代码块,锁住的是 dog 对象的监视器。

t1 和 t2 线程都访问张三的 setE() 方法

Thread t1 = new Thread(() -> zhangsan.setE());
Thread t2 = new Thread(() -> zhangsan.setE());

结果大家肯定想到了,肯定会发生多个线程争同一把锁的问题。

image

如果 t2 线程访问的是李四的 setE() 方法呢?

Thread t2 = new Thread(() -> lisi.setE());

也会发生多个线程争同一把锁的问题。因为张三和李四养的都是同一条狗。

image

这时候的内存结构图大概是这样的:

image

如果李四养的是另一条狗呢?

Dog haha = new Dog("哈哈", 2);
Person lisi = new Person("李四", 16, haha);
Thread t2 = new Thread(() -> lisi.setE());

这时候就不会发生多个线程争同一把锁的问题。因为张三和李四的狗不是同一只,这两个线程抢的不是同一把锁。

image

这时候的内存结构图大概是这样的:

image

总结

总的来说就两点:

1.为什么需要 synchronized

因为在 Java 内存模型下,所有的共享变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存保存的是用到的共享变量的主内存副本的拷贝。线程对共享变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。这就可能造成一个线程修改了一个变量的值,而另外一个线程还继续使用它在工作内存中的变量值的拷贝,造成数据的不一致。所以在多线程的环境中,必须要考虑多个线程访问临界资源的安全问题。可以使用 Java 语言提供的 synchronized 关键字或者 java.util.concurrent.locks 包下的各种锁(比如 ReentrantLock) 来保证线程操作的原子性。

对于 synchronized 来说,当线程进入 synchronized 控制的代码块中,这个线程对共享变量的读取从主内存中获取,退出 synchronized 块,会将修改后的数据刷到主内存中。

2.如何使用 synchronized

要分清楚 synchronized 锁的是哪个对象的监视器:

  • synchronized 修饰实例方法,那么这个实例对象的监视器会被锁定。

  • synchronized 修饰静态方法,那么这个实例对象对应的 Class 对象的监视器会被锁定。

  • synchronized 作用于代码块,那么括号里的对象的监视器会被锁定。

参考资料

https://javadoop.com/post/java-memory-model

https://www.cnblogs.com/ZoHy/p/11313155.html

https://zhuanlan.zhihu.com/p/138427106

5.学习 Kafka - 在 Spring Boot 中使用 Kafka

2020-05-23   龙德   Kafka   Kafka SpringBoot  

源码地址:https://github.com/miansen/kafka-example/tree/master/kafka-example-springboot

在 Spring Boot 中使用 Kafka 跟在 Spring MVC 中是差不多的,而且还更简单,不需要各种 xml 配置了,只需要添加注解就可以了。

以下是我写的简单的实例代码,看一下就知道怎么使用了,感觉都是套路。

application.properties

首先是配置文件,这是必不可少的。Spring Boot 会自动加载配置文件的参数,按照你设置的参数初始化好 kafka 相关的 bean,注入到 IOC 容器里。

server.port=8080
spring.kafka.consumer.bootstrap-servers=192.168.197.6:9092
# 确保新的消费者组能获得我们之前发送的消息,为了测试方便(生产配置latest,只获取最新的消息)。
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.bootstrap-servers=192.168.197.6:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 监听的 topic 如果不存在,则不报错
spring.kafka.listener.missing-topics-fatal=false

以上几个参数是比较常用的,但是我们知道 Kafka 客户端提供的参数配置不止这些,那么如何在 Spring Boot 中配置呢?

熟悉 Spring Boot 套路的同学可能就知道了,Spring Boot 一般都会提供一个 xxxProperties 的配置类,你在配置文件里配置的参数都会映射都这个配置类里。

所以直接搜索 KafkaProperties 类查看它有哪些属性就知道 Spring Boot 支持哪些配置参数了。

image

KafkaProperties 类的属性都写了注释,结合 Kafka 官方文档 http://kafka.apachecn.org/documentation.html#producerapi 就可以配置了。

生产者

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String value) {
        sendMessage(topic, null, value);
    }
    
    public void sendMessage(String topic, String key, String value) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, value);
        future.addCallback(success -> {
            RecordMetadata metadata = success.getRecordMetadata();
            System.out.println("生产者发送消息成功。topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }, failure -> {
            System.out.println("生产者发送消息失败,原因:" + failure.getMessage());
        });
    }
    
}

还记得在 Spring MVC 里,KafkaTemplate 是需要我们手动配置的吗,在 Spring Boot 里是自动配置的,可以直接拿来使用。

消费者

还记得在 Spring MVC 中是怎么配置消费者的吗?首先配置监听类,然后配置监听信息,最后再配置监听容器。

在 Spring Boot 中只需要配置 @KafkaListener 注解就可以了。

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = {"test01"}, groupId = "group01")
    public void onMessage1(ConsumerRecord<String, String> record) {
        System.out.println(String.format("[group01-消费者1]收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    
    @KafkaListener(topics = {"test01"}, groupId = "group01")
    public void onMessage2(ConsumerRecord<String, String> record) {
        System.out.println(String.format("[group01-消费者2]收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    
    @KafkaListener(topics = {"test01"}, groupId = "group02")
    public void onMessage3(ConsumerRecord<String, String> record) {
        System.out.println(String.format("[group02-消费者3]收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
}

我配置了 3 个消费者,它们都消费同一个主题。其中前两个消费者位于同一个消费者组下,第 3 个消费者位于另一个消费者组下。

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private KafkaProducerService producerService;

    @Test
    public void sendMessageTest() throws Exception {
        for (int i = 0; i < 10; i++) {
            producerService.sendMessage("test01", Integer.toString(i), "hello kafka-" + i + "-" + new Date().getTime());
        }
        System.in.read(); // 让 main 线程阻塞一下,否则消费者线程可能会来不及消费就死了
    }
    
}

运行测试类,可以看到有三条消费者线程在监听消息,分别对应 KafkaConsumerService 类的 3 个 @KafkaListener 注解方法。

image

控制台输出如下:

生产者发送消息成功。topic: test01, partition: 2, offset: 773
生产者发送消息成功。topic: test01, partition: 0, offset: 922
生产者发送消息成功。topic: test01, partition: 2, offset: 774
生产者发送消息成功。topic: test01, partition: 2, offset: 775
生产者发送消息成功。topic: test01, partition: 1, offset: 740
生产者发送消息成功。topic: test01, partition: 0, offset: 923
生产者发送消息成功。topic: test01, partition: 1, offset: 741
生产者发送消息成功。topic: test01, partition: 0, offset: 924
生产者发送消息成功。topic: test01, partition: 0, offset: 925
生产者发送消息成功。topic: test01, partition: 2, offset: 776
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 773, key: 0, value: hello kafka-0-1593143698305
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 773, key: 0, value: hello kafka-0-1593143698305
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 922, key: 1, value: hello kafka-1-1593143713542
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 774, key: 2, value: hello kafka-2-1593143744094
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 775, key: 3, value: hello kafka-3-1593143746141
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 922, key: 1, value: hello kafka-1-1593143713542
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 774, key: 2, value: hello kafka-2-1593143744094
[group01-消费者2]收到了消息。topic: test01, partition: 1, offset: 740, key: 4, value: hello kafka-4-1593143746861
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 923, key: 5, value: hello kafka-5-1593143746893
[group01-消费者2]收到了消息。topic: test01, partition: 1, offset: 741, key: 6, value: hello kafka-6-1593143746964
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 776, key: 9, value: hello kafka-9-1593143747124
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 924, key: 7, value: hello kafka-7-1593143747027
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 925, key: 8, value: hello kafka-8-1593143747063
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 923, key: 5, value: hello kafka-5-1593143746893
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 924, key: 7, value: hello kafka-7-1593143747027
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 925, key: 8, value: hello kafka-8-1593143747063
[group02-消费者3]收到了消息。topic: test01, partition: 1, offset: 740, key: 4, value: hello kafka-4-1593143746861
[group02-消费者3]收到了消息。topic: test01, partition: 1, offset: 741, key: 6, value: hello kafka-6-1593143746964
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 775, key: 3, value: hello kafka-3-1593143746141
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 776, key: 9, value: hello kafka-9-1593143747124

可以看到生产者生产了 10 条消息,均匀的分布在 3 个分区里。

“消费者1” 只消费了 2 号分区,而”消费者2” 消费了 0 号和 1 号分区。由于这两个消费者位于同一个消费者组下,所以不会重复消费同一个分区。

而 “消费者3” 位于不同的消费者组下,其它组对它不影响,跟它没关系。并且这个组只有它一个消费者,所以 “消费者3” 可以大饱口福,消费所有的分区。

4.学习 Kafka - 在 Spring MVC 中使用 Kafka

2020-05-22   龙德   Kafka   Kafka SpringMVC  

源码地址:https://github.com/miansen/kafka-example/tree/master/kafka-example-springmvc

引入依赖

Spring MVC 的依赖就不贴了,可以去源码里看。

既然是在 Spring MVC 应用中使用 Kafka,那肯定会有一个 spring-kafka 的玩意,一般 Spring 应用都是通过这种方式将第三方应用集成进来,简化开发,开箱即用。

<!-- 1)Kafka与 Spring 集成 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.5.RELEASE</version>
</dependency>

要注意 spring-kafka 与 Spring MVC 的版本,太高了可能会不兼容。

application.properties

application.properties 文件配置 kafka 用到的各个属性,这样就不用再代码里或者是 xml 配置文件里写死了,更灵活一点。

bootstrap.servers=192.168.197.6:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=my-consumer-group-04
session.timeout.ms=15000
enable.auto.commit=true
auto.commit.interval.ms=1000

各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html

配置文件

既然是 Spring MVC 应用,那么 xml 配置文件也是必不可少的。虽然可以通过 Java Config 的形式配置,但是还是用 xml 配置比较好,这样直观一点,可以更明显的看出配置了哪个 bean,哪个 bean 跟 哪个 bean 关联。

我分两个配置文件,一个是 kafka-producer.xml,这个是配置生产者的。还有一个是 kafka-consumer.xml,这个是配置消费者的。

kafka-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

     <!-- 配置生产者的属性 -->  
     <bean id="kafkaProducerProperties" class="java.util.HashMap">  
        <constructor-arg>  
            <map>  
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="acks" value="${acks}"/>
                <entry key="retries" value="${retries}"/>  
                <entry key="batch.size" value="${batch.size}"/>  
                <entry key="linger.ms" value="${linger.ms}"/>  
                <entry key="buffer.memory" value="${buffer.memory}"/>  
                <entry key="key.serializer" value="${key.serializer}"/>  
                <entry key="value.serializer" value="${value.serializer}"/>
                <entry key="key.deserializer" value="${key.deserializer}"/>  
                <entry key="value.deserializer" value="${value.deserializer}"/> 
            </map>  
        </constructor-arg>  
     </bean>  
    
    <!-- 创建 kafkaProducerTemplate 需要使用的 kafkaProducerFactory bean -->
     <bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">  
        <constructor-arg ref="kafkaProducerProperties" /> 
     </bean>
     
     <!-- 创建 kafkaTemplate bean,使用的时候,只需要注入这个 bean,即可使用 template 的 send 消息方法 -->
     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">  
        <constructor-arg ref="kafkaProducerFactory"/>
        <!--设置对应 topic-->
        <property name="defaultTopic" value="test01"/>
     </bean>
     
    <!-- 生产者 -->
    <bean id="kafkaProducerService" class="wang.miansen.kafka.springmvc.KafkaProducerService" />
</beans>

kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

    <!-- 配置消费者的属性 -->
    <bean id="kafkaConsumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                <entry key="group.id" value="${group.id}" />
                <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                <entry key="session.timeout.ms" value="${session.timeout.ms}" />
                <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}" />
                <entry key="key.serializer" value="${key.serializer}" />
                <entry key="value.serializer" value="${value.serializer}" />
                <entry key="key.deserializer" value="${key.deserializer}" />
                <entry key="value.deserializer" value="${value.deserializer}" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建 kafkaConsumerFactory bean -->
    <bean id="kafkaConsumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="kafkaConsumerProperties" />
    </bean>

    <!--具体监听的 bean -->
    <bean id="kafkaConsumerListener1" class="wang.miansen.kafka.springmvc.KafkaConsumerListener1" />
    <bean id="kafkaConsumerListener2" class="wang.miansen.kafka.springmvc.KafkaConsumerListener2" />

    <!-- 消费者容器配置信息 -->
    <bean id="kafkaMessageListenerContainerProperties1"
        class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- 订阅主题 -->
        <constructor-arg value="test01" />
        <property name="messageListener" ref="kafkaConsumerListener1" />
    </bean>
    <bean id="kafkaMessageListenerContainerProperties2"
        class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- 订阅主题 -->
        <constructor-arg value="test01" />
        <property name="messageListener" ref="kafkaConsumerListener2" />
    </bean>

    <!-- 配置消费者容器 -->
    <bean id="kafkaMessageListenerContainer1"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="kafkaConsumerFactory" />
        <constructor-arg ref="kafkaMessageListenerContainerProperties1" />
    </bean>
    <bean id="kafkaMessageListenerContainer2"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
        init-method="doStart">
        <constructor-arg ref="kafkaConsumerFactory" />
        <constructor-arg ref="kafkaMessageListenerContainerProperties2" />
    </bean>

</beans>

业务代码

KafkaProducerService

public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String value) {
        sendMessage(null, value);
    }
    
    public void sendMessage(String key, String value) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.sendDefault(value);
        future.addCallback(success -> {
            RecordMetadata metadata = success.getRecordMetadata();
            System.out.println("生产者发送消息成功。topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }, failure -> {
            System.out.println("生产者发送消息失败,原因:" + failure.getMessage());
        });
    }
}

KafkaConsumerListener1

public class KafkaConsumerListener1 implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(String.format("消费者1收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                record.partition(), record.offset(), record.key(), record.value()));
    }

}

KafkaConsumerListener2

public class KafkaConsumerListener2 implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(String.format("消费者2收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s", record.topic(),
                record.partition(), record.offset(), record.key(), record.value()));
    }

}

各个 bean 的作用都写上了注释,看一眼就明白是干啥用的了。如果你仔细看就会发现,在 Spring 应用中使用 Kafka,其实跟你用 kafka-clients 的 API 差不多的,只不过是 Spring 封一层,一般都会提供一个 xxxFactory 和 xxxTemplate 的 Bean,将第三方应用融合进 Spring 应用,以便开发者更容易上手。

在生产者的配置中,我配置了一个 kafkaProducerService bean,这个 bean 是生产者服务的接口,用来处理业务请求,这个 bean 注入了 Spring 提供的操作模板类 kafkaTemplate,这个类有很多操作 Kafka 的方法,通过它可以很方便的将消息发送到 Kafka。

kafkaTemplate 类的 send 方法会返回一个 Future 对象,这个对象接收两个对象,一个用来处理发送成功,一个用来处理发送失败。

在消费者的配置中,我配置了两个监听 bean:kafkaConsumerListener1 和 kafkaConsumerListener2。既然是监听的方式,那么说明消费者是异步消费的,当有消息进来时,会另起线程去处理消费逻辑。

测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:kafka-producer.xml", "classpath:kafka-consumer.xml"})
public class ApplicationTest {

    @Autowired
    private KafkaProducerService producerService;
    
    @Test
    public void sendMessageTest() throws Exception {
        for (int i = 0; i < 10; i++) {
            producerService.sendMessage(Integer.toString(i), "hello kafka-" + i);
        }
        System.in.read(); // 让 main 线程阻塞一下,否则消费者线程可能会来不及消费就死了
    }
}

生产者向 Kafka 发送数据,我这里指定了 key。前面说过,如果不指定分区,但是指定了 key,那么 Kafka 会将 key hash,然后取模,根据模数决定消息分配到哪个分区。所以这 10 条数据会分散到各个分区存储。

控制台输出信息如下,为了方便查看,我截成了两部分。

生产者发送消息成功。topic: test01, partition: 0, offset: 880
生产者发送消息成功。topic: test01, partition: 0, offset: 881
生产者发送消息成功。topic: test01, partition: 0, offset: 882
生产者发送消息成功。topic: test01, partition: 1, offset: 714
生产者发送消息成功。topic: test01, partition: 1, offset: 715
生产者发送消息成功。topic: test01, partition: 1, offset: 716
生产者发送消息成功。topic: test01, partition: 2, offset: 731
生产者发送消息成功。topic: test01, partition: 2, offset: 732
生产者发送消息成功。topic: test01, partition: 2, offset: 733
生产者发送消息成功。topic: test01, partition: 2, offset: 734

可以看到这 10 条消息均匀的分布在 3 个分区里。

我配置了两个消费者,这两个消费者都位于一个消费者组下,所以这两个消费者会轮流消费,而且只会消费到分配给自己的分区,不会重复消费相同的分区。

image

可以看到有两条消费者线程在监听处理消息。

消费者1收到了消息。topic: test01, partition: 0, offset: 880, key: null, value: hello kafka-2
消费者1收到了消息。topic: test01, partition: 0, offset: 881, key: null, value: hello kafka-5
消费者1收到了消息。topic: test01, partition: 0, offset: 882, key: null, value: hello kafka-8
消费者2收到了消息。topic: test01, partition: 2, offset: 731, key: null, value: hello kafka-0
消费者2收到了消息。topic: test01, partition: 2, offset: 732, key: null, value: hello kafka-3
消费者2收到了消息。topic: test01, partition: 2, offset: 733, key: null, value: hello kafka-6
消费者2收到了消息。topic: test01, partition: 2, offset: 734, key: null, value: hello kafka-9
消费者1收到了消息。topic: test01, partition: 1, offset: 714, key: null, value: hello kafka-1
消费者1收到了消息。topic: test01, partition: 1, offset: 715, key: null, value: hello kafka-4
消费者1收到了消息。topic: test01, partition: 1, offset: 716, key: null, value: hello kafka-7

可以看到 “消费者1” 消费了 0 号和 1 号分区,”消费者2” 只消费了 2 号分区。再次强调,这两个消费者位于同一个消费者组下,它们只会消费分配给自己的分区,不会消费他人的分区,也就是说不会重复消费相同的分区。

为了更直观的对比,加深印象,我们再配置一个 “消费者3”,不过这个 “消费者3” 位于不同的消费者组下。

具体配置就不贴出来了,可以到源码里看。

我们直接看结果,控制台输出如下,为了方便查看,我截成了两部分。

生产者发送消息成功。topic: test01, partition: 0, offset: 887
生产者发送消息成功。topic: test01, partition: 0, offset: 888
生产者发送消息成功。topic: test01, partition: 0, offset: 889
生产者发送消息成功。topic: test01, partition: 1, offset: 720
生产者发送消息成功。topic: test01, partition: 1, offset: 721
生产者发送消息成功。topic: test01, partition: 1, offset: 722
生产者发送消息成功。topic: test01, partition: 1, offset: 723
生产者发送消息成功。topic: test01, partition: 2, offset: 738
生产者发送消息成功。topic: test01, partition: 2, offset: 739
生产者发送消息成功。topic: test01, partition: 2, offset: 740

可以看到生产者生产了 10 条消息,均匀的分布在 3 个分区里。

消费者1收到了消息。topic: test01, partition: 0, offset: 887, key: null, value: hello kafka-1
消费者1收到了消息。topic: test01, partition: 0, offset: 888, key: null, value: hello kafka-4
消费者1收到了消息。topic: test01, partition: 0, offset: 889, key: null, value: hello kafka-7
消费者3收到了消息。topic: test01, partition: 0, offset: 887, key: null, value: hello kafka-1
消费者3收到了消息。topic: test01, partition: 0, offset: 888, key: null, value: hello kafka-4
消费者3收到了消息。topic: test01, partition: 0, offset: 889, key: null, value: hello kafka-7
消费者2收到了消息。topic: test01, partition: 2, offset: 738, key: null, value: hello kafka-2
消费者2收到了消息。topic: test01, partition: 2, offset: 739, key: null, value: hello kafka-5
消费者2收到了消息。topic: test01, partition: 2, offset: 740, key: null, value: hello kafka-8
消费者3收到了消息。topic: test01, partition: 1, offset: 720, key: null, value: hello kafka-0
消费者3收到了消息。topic: test01, partition: 1, offset: 721, key: null, value: hello kafka-3
消费者3收到了消息。topic: test01, partition: 1, offset: 722, key: null, value: hello kafka-6
消费者3收到了消息。topic: test01, partition: 1, offset: 723, key: null, value: hello kafka-9
消费者3收到了消息。topic: test01, partition: 2, offset: 738, key: null, value: hello kafka-2
消费者3收到了消息。topic: test01, partition: 2, offset: 739, key: null, value: hello kafka-5
消费者3收到了消息。topic: test01, partition: 2, offset: 740, key: null, value: hello kafka-8
消费者1收到了消息。topic: test01, partition: 1, offset: 720, key: null, value: hello kafka-0
消费者1收到了消息。topic: test01, partition: 1, offset: 721, key: null, value: hello kafka-3
消费者1收到了消息。topic: test01, partition: 1, offset: 722, key: null, value: hello kafka-6
消费者1收到了消息。topic: test01, partition: 1, offset: 723, key: null, value: hello kafka-9

可以看到 “消费者1” 消费了 0 号和 1 号分区,”消费者2” 只消费了 2 号分区。由于这两个消费者位于同一个消费者组下,所以不会重复消费同一个分区。

而 “消费者3” 位于不同的消费者组下,其它组对它不影响,跟它没关系。并且这个组只有它一个消费者,所以 “消费者3” 可以大饱口福,消费所有的分区。

3.学习 Kafka - Java 客户端

2020-05-21   龙德   Kafka   Kafka  

引入客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

生产者

阻塞的生产者

生产者代码如下:

public class CustomProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.配置生产者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#producerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2.实例化一个生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3.实例化一条消息,第一个参数是主题的名称,第二个参数是消息的内容
            ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
            // 4.发送消息
            Future<RecordMetadata> future = producer.send(record);
            // 5.获取发送的结果,注意这个方法会阻塞当前线程
            RecordMetadata metadata = future.get();
            System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }
        // 6.关闭资源
        producer.close();
    }
}

带有回调的生产者

前面说过,future.get() 方法会阻塞当前线程,这样会影响到后面代码的执行,好在 Kafka 客户端提供了回调的方法。

代码如下:

public class CallbackProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.配置生产者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#producerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2.实例化一个生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // 3.实例化一条消息,第一个参数是主题的名称,第二个参数是消息的内容
            ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
            // 4.发送消息
            producer.send(record, new Callback() {
                // 5.在回调函数里处理结果,这样就不会阻塞了
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
        }
        // 6.关闭资源
        producer.close();
    }
}

消息发送到哪个分区?

image

如上图所示,ProducerRecord 类的构造函数有很多重载,根据构造函数来决定消息的分区,规则如下:

  • 如果不指定分区和 key,那么消息会轮询负载均衡到每个分区上。
// 轮询负载均衡到每个分区上
ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i));
  • 如果不指定分区,但是指定了 key,那么会将 key hash,然后取模,根据模数决定分区。
// 消息发送到哪个分区取决于 key
ProducerRecord<String,String> record = new ProducerRecord<>("test01", Integer.toString(i), Integer.toString(i));
  • 如果指定分区,那么直接发送到指定的分区。
// 指定消息发送到第 2 个分区
ProducerRecord<String,String> record = new ProducerRecord<>("test01", 2, Integer.toString(i), Integer.toString(i));

消费者

自动提交偏移量

public class AutoCommitConsumer {

    public static void main(String[] args) {
        // 1.配置消费者的属性,各个属性的作用可以在这里看:http://kafka.apachecn.org/documentation.html#consumerconfigs
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.197.6:9092");
        props.put("group.id", "my-consumer-group-01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 2.实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3.订阅主题(可以订阅多个)
        consumer.subscribe(Arrays.asList("test01"));
        // 4.循环消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            }
        }
    }
}

通过配置 enable.auto.commit,表明这是个自动提交偏移量,偏移量由 auto.commit.interval.ms 控制自动提交的频率。

通过配置 bootstrap.servers 指定集群的一个或多个 broker。不用指定全部的 broker,它将自动发现集群中的其余的 borker(最好指定多个,万一有服务器故障)。

在这个例子中,客户端订阅了主题 test01。消费者组叫 my-consumer-group-01。

broker 通过心跳机制自动检测 my-consumer-group-01 组中失败的消费者实例,消费者实例会自动 ping 集群,告诉集群它还活着。只要消费者实例能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过 session.timeout.ms 设置的值,那么就会认为是故障的,它的分区将被分配到别的消费者实例。

通过配置 deserializer 如何把 byte 转成 object 类型,例子中,通过指定 string 解析器,我们告诉获取到的消息的 key 和 value 只是简单个 string 类型。

手动提交偏移量

通过设置 props.put(“enable.auto.commit”, “false”); 改为手动提交偏移量。

然后在消费消息后手动提交。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(String.format("topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    consumer.commitSync();
}

为什么需要手动提交?

可以想象一个场景:读取消息后,将它们批量插入到数据库中。如果我们设置 offset 自动提交,消费将被认为是已消费的。这样会出现问题,因为消息被插入到数据库之前可能会失败,这样我们就会丢失部分的数据。

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= 100) {
        try {
            // 插入数据库
            insertIntoDb(buffer);
        } catch(Exception e) {
            // 如果插入数据库失败
            // TODO
        }
        // 插入数据库成功再手动提交
        consumer.commitSync();
        buffer.clear();
    }
}

订阅指定的分区

在前面的例子中,我们订阅感兴趣的 topic,让 Kafka 提供给我们平分后的分区。但是,在有些情况下,你可能需要自己来控制分配指定分区。

要使用此模式,你只需调用 assign(Collection partitions) 方法消费指定的分区即可:

// 订阅分区
TopicPartition partition0 = new TopicPartition("test01", 0);
TopicPartition partition1 = new TopicPartition("test01", 1);
consumer.assign(Arrays.asList(partition0, partition1));

注意订阅主题和订阅分区是互斥的。

控制消费的位置

大多数情况下,消费者只是简单的从头到尾的消费消息,周期性的提交 offset(自动或手动)。Kafka 也支持消费者去手动的控制消费的位置,可以消费之前的消息也可以跳过最近的消息。

要使用此模式,你只需调用 seek(TopicPartition partition, long offset) 方法消费指定的位置即可:

TopicPartition partition0 = new TopicPartition("test01", 0);
consumer.seek(partition0, 784);

结果如下图:

image

2.学习 Kafka - 重要概念

2020-05-20   龙德   Kafka   Kafka  

由于搭建集群的时候,3 台虚拟机都在前台开启了 Kafka,占用了窗口,所以我这里再开启两个窗口,分别用来操作 Kafka 和 Zookeeper。

image

下面的操作都在这两个窗口上进行。

主题(Topic)

如果把 Kafka 比作 MySQL 的话,那么主题就相当于数据库,所以想用 Kafka,就必须先创建主题,就好比我们想用 MySQL,就必须先创建数据库一样。

使用这个命令创建主题:

/usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.197.6:2181 --replication-factor 3 --partitions 3 --topic test01

关于这条命令的解释:

–zookeeper:指定 zookeeper 集群的 ip 地址和端口

–topic:创建名为 test01 的主题

–partitions:test01 这个主题分配 3 个分区

–replication-factor:每个分区分配 3 个副本,也就是每个分区备份 3 份

主题在 Zookeeper 中是如何体现的呢,我们进入 Zookeeper 看一下。

使用命令 ls /brokers/topics,可以看到刚才创建的 test01 主题在 Zookeeper 中已经创建了。

image

使用命令 /brokers/topics/test01/partitions,可以看到分区也创建了。

image

分区(partition)

主题只是一个逻辑的概念,在物理上并不存在,消息也不是存在主题里的,而是存在分区里的。分区是 Kafka 消息存储的最小单元,在物理上对应着一个个真实的文件。

我们上面新建了一个 test01 主题,分配了 3 个分区,这 3 个分区其实就是一个个的文件夹。我们可以进入 log.dirs 目录

cd /usr/local/kafka_2.13-2.5.0/logs/

image

可以看到创建了 3 个文件夹,分别对应 3 个分区,命名规则是 topic名称-N,N 就是分区的个数。后面生产者发送给 test01 主题的数据都会存在这 3 个文件夹里。

进入 test01 主题的第一个分区看一下

image

可以看到每个分区文件夹,都包含以下 4 类文件:

  • xxxxxxxxx.index:索引文件
  • xxxxxxxxx.log:数据文件
  • xxxxxxxxx.timeindex:时间戳索引文件
  • leader-epoch-checkpoint:保存了每一任 leader 开始写入消息时的 offset,会定时更新,当 follower 被选为 leader 时会根据这个确定哪些消息可用

我们重点看 .index .log .timeindex 这 3 个文件。

这 3 个文件都是成对出现的,每一对都叫做 segment。也就是说分区中的数据是分段存储,一个分区(partition)被切割成多个相同大小的段(segment)(这个是由 log.segment.bytes 决定,控制每个 segment 的大小)。

segment 文件命名规则:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。

如下图所示:

image

通过上面的内容,我们可以这样总结:

在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,目录的命名规则为 topic名称+有序序号。第一个序号从 0 开始计,最大的序号为 partition 数量减 1。partition 是实际物理上的概念,而 topic 是逻辑上的概念,topic 更像是一个消息的类别。由于生产者生产的消息会不断追加到 log 文件的末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 将 partition 细分为 segment,一个 partition 物理上由多个 segment 组成。每个 segment 由 3 个文件组成,分别是 index 文件、log 文件和 timeindex 文件,这 3 个文件的命名规则是:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。

偏移量(offset)

分区中的每一条记录都会分配一个 id 号来表示顺序,我们称之为 offset,offset 用来唯一的标识分区中每一条记录。

每一个消费者中唯一保存的元数据是 offset(偏移量)即消费在 log 中的位置。偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据。也可以跳过最近的记录,从 “现在” 开始消费。

特别说明的是,老版本的消费者是依赖 Zookeeper 的,当启动一个消费者时向 Zookeeper 注册。新版本消费者去掉了对 Zookeeper 的依赖,而是由消费组协调器统一管理,已消费的消息偏移量提交后会保存到名为 “__consumer_offsets” 文件夹中。

image

Kafka 的 offset 是分区内有序的,但是在不同分区中是无序的,Kafka 不保证数据的全局有序。

关于 offset,可以通过以下几个问题来理解:

1.为啥 offset 在分区内是有序的?

因为分区的结构是队列,先进先出,所以保证了有序。

2.为啥在不同分区中是无序的?

因为消息写入哪一个分区不是固定的,有可能第一条消息写入第一个分区,第二条消息写入第二个区分。。。

3.如果有业务场景需要保证数据的有序呢?该如何做?

可以这样做:创建主题的时候只分配一个分区,这样就能保证数据的顺序性了。

但是只分配一个分区,又跟 Kafka 的分布式冲突,所以还可以这样做:

创建主题的时候分配多个分区,读取消息时,在代码层面处理消息的顺序性。

broker

Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。一个 broker 有多个 topic,一个 topic 有多个 partition,一个 partition 有多个 segment。

image

生产者(producer)

Kafka 自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为 message(消息)发送到 Kafka 集群。默认情况下,每行将作为单独的 message 发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

[root@localhost kafka_2.13-2.5.0]# /usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.197.6:9092 --topic test01
>This is a message
>This is another message
>

关于这条命令的解释:

–broker-list:设置 kafka 集群地址

–topic:设置生产消息的主题

消费者(consumer)

Kafka 还有一个命令行 consumer(消费者),将消息转储到标准输出。

/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.12:9092 --topic test01 --from-beginning
This is a message
This is another message

关于这条命令的解释:

–bootstrap-server:设置 kafka 集群地址

–topic:设置消费消息的主题

–from-beginning:设置消息起始位置开始消费。默认是从新位置 latest 开始消费。

消费组(consumer-groups)

消费者使用一个 “消费组” 名称来进行标识,发布到 topic 中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。

如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。

如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。

image

如图,这个 Kafka 集群有两台 server 的,四个分区(p0-p3)和两个消费者组。消费组A有两个消费者,消费组B有四个消费者。

通常情况下,每个 topic 都会有一些消费组,一个消费组对应一个”逻辑订阅者”。一个消费组由许多消费者实例组成,便于扩展和容错。这就是发布和订阅的概念,只不过订阅者是一组消费者而不是单个的进程。

在 Kafka 中实现消费的方式是将分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。维护消费组中的消费关系由 Kafka 协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区。如果一个实例消失或者故障,拥有的分区将被分发到剩余的实例。这被称为重新平衡分组。

Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序。每个 partition 分区按照key值排序足以满足大多数应用程序的需求。但如果你需要总记录在所有记录的上面,可使用仅有一个分区的主题来实现,这意味着每个消费者组只有一个消费者进程。

创建消费者时如果不指定消费组,那么就会默认分配一个,可以通过这个命令查看有哪些消费组。

/usr/local/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.197.6:9092 --list

image

如图,因为我启动了 4 个消费者,所以有 4 个消费组。由于这 4 个消费者都属于不同的消费组,所以他们都能收到 test01 主题的所有消息。

同一个消费组里的消费者消费唯一的分区,不可以重复消费。

打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。

消费组 my-consumer-group-01 有 3 个消费者,分别是 my-consumer-01,my-consumer-02,my-consumer-03。

那么,Kafka 会负载均衡的将分区划分给同一消费组里的消费者。

image

如上图所示,此时,test01-0 分区已经被第一个消费者消费了,那么它就不能被这个组里的其它消费者消费。除非第一个消费者挂了,那么它就会被重新分配给其它的消费者。

我们启动 3 个消费者来亲自感受一下。

使用这个命令可以指定消费者的组。

/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.197.6:9092 --consumer-property group.id=my-consumer-group-01 --consumer-property client.id=my-consumer-01 --topic test01

--consumer-property group.id=my-consumer-group-01 指定消费者的组是 my-consumer-group-01。

--consumer-property client.id=my-consumer-01 指定消费者的 ID 是 my-consumer-01。

image

如上图所示,我启动了 3 个消费者,这 3 个消费者都位于同一个组里。然后生产者往 test01 这个主题里发送消息,可以看到:对于同一个消费组而言,同一个时刻,只有一个消费者在消费消息,而且每个消费者消费唯一的分区,不可以重复消费。

不同消费组里的消费者可以消费相同的分区,可以重复消费。

打个比方,test01 主题分了 3 个分区,分别是 test01-0,test01-1,test01-2。

消费组 my-consumer-group-01 有 2 个消费者,分别是 my-consumer-01 和 my-consumer-02。

消费组 my-consumer-group-02 有 1 个消费者,是 my-consumer-03。

image

如上图所示,my-consumer-01 和 my-consumer-02 位于同一个组,那么它们无法重复消费同一个分区,并且同一个时刻,只能有一个消费者消费。

虽然 test01-0,test01-1,test01-2 这 3 个分区已经被 my-consumer-01 和 my-consumer-02 消费过了,但是 my-consumer-03 位于不同的组,所以其它组对它是不影响的,它是独立的,可以消费其它组消费过的分区。

image

如上图所示,我启动了 3 个消费者,前两个消费者同一个组,第三个消费者另一个组。

由于 my-consumer-01 和 my-consumer-02 位于同一个组,所以它们只能消费分配给自己的分区,不能重复消费。而且同一个时刻只能有一个消费者消费,所以看起来就像是轮流消费。而 my-consumer-03 位于不同的组,即使分区已经被其它组的消费者消费过了,但是它还是能消费的。而且这个组只有它一个消费者,所以它消费了所有分区的消息。

1.学习 Kafka - 搭建集群环境

2020-05-18   龙德   Kafka   Kafka  

前提条件

请准备 3 台虚拟机,以下是我准备的:

image

安装 JDK1.8

Zookeeper 和 Kafka 都依赖 JDK,所以安装 JDK 是必须的。

到官网 https://www.oracle.com/java/technologies/javase-downloads.html 下载 JDK1.8。

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp jdk-8u251-linux-x64.tar.gz root@192.168.197.6:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.10:/usr/local/src
scp jdk-8u251-linux-x64.tar.gz root@192.168.197.12:/usr/local/src

接着解压 jdk-8u251-linux-x64.tar.gz 压缩包到 /usr/local 目录。

tar zxvf /usr/local/src/jdk-8u251-linux-x64.tar.gz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set jdk environmentexport
export JAVA_HOME=/usr/local/jdk1.8.0_251
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

最后执行 java -version 命令,验证是否安装成功。输出以下内容则说明安装成功:

java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

搭建 Zookeeper 集群

由于 Kafka 的 broker 的状态、消费者的消费状态、偏移量以及消费者组等是交给 Zookeeper 维护的,所以安装 Kafka 之前必须先安装 Zookeeper。

安装 Zookeeper

到官网 https://zookeeper.apache.org/releases.html 下载 Zookeeper,我们下载 3.4.14 版本的。

image

image

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp zookeeper-3.4.14.tar.gz root@192.168.197.6:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.10:/usr/local/src
scp zookeeper-3.4.14.tar.gz root@192.168.197.12:/usr/local/src

接着解压 zookeeper-3.4.14.tar.gz 压缩包到 /usr/local 目录。

tar zxvf /usr/local/src/zookeeper-3.4.14.tar.gz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set zookeeper environmentexport
export ZK_HOME=/usr/local/zookeeper-3.4.14
export PATH=$ZK_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

修改 Zookeeper 的配置文件

将 zoo_sample.cfg 文件备份并重命名为 zoo.cfg

cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.14/conf/zoo.cfg

编辑 zoo.cfg 文件

vi zoo.cfg

修改数据文件夹路径

dataDir=/usr/local/zookeeper-3.4.14/data

添加日志文件夹路径

dataLogDir=/usr/local/zookeeper-3.4.14/logs

在文件末尾添加集群信息

server.1=192.168.197.6:2888:3888
server.2=192.168.197.10:2888:3888
server.3=192.168.197.12:2888:3888

image

创建数据文件夹

mkdir /usr/local/zookeeper-3.4.14/data

创建日志文件夹

mkdir /usr/local/zookeeper-3.4.14/logs

创建 myid 文件,在 myid 文件中添加本机的 server ID

echo "1" > /usr/local/zookeeper-3.4.14/data/myid

请注意,每台虚拟机的 myid 要跟 server ID 对应。

开放端口

每台虚拟机都开放以下端口,如果不开放的话,Zookeeper 集群无法与其它节点通信,启动会报错。

# 2181 端口对 client 端提供服务
firewall-cmd --zone=public --add-port=2181/tcp --permanent
# 2888 端口是集群内机器相互通信使用(由 Leader 负责监听此端口)
firewall-cmd --zone=public --add-port=2888/tcp --permanent
# 3888 端口是选举 leader 使用的
firewall-cmd --zone=public --add-port=3888/tcp --permanent

使端口生效

firewall-cmd --reload

启动 3 台虚拟机的 Zookeeper

使用以下命令分别启动每台虚拟机上的 Zookeeper。

/usr/local/zookeeper-3.4.14/bin/zkServer.sh start

查看 Zookeeper 的状态

/usr/local/zookeeper-3.4.14/bin/zkServer.sh status

如果输出以下信息,则说明 Zookeeper 集群搭建成功

192.168.197.6:

image

192.168.197.10:

image

192.168.197.12:

image

通过以上图片可以知道,3 台虚拟机上的 Zookeeper 已经成功启起来了。其中 192.168.197.10 节点被选举为 leader 节点,其它两个节点是 follower 节点。每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。

2888 端口是集群内机器相互通信使用的,由 Leader 负责监听此端口。

搭建 Kafka 集群

安装 Kafka

到官网 http://kafka.apache.org/downloads 下载 Kafka,我们下载 2.5.0 版本的。

image

image

下载完后,分别上传到每台虚拟机的 /usr/local/src 目录下。

我是用 Git 上传的。

scp kafka_2.13-2.5.0.tgz root@192.168.197.6:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.10:/usr/local/src
scp kafka_2.13-2.5.0.tgz root@192.168.197.12:/usr/local/src

接着解压 kafka_2.13-2.5.0.tgz 压缩包到 /usr/local 目录。

tar zxvf kafka_2.13-2.5.0.tgz -C /usr/local/

然后配置每一台虚拟机的环境变量,编辑 profile 文件

vi /etc/profile

在最末尾添加以下内容

# set kafka environment
export KAFKA_HOME=/usr/local/kafka_2.13-2.5.0
export PATH=$KAFKA_HOME/bin:$PATH

保存 profile 文件,执行生效命令:

source /etc/profile

修改 Kafka 的配置文件

修改 192.168.197.6 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=0
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.6:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

修改 192.168.197.10 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=1
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.10:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

修改 192.168.197.12 节点的配置文件

编辑配置文件

vi /usr/local/kafka_2.13-2.5.0/config/server.properties

修改配置如下(IP 地址应该根据实际情况填写)

# 当前节点在集群中的唯一标识,和 zookeeper 的 myid 性质是一样
broker.id=2
# 当前 kafka 对外提供服务的 ip 和端口,默认是 9092
listeners=PLAINTEXT://192.168.197.12:9092
# 消息存放的目录
log.dirs=/usr/local/kafka_2.13-2.5.0/logs
# zookeeper 集群的信息
zookeeper.connect=192.168.197.6:2181,192.168.197.10:2181,192.168.197.12:2181
# 连接 zookeeper 超时的最大时间
zookeeper.connection.timeout.ms=180000

然后每个节点都创建 logs 文件夹

mkdir /usr/local/kafka_2.13-2.5.0/logs

开放端口

每台虚拟机都开放以下端口

# 9092 端口是 Kafka 对 client 端提供服务的端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent

使端口生效

firewall-cmd --reload

启动 3 台虚拟机的 Kafka

使用以下命令分别启动每台虚拟机上的 Kafka。

/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.5.0/config/server.properties &

上面那个命令是前台启动,会一直占用窗口,你也可以使用后台启动,命令如下:

/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/server.properties

如果输出以下信息,则说明启动成功

image

总结

最后来总结以下每台虚拟机安装的内容。

查看服务启动情况

使用这个命令查看服务启动情况:

jps

image

从上图可以看到安装了 Kafka 和 Zookeeper,并且都启动了。

查看端口占用情况

使用这个命令查看端口占用情况:

netstat -tunlp|egrep "(2181|2888|3888|9092)"

image

  • 2888 端口:此端口是 Zookeeper 集群内机器相互通信使用的,由 Leader 节点负责监听此端口。
  • 3888 端口:Zookeeper 集群的每个节点都监听 3888 端口,当 leader 挂掉时,其它节点会通过此端口相互通信,选举出新的 leader。
  • 2181 端口:Zookeeper 对 client 端提供服务的端口。
  • 9092 端口:Kafka 对 client 端提供服务的端口。

查看 Zookeeper 的目录情况

随便在一个节点上,使用以下命名,查看 Zookeeper 的目录情况。

使用客户端进入 Zookeeper

/usr/local/zookeeper-3.4.14/bin/zkCli.sh

image

使用 ls / 命令查看 Zookeeper 的目录情况。

image

上面的显示结果中:只有 zookeeper 目录是 Zookeeper 原生的,其它都是 Kafka 创建的。

关于各个目录的作用,后面再说。

至此,Kafka 的集群环境就搭建成功了。

17.学习ActiveMQ-在SpringBoot中使用ActiveMQ

2020-05-15   龙德   ActiveMQ   ActiveMQ SpringBoot  

新建工程

新建一个 Maven 工程,我的工程叫 springboot-activemq-example。

pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/>
    </parent>
    
    <groupId>wang.miansen</groupId>
    <artifactId>springboot-activemq-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>springboot-activemq-example</name>
    <description>activemq project for Spring Boot</description>
    
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
        <!--消息队列连接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

server.port=8080
# 配置消息的类型,如果是true则表示为topic消息,如果为false表示Queue消息
spring.jms.pub-sub-domain=true
# 连接用户名
spring.activemq.user=admin
# 连接密码
spring.activemq.password=admin
# 连接主机信息
spring.activemq.brokerUrl=tcp://localhost:61616
# 连接池的最大连接数
jms.pool.maxConnections=10

新建配置类 ActiveMQConfig

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }
    
    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }
    
    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }
    
}

队列生产者

@Component
public class QueueProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-queue-1")
    private Queue queue1;
    
    @Resource(name = "springboot-activemq-queue-2")
    private Queue queue2;
    
    // 发送消息到队列1
    public void sendQueueMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue1, mapMessage);
    }
    
    // 发送消息到队列2
    public void sendQueueMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue2, mapMessage);
    }

}

队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

@Component
public class TopicProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-topic-1")
    private Topic topic1;
    
    @Resource(name = "springboot-activemq-topic-2")
    private Topic topic2;
    
    // 发送消息到主题1
    public void sendTopicMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic1, mapMessage);
    }
    
    // 发送消息到主题2
    public void sendTopicMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic2, mapMessage);
    }

}

主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

测试

如果要测试主题,那么需要修改配置 spring.jms.pub-sub-domain=true

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private QueueProducer queueProducer;
    
    @Autowired
    private TopicProducer topicProducer;
    
    @Test
    public void sendQueueMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息1");
        queueProducer.sendQueueMessage1(map);
    }
    
    @Test
    public void sendQueueMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息2");
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
    }
    
    @Test
    public void sendTopicMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息1");
        topicProducer.sendTopicMessage1(map);
    }
    
    @Test
    public void sendTopicMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息2");
        topicProducer.sendTopicMessage2(map);
    }
    
}

同时配置队列和主题

上面说过,如果想要使用主题,那么需要修改配置文件,这样不方便。我们可以在 ActiveMQConfig 类里同时配置队列和主题。

修改 ActiveMQConfig 类:

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    @Value("${spring.activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String user;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${jms.pool.maxConnections}")
    private int maxConnections;

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }

    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }

    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }

    // 真正可以产生连接的工厂
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(user, password, brokerUrl);
    }

    // ActiveMQ 提供的连接池
    @Bean
    public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(connectionFactory);
        pool.setMaxConnections(maxConnections);
        return pool;
    }

    // 队列监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable1(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-1");
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable2(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-2");
        return factory;
    }
    
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(PooledConnectionFactory connectionFactory){
        return new JmsMessagingTemplate(connectionFactory);
    }

}

修改队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

修改主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1", containerFactory = "jmsListenerContainerTopic")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable1")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

16.学习ActiveMQ-在SpringMVC中使用ActiveMQ

2020-05-13   龙德   ActiveMQ   ActiveMQ SpringMVC  

新建工程

新建一个 Maven 工程,我的工程叫 springmvc-activemq-example。

pom.xml 文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wang.miansen</groupId>
    <artifactId>springmvc-activemq-example</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>springmvc-activemq-example Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
    
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        
        <!-- 1)Spring核心 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 2)Spring DAO层 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 3)Spring web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 4)Spring test -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>

        <!-- 1)ActiveMQ 核心 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <!-- 2)ActiveMQ与 Spring 集成 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>

        <!--3)ActiveMQ所需要的pool包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>springmvc-activemq-example</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- web.xml版本 -->
                <version>3.1</version>
                <configuration>
                    <!-- jdk版本 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

队列生产者

新建队列生产者 QueueProducer 类:

public class QueueProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

队列消费者

新建 3 个队列消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class QueueConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

新建主题生产者 TopicProducer 类:

public class TopicProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Topic topic, final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(topic, mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

主题消费者

新建 3 个主题消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class TopicConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

application.properties

新建 application.properties 文件,配置 ActiveMQ 的相关信息。

# jms服务器地址
jms.broker.url=tcp://localhost:61616
# 连接用户名
jms.broker.userName=admin
# 连接密码
jms.broker.password=admin
# 使用异步发送
jms.broker.useAsyncSend=true
# 连接池的最大连接数
jms.pool.maxConnections=10

application-context.xml

新建 application-context.xml 文件,配置各个 Bean。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

    <!-- 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker.url}"/>
        <property name="userName" value="${jms.broker.userName}"></property>  
        <property name="password" value="${jms.broker.password}"></property>  
        <property name="useAsyncSend" value="${jms.broker.useAsyncSend}" />  
    </bean>
    
    <!-- 连接池 -->
    <!-- ActiveMQ 为我们提供了一个 PooledConnectionFactory,通过往里面注入一个 ActiveMQConnectionFactory   
        可以用来将 Connection、Session 和  MessageProducer 池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool 包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <!--真正可以产生连接的连接工厂,由对应的 JMS 服务厂商提供-->
        <property name="connectionFactory" ref="activeMQConnectionFactory" />
        <!--连接池的最大连接数-->
        <property name="maxConnections" value="${jms.pool.maxConnections}"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 队列和非持久化订阅使用 -->
    <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory3" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
    </bean>
    
    <!--队列1-->
    <bean id="activeMQQueue1" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-1" />
    </bean>
    
    <!--队列2-->
    <bean id="activeMQQueue2" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-2" />
    </bean>
    
    <!--主题1-->
    <bean id="activeMQTopic1" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-1"/>
    </bean>
    
    <!--主题2-->
    <bean id="activeMQTopic2" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-2"/>
    </bean>
    
    <!-- 队列 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQQueue1"/>
        <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false"/>
    </bean>
    
    <!-- 主题 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQTopic1"/>
         <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true"/>
    </bean>
    
    <!--队列监听容器1,一经注册,自动监听-->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue1" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer1" />
    </bean>
    
    <!--队列监听容器2,一经注册,自动监听-->
    <bean id="queueListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer2" />
    </bean>
    
    <!--队列监听容器3,注意目的地跟容器2一样-->
    <bean id="queueListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer3" />
    </bean>
    
    <!--主题监听容器1(非持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic1" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="false" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer1" />
    </bean>
    
    <!--主题监听容器2(持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory2" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer2" />
    </bean>
    
    <!-- 主题监听容器3(持久化订阅),注意目的地跟容器2一样 -->
    <bean id="topicListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory3" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer3" />
    </bean>
    
    <!-- 队列生产者 -->
    <bean id="queueProducer" class="com.example.activemq.QueueProducer">
        <property name="jmsTemplate" ref="jmsTemplateQueue" />
    </bean>
    
    <!-- 主题生产者 -->
    <bean id="topicProducer" class="com.example.activemq.TopicProducer">
        <property name="jmsTemplate" ref="jmsTemplateTopic" />
    </bean>
    
    <!-- 队列消费者1 -->
    <bean id="queueConsumer1" class="com.example.activemq.QueueConsumer1" />
    <!-- 队列消费者2 -->
    <bean id="queueConsumer2" class="com.example.activemq.QueueConsumer2" />
    <!-- 队列消费者3 -->
    <bean id="queueConsumer3" class="com.example.activemq.QueueConsumer3" />
    
    <!-- 主题消费者1 -->
    <bean id="topicConsumer1" class="com.example.activemq.TopicConsumer1" />
    <!-- 主题消费者2 -->
    <bean id="topicConsumer2" class="com.example.activemq.TopicConsumer2" />
    <!-- 主题消费者3 -->
    <bean id="topicConsumer3" class="com.example.activemq.TopicConsumer3" />
    
    <!-- 消息转换器 -->
    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</beans>

项目结构

最后的项目结构如下:

image

测试

新建测试类 ApplicationTest

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:application-context.xml"})
public class ApplicationTest {

    @Resource(name = "activeMQQueue1")
    private Queue queue1;

    @Resource(name = "activeMQQueue2")
    private Queue queue2;

    @Resource(name = "activeMQTopic1")
    private Topic topic1;

    @Resource(name = "activeMQTopic2")
    private Topic topic2;

    @Autowired
    private QueueProducer queueProducer;

    @Autowired
    private TopicProducer topicProducer;

    private Map<String, Object> mapMessage;

    @Before
    public void initMessage() throws Exception {
        mapMessage = new HashMap<>();
        mapMessage.put("message", "Hello ActiveMQ - " + new Date().getTime());
    }

    // 发送消息到队列1
    @Test
    public void queue1() throws Exception {
        queueProducer.sendMessage(queue1, mapMessage);
    }

    // 发送消息到队列2
    @Test
    public void queue2() throws Exception {
        queueProducer.sendMessage(queue2, mapMessage);
    }

    // 发送消息到主题1
    @Test
    public void topic1() throws Exception {
        topicProducer.sendMessage(topic1, mapMessage);
    }

    // 发送消息到主题2
    @Test
    public void topic2() throws Exception {
        topicProducer.sendMessage(topic2, mapMessage);
    }

}

测试队列1

运行 queue1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

image

测试队列2

由于队列2被两个消费者监听,又由于 ActiveMQ 队列中的消息只能被一个消费者消费,所以这两个消费者应该是通过负载均衡的方式消费。

运行 queue2() 方法,可以看到有时是消费者2消费,有时又是消费者3消费。

测试主题1

运行 topic1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

测试主题2

运行 topic2() 方法,生产者发送消息后,由于是发布订阅的模式,所以消费者2和消费者3都能收到消息。

查看控制台,可以看到有两个持久化订阅的消费者。

image