17.学习ActiveMQ-在SpringBoot中使用ActiveMQ

2020-05-15   龙德   ActiveMQ   ActiveMQ SpringBoot  

新建工程

新建一个 Maven 工程,我的工程叫 springboot-activemq-example。

pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/>
    </parent>
    
    <groupId>wang.miansen</groupId>
    <artifactId>springboot-activemq-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>springboot-activemq-example</name>
    <description>activemq project for Spring Boot</description>
    
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
        <!--消息队列连接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

server.port=8080
# 配置消息的类型,如果是true则表示为topic消息,如果为false表示Queue消息
spring.jms.pub-sub-domain=true
# 连接用户名
spring.activemq.user=admin
# 连接密码
spring.activemq.password=admin
# 连接主机信息
spring.activemq.brokerUrl=tcp://localhost:61616
# 连接池的最大连接数
jms.pool.maxConnections=10

新建配置类 ActiveMQConfig

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }
    
    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }
    
    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }
    
}

队列生产者

@Component
public class QueueProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-queue-1")
    private Queue queue1;
    
    @Resource(name = "springboot-activemq-queue-2")
    private Queue queue2;
    
    // 发送消息到队列1
    public void sendQueueMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue1, mapMessage);
    }
    
    // 发送消息到队列2
    public void sendQueueMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(queue2, mapMessage);
    }

}

队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

@Component
public class TopicProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Resource(name = "springboot-activemq-topic-1")
    private Topic topic1;
    
    @Resource(name = "springboot-activemq-topic-2")
    private Topic topic2;
    
    // 发送消息到主题1
    public void sendTopicMessage1(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic1, mapMessage);
    }
    
    // 发送消息到主题2
    public void sendTopicMessage2(final Map<String, Object> mapMessage) {
        jmsMessagingTemplate.convertAndSend(topic2, mapMessage);
    }

}

主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

测试

如果要测试主题,那么需要修改配置 spring.jms.pub-sub-domain=true

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private QueueProducer queueProducer;
    
    @Autowired
    private TopicProducer topicProducer;
    
    @Test
    public void sendQueueMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息1");
        queueProducer.sendQueueMessage1(map);
    }
    
    @Test
    public void sendQueueMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是队列消息2");
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
        queueProducer.sendQueueMessage2(map);
    }
    
    @Test
    public void sendTopicMessage1() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息1");
        topicProducer.sendTopicMessage1(map);
    }
    
    @Test
    public void sendTopicMessage2() throws Exception {
        Map<String, Object> map = new HashMap<>();
        map.put("message", "Hello ActiveMQ,我是主题消息2");
        topicProducer.sendTopicMessage2(map);
    }
    
}

同时配置队列和主题

上面说过,如果想要使用主题,那么需要修改配置文件,这样不方便。我们可以在 ActiveMQConfig 类里同时配置队列和主题。

修改 ActiveMQConfig 类:

@Configuration
@EnableJms // 开启JMS适配的注解
public class ActiveMQConfig {

    @Value("${spring.activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String user;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${jms.pool.maxConnections}")
    private int maxConnections;

    // 队列1
    @Bean(name = "springboot-activemq-queue-1")
    public Queue queue1() {
        return new ActiveMQQueue("springboot-activemq-queue-1");
    }

    // 队列2
    @Bean(name = "springboot-activemq-queue-2")
    public Queue queue2() {
        return new ActiveMQQueue("springboot-activemq-queue-2");
    }

    // 主题1
    @Bean(name = "springboot-activemq-topic-1")
    public Topic topic1() {
        return new ActiveMQTopic("springboot-activemq-topic-1");
    }

    // 主题2
    @Bean(name = "springboot-activemq-topic-2")
    public Topic topic2() {
        return new ActiveMQTopic("springboot-activemq-topic-2");
    }

    // 真正可以产生连接的工厂
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(user, password, brokerUrl);
    }

    // ActiveMQ 提供的连接池
    @Bean
    public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(connectionFactory);
        pool.setMaxConnections(maxConnections);
        return pool;
    }

    // 队列监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable1(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-1");
        return factory;
    }

    // 主题监听容器(持久化订阅)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerSubscriptionDurable2(
            PooledConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        // 持久化订阅
        factory.setSubscriptionDurable(true);
        // 持久订阅必须指定一个 clientId
        factory.setClientId("springboot-activemq-topic-clientId-2");
        return factory;
    }
    
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(PooledConnectionFactory connectionFactory){
        return new JmsMessagingTemplate(connectionFactory);
    }

}

修改队列消费者

@Component
public class QueueConsumer {

    // 监听队列1的消息
    @JmsListener(destination = "springboot-activemq-queue-1", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听队列2的消息
    @JmsListener(destination = "springboot-activemq-queue-2", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

修改主题消费者

@Component
public class TopicConsumer {

    // 监听主题1的消息
    @JmsListener(destination = "springboot-activemq-topic-1", containerFactory = "jmsListenerContainerTopic")
    public void receiveQueueMessage1(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable1")
    public void receiveQueueMessage2(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者2收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    // 监听主题2的消息
    @JmsListener(destination = "springboot-activemq-topic-2", containerFactory = "jmsListenerContainerSubscriptionDurable2")
    public void receiveQueueMessage3(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者3收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

16.学习ActiveMQ-在SpringMVC中使用ActiveMQ

2020-05-13   龙德   ActiveMQ   ActiveMQ SpringMVC  

新建工程

新建一个 Maven 工程,我的工程叫 springmvc-activemq-example。

pom.xml 文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wang.miansen</groupId>
    <artifactId>springmvc-activemq-example</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>springmvc-activemq-example Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
    
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        
        <!-- 1)Spring核心 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 2)Spring DAO层 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 3)Spring web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <!-- 4)Spring test -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>

        <!-- 1)ActiveMQ 核心 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <!-- 2)ActiveMQ与 Spring 集成 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>

        <!--3)ActiveMQ所需要的pool包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>springmvc-activemq-example</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- web.xml版本 -->
                <version>3.1</version>
                <configuration>
                    <!-- jdk版本 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

队列生产者

新建队列生产者 QueueProducer 类:

public class QueueProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

队列消费者

新建 3 个队列消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class QueueConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("队列消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

主题生产者

新建主题生产者 TopicProducer 类:

public class TopicProducer {

    private JmsTemplate jmsTemplate;
    
    public void sendMessage(final Topic topic, final Map<String, Object> mapMessage) {
        jmsTemplate.convertAndSend(topic, mapMessage);
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
}

主题消费者

新建 3 个主题消费者,这是其中一个,其它两个都一样的,就不贴出来了。

public class TopicConsumer1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                System.out.println("主题消费者1收到了消息:" + mapMessage.getString("message"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

application.properties

新建 application.properties 文件,配置 ActiveMQ 的相关信息。

# jms服务器地址
jms.broker.url=tcp://localhost:61616
# 连接用户名
jms.broker.userName=admin
# 连接密码
jms.broker.password=admin
# 使用异步发送
jms.broker.useAsyncSend=true
# 连接池的最大连接数
jms.pool.maxConnections=10

application-context.xml

新建 application-context.xml 文件,配置各个 Bean。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:application.properties</value>
            </list>
        </property>
    </bean>

    <!-- 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker.url}"/>
        <property name="userName" value="${jms.broker.userName}"></property>  
        <property name="password" value="${jms.broker.password}"></property>  
        <property name="useAsyncSend" value="${jms.broker.useAsyncSend}" />  
    </bean>
    
    <!-- 连接池 -->
    <!-- ActiveMQ 为我们提供了一个 PooledConnectionFactory,通过往里面注入一个 ActiveMQConnectionFactory   
        可以用来将 Connection、Session 和  MessageProducer 池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool 包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <!--真正可以产生连接的连接工厂,由对应的 JMS 服务厂商提供-->
        <property name="connectionFactory" ref="activeMQConnectionFactory" />
        <!--连接池的最大连接数-->
        <property name="maxConnections" value="${jms.pool.maxConnections}"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 队列和非持久化订阅使用 -->
    <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory2" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
    </bean>
    
    <!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
    <!-- 持久化订阅使用 -->
    <bean id="subscriptionDurableSingleConnectionFactory3" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标 ConnectionFactory,对应真实的可以产生 JMS Connection 的 ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
         <!--持久订阅ID -->  
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
    </bean>
    
    <!--队列1-->
    <bean id="activeMQQueue1" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-1" />
    </bean>
    
    <!--队列2-->
    <bean id="activeMQQueue2" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 队列的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-queue-2" />
    </bean>
    
    <!--主题1-->
    <bean id="activeMQTopic1" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-1"/>
    </bean>
    
    <!--主题2-->
    <bean id="activeMQTopic2" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 主题的名称 -->
        <constructor-arg index="0" value="springmvc-activemq-topic-2"/>
    </bean>
    
    <!-- 队列 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQQueue1"/>
        <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false"/>
    </bean>
    
    <!-- 主题 JMS 模板,Spring 提供的 JMS 工具类,用它发送、接收消息。 -->
    <bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory"/>
        <!-- 默认的目的地,如果发送时不指定目的地,那么就用这个默认的目的地 -->
        <property name="defaultDestination" ref="activeMQTopic1"/>
         <!-- 消息转换器 -->
        <property name="messageConverter" ref="simpleMessageConverter" />
        <!-- 发送模式,1: 非持久化,2: 持久化 -->
        <property name="deliveryMode" value="2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true"/>
    </bean>
    
    <!--队列监听容器1,一经注册,自动监听-->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue1" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer1" />
    </bean>
    
    <!--队列监听容器2,一经注册,自动监听-->
    <bean id="queueListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer2" />
    </bean>
    
    <!--队列监听容器3,注意目的地跟容器2一样-->
    <bean id="queueListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="false" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQQueue2" />
        <!-- 监听类 -->
        <property name="messageListener" ref="queueConsumer3" />
    </bean>
    
    <!--主题监听容器1(非持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic1" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="false" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer1" />
    </bean>
    
    <!--主题监听容器2(持久化订阅),一经注册,自动监听-->
    <bean id="topicListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory2" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-2" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer2" />
    </bean>
    
    <!-- 主题监听容器3(持久化订阅),注意目的地跟容器2一样 -->
    <bean id="topicListenerContainer3" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- 连接池 -->
        <property name="connectionFactory" ref="subscriptionDurableSingleConnectionFactory3" />
        <!-- 监听目的地 -->
        <property name="destination" ref="activeMQTopic2" />
        <!-- 持久化订阅 -->
        <property name="subscriptionDurable" value="true" />
        <!-- 设置接收客户端的 ID,配置持久订阅必须指定一个 clientId -->
        <property name="clientId" value="springmvc-activemq-topic-clientId-3" />
        <!-- 是否为发布订阅模式,队列是 false,主题是 true --> 
        <property name="pubSubDomain" value="true" />
        <!-- 监听类 -->
        <property name="messageListener" ref="topicConsumer3" />
    </bean>
    
    <!-- 队列生产者 -->
    <bean id="queueProducer" class="com.example.activemq.QueueProducer">
        <property name="jmsTemplate" ref="jmsTemplateQueue" />
    </bean>
    
    <!-- 主题生产者 -->
    <bean id="topicProducer" class="com.example.activemq.TopicProducer">
        <property name="jmsTemplate" ref="jmsTemplateTopic" />
    </bean>
    
    <!-- 队列消费者1 -->
    <bean id="queueConsumer1" class="com.example.activemq.QueueConsumer1" />
    <!-- 队列消费者2 -->
    <bean id="queueConsumer2" class="com.example.activemq.QueueConsumer2" />
    <!-- 队列消费者3 -->
    <bean id="queueConsumer3" class="com.example.activemq.QueueConsumer3" />
    
    <!-- 主题消费者1 -->
    <bean id="topicConsumer1" class="com.example.activemq.TopicConsumer1" />
    <!-- 主题消费者2 -->
    <bean id="topicConsumer2" class="com.example.activemq.TopicConsumer2" />
    <!-- 主题消费者3 -->
    <bean id="topicConsumer3" class="com.example.activemq.TopicConsumer3" />
    
    <!-- 消息转换器 -->
    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</beans>

项目结构

最后的项目结构如下:

image

测试

新建测试类 ApplicationTest

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:application-context.xml"})
public class ApplicationTest {

    @Resource(name = "activeMQQueue1")
    private Queue queue1;

    @Resource(name = "activeMQQueue2")
    private Queue queue2;

    @Resource(name = "activeMQTopic1")
    private Topic topic1;

    @Resource(name = "activeMQTopic2")
    private Topic topic2;

    @Autowired
    private QueueProducer queueProducer;

    @Autowired
    private TopicProducer topicProducer;

    private Map<String, Object> mapMessage;

    @Before
    public void initMessage() throws Exception {
        mapMessage = new HashMap<>();
        mapMessage.put("message", "Hello ActiveMQ - " + new Date().getTime());
    }

    // 发送消息到队列1
    @Test
    public void queue1() throws Exception {
        queueProducer.sendMessage(queue1, mapMessage);
    }

    // 发送消息到队列2
    @Test
    public void queue2() throws Exception {
        queueProducer.sendMessage(queue2, mapMessage);
    }

    // 发送消息到主题1
    @Test
    public void topic1() throws Exception {
        topicProducer.sendMessage(topic1, mapMessage);
    }

    // 发送消息到主题2
    @Test
    public void topic2() throws Exception {
        topicProducer.sendMessage(topic2, mapMessage);
    }

}

测试队列1

运行 queue1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

image

测试队列2

由于队列2被两个消费者监听,又由于 ActiveMQ 队列中的消息只能被一个消费者消费,所以这两个消费者应该是通过负载均衡的方式消费。

运行 queue2() 方法,可以看到有时是消费者2消费,有时又是消费者3消费。

测试主题1

运行 topic1() 方法,生产者发送消息后,消费者通过监听的方式收到了消息。

测试主题2

运行 topic2() 方法,生产者发送消息后,由于是发布订阅的模式,所以消费者2和消费者3都能收到消息。

查看控制台,可以看到有两个持久化订阅的消费者。

image

14.学习ActiveMQ-主题的持久化

2020-05-12   龙德   ActiveMQ   ActiveMQ  

主题的持久化跟队列有点不一样,我们先看生产者。

生产者改动不大,只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。

以下是代码:

public class DurableTopicProducer {

    public static void main(String[] args) {

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

            // 2.使用连接工厂创建一个连接对象
            connection = connectionFactory.createConnection();

            // 3.开启连接
            connection.start();

            // 4.使用连接对象创建会话对象。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5.使用会话对象创建目标对象,这里我们创建的是 topic,也就是一对多模式
            Destination destination = session.createTopic("topic-test-01");

            // 6.使用会话对象创建生产者对象
            producer = session.createProducer(destination);
            
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            
            // 7.使用会话对象创建一个文本消息对象
            TextMessage textMessage = session.createTextMessage();

            String text = "Hello ActiveMQ - " + new Date().getTime();

            // 8.设置消息内容
            textMessage.setText(text);

            // 9.发送消息
            producer.send(textMessage);
            System.out.println("已生产消息:" + text);

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 10.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

然后是消费者,消费者改动比较多,以下是代码:

public class DurableTopicConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        
        // 3.设置ClientID
        
        connection.setClientID("client1");

        // 4.开启连接
        connection.start();

        // 5.使用连接对象创建会话对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 6.使用会话对象创建目的地对象,生产者与消费者的名称要保持一致。
        Topic topic = session.createTopic("topic-test-01");

        // 7.使用会话对象创建主题订阅者
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscriber-1");

        // 8.通过监听器的方式来接收消息
        subscriber.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("已接收消息:" + ((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

启动消费者,打开控制台页面,点击 “Subscribers”,可以看到以下页面。

image

在Active Durable Topic Subscribers里可以看到有一个消费者的信息,说明订阅成功了。

此时,我们将这个消费者线程关闭,再来看看,可以看到这个消费者跑到了Offline Durable Topic Subscribers里面,如下图所示。表明这个消费者离线了,但是依旧保持订阅关系。

image

下面做一个试验,此时消费者已经离线,我们先启动生产者,后启动消费者,此时消费者还能接收到刚才生产者发送的消息吗?

答案是可以的。

此时再刷新控制台,可以看到刚才在Offline的订阅者跑到了Active里面。

上述内容可以联系微信公众号理解。我们可以想象一个场景,我订阅了某公众号,后来我手机没电关机了,当我意识到后,充电再开机,我依旧能收到公众号推送给我的消息,这里的持久化,可以认为,我和公众号的联系被持久化了,所以我上线后可以及时收到消息。

13.学习ActiveMQ-队列的持久化

2020-05-12   龙德   ActiveMQ   ActiveMQ  

持久化

持久化队列只需要设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 就可以了。默认是持久化的。

我们可以做个试验,使用 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 设置为持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者依然可以拿到消息,也就是消息在宕机之前已经被存储了下来,后面再启动的时候,会读取到存储的数据。

非持久化

非持久化队列也很简单,只需要设置 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 就可以了。

我们再做个试验,使用 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 设置为非持久化模式,先用生产者产生3条消息发送给MQ,然后手动关闭MQ,再启动MQ,然后用consumer去消费,可以发现消费者什么都拿不到,也就是消息已经丢了,我们在MQ的控制台页面看到,MQ里的消息数量是0。

15.学习ActiveMQ-持久化数据到MySQL

2020-05-12   龙德   ActiveMQ   ActiveMQ  

ActiveMQ 的持久化可以这样理解:生产者生产消息之后,消息首先会到达 broker,broker 就相当于一个调度中心。broker 首先将消息存储在本地的数据文件(或者是远程数据库),再将消息发送给消费者。如果消费者接收到了消息,则将消息从存储中删除,否则继续尝试重发。

ActiveMQ 启动后,首先检查存储位置,如果有未发送成功的消息,需要先把消息发送出去。

ActiveMQ 默认的持久化数据库是 KahaDB,我们也可以改成 MySQL。

第一步,首先将 mysql-connector-java 的jar 扔到 ActiveMQ的lib目录下。

image

然后修改 conf 目录下的 activmeq.xml 中的 persistenceAdapter 结点为 JDBC。

<persistenceAdapter>
    <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
    <jdbcPersistenceAdapter dataSource="#mysql-datasource"/>
</persistenceAdapter>

再添加 mysql-datasource,这样上面的配置文件才能拿到 DataSource。将下面这段内容放在 broker 结点和 import 结点之间。

<bean id="mysql-datasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=UTF-8"/> 
    <property name="username" value="root"/> 
    <property name="password" value="123"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean>

上面的用户名和密码要改成你自己的,还有要注意 url 参数,这里的&要改成&amp;,也就是&的转义,否则启动会报错。

启动 ActiveMQ,打开 activemq 库,就会发现自动生成了 3 张表。

image

每张表的作用如下:

activemq_msgs(消息表):

image

activemq_acks(订阅关系表,针对于持久化的Topic,订阅者和服务器的订阅关系会保存在这个表中):

image

activemq_lock:

在集群环境中才有用,只有一个 Broker 可以获取消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用来记录哪个 Broker 是当前的 Master Broker。

测试队列的数据

运行队列生产者生产 3 条数据,然后查看 activemq_msgs 表,可以看到这 3 条消息已经存进 activemq_msgs 表了。

image

然后启动队列消费者消费消息,在刷新 activemq_msgs 表,可以看到数据没了,说明消息的消费操作也同步更新了数据库。

image

测试主题的数据

启动消费者 DurableTopicConsumer 类,查看数据库的 activemq_acks 表,可以发现订阅关系已经持久化到数据库里了。

image

启动生产者 DurableTopicProducer 类,这时候消息已经被消费了。但是再查看数据库的 activemq_msgs 表,里面的数据依然存在。

image

上述内容可以联系微信公众号来理解,微信公众号发布过的文章,不能因为订阅者已经收到了或者是服务器关闭了,就把发布过的文章和订阅者清空,这是不合理的,从这个角度来理解就能明白了。

8.学习ActiveMQ-主题模式的特点以及消费者消费消息的3种情况

2020-05-11   龙德   ActiveMQ   ActiveMQ  

主题模式的特点

1.生产者将消息发布到主题中,每个消息可以被多个消费者消费,属于一对多的关系。

2.生产者和消费者存在时间上的相关性,必须先启动消费者。

3.生产者生产时,主题模式不会持久化消息,所以先启动消费者,再启动生产者,这样消息才不会丢弃。

消费者消费消息的3种情况

1.先生产,然后启动1号消费者,1号消费者能消费到消息吗?

答:不可以。

2.先生产,然后启动1号消费者,再启动2号消费者,1号消费者能消费吗?2号消费者能消费吗?

答:都不可以。

3.先启动2个消费者,然后再生产消息,消费情况如何?

答:都可以消费到消息。

9.学习ActiveMQ-消息生产者的事务

2020-05-11   龙德   ActiveMQ   ActiveMQ  

之前我们说过,在创建 session 的时候,我们传了两个参数,第一个是事务,第二个是签收,在这篇博客中,我们来说说事务。

connection.createSession()方法的第一个参数就是事务,它的值可以是true或false,代表session的提交是事务提交还是非事务提交。

当事务的值是false时,只要执行了producer.send()方法,消息就到了队列中,也就是自动提交了。

当事务的值是true时,在执行完roducer.send()方法后,在session关闭之前需要多加一个session.commit()方法提交事务。

第一个参数传入 true,这样就开启事务了。

image

session关闭之前提交事务,否则消息不会进入队列中。

image

事务的提交,用于实际复杂的业务场景,可能有多个消息需要入队列,假设有一条入队列报错了,我们希望这一批次的都要回滚,这时候就要用到session.rollback()方法了,可以将session.rollback()方法放在catch语句块中来执行。

image

10.学习ActiveMQ-消息消费者的事务

2020-05-11   龙德   ActiveMQ   ActiveMQ  

事务主要是针对生产者而言的,但是对于消费者,也有事务。

将connection.createSession()的第一个参数改为true,表示消费者采用事务的方式来消费消息。

image

通过前面的介绍,我们知道,如果开启了事务,就需要手动执行session.commit()来提交事务,假设这时候,我们不提交事务,看看会出现什么情况。

我们先启动生产者。

可以看到生产者生产了一条消息。

image

image

接着启动消费者。

这时候消费者是可以消费到消息的。

image

但是注意看控制台,这时候队列中还存在1条消息等待消息,有0条消息已消费。

image

这时候我们如果再启动一个消费者,发现又做了一次消费,也就是重复消费了。

也就是说在事务模式下,消费者虽然消费到了消息,但是它没有提交事务,也就没有通知到MQ,所以MQ并不知道消息是否被消费了,所以就会出现重复消费的情况。

所以消费者如果开启了事务,那么也要提交事务。否则MQ并不知道消息是否被消费了,最后就会出现重复消费的情况。

7.学习ActiveMQ-消息消费者(主题模式)

2020-05-11   龙德   ActiveMQ   ActiveMQ  

主题模式和队列模式非常类似,只需要将 createQueue 改成 createTopic 就行了。

下面是代码:

public class TopicConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        // 3.开启连接
        connection.start();

        // 4.使用连接对象创建会话对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.使用会话对象创建目标对象,这里我们创建的是主题模式,生产者与消费者的名称要保持一致。
        Destination destination = session.createTopic("topic-test-01");

        // 6.使用会话对象创建消费者对象
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.通过监听器的方式来接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("已消费消息:" + ((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

先启动消费者,为了看清1对多的效果,我们启动3个消费者。

image

可以看到,名为 “topic-test-01” 的主题有 3 个订阅者。

接着我们启动生产者,生产1条消息。就可以看到3个消费者都消费到了消息。

image

12.学习ActiveMQ-事务模式下消费者签收

2020-05-11   龙德   ActiveMQ   ActiveMQ  

前面讲的签收是没有事务的,如果有事务,签收又是怎么样的呢?来看下面的实验。

将connection.createSession()的第一个参数改为true,表示消费者采用事务的方式来消费消息。

同时将第二个参数改为Session.CLIENT_ACKNOWLEDGE,表示手动签收消息。

image

第一种情况

写 session.commit() 方法,不写 TextMessage.acknowledge() 方法。

image

依次启动生产者和消费者,这时候,消费者可以拿到消息。

把消费者关了再启动,却不会出现重复消费的情况,这是为什么呢?

我们可以理解为执行了 session.commit() 操作,也就告知了 MQ,消费者已经完成了签收,这里的 CLIENT_ACKNOWLEDGE 也就相当于 AUTO_ACKNOWLEDGE 了。

第二种情况

不写 session.commit() 方法,写 TextMessage.acknowledge() 方法。

image

依次启动生产者和消费者,这时候,消费者可以拿到消息。

把消费者关了再启动,发现消费者重复消费了消息。

结论

通过上面两种情况,我们可以得出结论:事务的作用 > 签收的作用。

  • 在事务模式下,当一个事务被成功提交,则消息被自动签收,如果事务回滚,消息会被再次传送。

  • 在非事务模式下,消息何时被确认取决于创建会话的签收模式。如果是 AUTO_ACKNOWLEDGE 模式,则自动签收。如果是 CLIENT_ACKNOWLEDGE 模式,则调用 TextMessage.acknowledge() 方法签收。