8.学习ActiveMQ-主题模式的特点以及消费者消费消息的3种情况

2020-05-11   龙德   ActiveMQ   ActiveMQ  

主题模式的特点

1.生产者将消息发布到主题中,每个消息可以被多个消费者消费,属于一对多的关系。

2.生产者和消费者存在时间上的相关性,必须先启动消费者。

3.生产者生产时,主题模式不会持久化消息,所以先启动消费者,再启动生产者,这样消息才不会丢弃。

消费者消费消息的3种情况

1.先生产,然后启动1号消费者,1号消费者能消费到消息吗?

答:不可以。

2.先生产,然后启动1号消费者,再启动2号消费者,1号消费者能消费吗?2号消费者能消费吗?

答:都不可以。

3.先启动2个消费者,然后再生产消息,消费情况如何?

答:都可以消费到消息。

9.学习ActiveMQ-消息生产者的事务

2020-05-11   龙德   ActiveMQ   ActiveMQ  

之前我们说过,在创建 session 的时候,我们传了两个参数,第一个是事务,第二个是签收,在这篇博客中,我们来说说事务。

connection.createSession()方法的第一个参数就是事务,它的值可以是true或false,代表session的提交是事务提交还是非事务提交。

当事务的值是false时,只要执行了producer.send()方法,消息就到了队列中,也就是自动提交了。

当事务的值是true时,在执行完roducer.send()方法后,在session关闭之前需要多加一个session.commit()方法提交事务。

第一个参数传入 true,这样就开启事务了。

image

session关闭之前提交事务,否则消息不会进入队列中。

image

事务的提交,用于实际复杂的业务场景,可能有多个消息需要入队列,假设有一条入队列报错了,我们希望这一批次的都要回滚,这时候就要用到session.rollback()方法了,可以将session.rollback()方法放在catch语句块中来执行。

image

10.学习ActiveMQ-消息消费者的事务

2020-05-11   龙德   ActiveMQ   ActiveMQ  

事务主要是针对生产者而言的,但是对于消费者,也有事务。

将connection.createSession()的第一个参数改为true,表示消费者采用事务的方式来消费消息。

image

通过前面的介绍,我们知道,如果开启了事务,就需要手动执行session.commit()来提交事务,假设这时候,我们不提交事务,看看会出现什么情况。

我们先启动生产者。

可以看到生产者生产了一条消息。

image

image

接着启动消费者。

这时候消费者是可以消费到消息的。

image

但是注意看控制台,这时候队列中还存在1条消息等待消息,有0条消息已消费。

image

这时候我们如果再启动一个消费者,发现又做了一次消费,也就是重复消费了。

也就是说在事务模式下,消费者虽然消费到了消息,但是它没有提交事务,也就没有通知到MQ,所以MQ并不知道消息是否被消费了,所以就会出现重复消费的情况。

所以消费者如果开启了事务,那么也要提交事务。否则MQ并不知道消息是否被消费了,最后就会出现重复消费的情况。

7.学习ActiveMQ-消息消费者(主题模式)

2020-05-11   龙德   ActiveMQ   ActiveMQ  

主题模式和队列模式非常类似,只需要将 createQueue 改成 createTopic 就行了。

下面是代码:

public class TopicConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        // 3.开启连接
        connection.start();

        // 4.使用连接对象创建会话对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.使用会话对象创建目标对象,这里我们创建的是主题模式,生产者与消费者的名称要保持一致。
        Destination destination = session.createTopic("topic-test-01");

        // 6.使用会话对象创建消费者对象
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.通过监听器的方式来接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("已消费消息:" + ((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

先启动消费者,为了看清1对多的效果,我们启动3个消费者。

image

可以看到,名为 “topic-test-01” 的主题有 3 个订阅者。

接着我们启动生产者,生产1条消息。就可以看到3个消费者都消费到了消息。

image

12.学习ActiveMQ-事务模式下消费者签收

2020-05-11   龙德   ActiveMQ   ActiveMQ  

前面讲的签收是没有事务的,如果有事务,签收又是怎么样的呢?来看下面的实验。

将connection.createSession()的第一个参数改为true,表示消费者采用事务的方式来消费消息。

同时将第二个参数改为Session.CLIENT_ACKNOWLEDGE,表示手动签收消息。

image

第一种情况

写 session.commit() 方法,不写 TextMessage.acknowledge() 方法。

image

依次启动生产者和消费者,这时候,消费者可以拿到消息。

把消费者关了再启动,却不会出现重复消费的情况,这是为什么呢?

我们可以理解为执行了 session.commit() 操作,也就告知了 MQ,消费者已经完成了签收,这里的 CLIENT_ACKNOWLEDGE 也就相当于 AUTO_ACKNOWLEDGE 了。

第二种情况

不写 session.commit() 方法,写 TextMessage.acknowledge() 方法。

image

依次启动生产者和消费者,这时候,消费者可以拿到消息。

把消费者关了再启动,发现消费者重复消费了消息。

结论

通过上面两种情况,我们可以得出结论:事务的作用 > 签收的作用。

  • 在事务模式下,当一个事务被成功提交,则消息被自动签收,如果事务回滚,消息会被再次传送。

  • 在非事务模式下,消息何时被确认取决于创建会话的签收模式。如果是 AUTO_ACKNOWLEDGE 模式,则自动签收。如果是 CLIENT_ACKNOWLEDGE 模式,则调用 TextMessage.acknowledge() 方法签收。

11.学习ActiveMQ-非事务模式下消费者签收

2020-05-11   龙德   ActiveMQ   ActiveMQ  

事务主要是偏向生产者,签收主要是偏向消费者。

之前我们说过,在创建 session 的时候,我们传了两个参数,第一个是事务,第二个是签收,在这篇博客中,我们来说说签收。

签收模式主要有以下4种:

  • AUTO_ACKNOWLEDGE

自动签收消息。只要有消息就消费。

对于这种模式,可以理解为快递员自动给你签收了并放到了快递柜中。

  • CLIENT_ACKNOWLEDGE

手动签收消息。需要在消费端调用一下 TextMessage.acknowledge() 方法才会签收。

对于这种模式,可以理解为见到快递员,开箱验货,再签收。

  • DUPS_OK_ACKNOWLEDGE

表示 session 不必确保对传送消息的签收,它可能会引起消息的重复,但是降低了 session 开销,所以消费者需要能容忍重复的消息。在第二次重新传递消息的时候,消息头的 JmsDelivered 会被置为 true 标示当前消息已经传送过一次,消费者需要进行消息的重复处理控制

  • SESSION_TRANSACTED

表示消息生产者提供了消息发送的事务处理方式。也就是指,消息生产者发送消息给 broker 后,broker 只是暂存该消息,只有当生产者给 broker 进行事务确认消息后,broker 才把消息加入到待发送队列中,换言之,如果消息发送者进行了事务回滚,消息会直接从 broker 中删除

第一种前面已经见过了,就不介绍了。后面那两种用得比较少,也不介绍了。我们来看看第二种。

将connection.createSession()的第二个参数改为Session.CLIENT_ACKNOWLEDGE,表示手动签收消息。

image

然后启动生产者生产1条消息。

接着启动消费者消费消息。

image

可以看到消费者消费了1条消息。

但是注意看控制台,这时候队列中还存在1条消息等待消息,有0条消息已消费。

image

这时候如果我们把消费者关了再启动,发现又做了一次消费,也就是重复消费了。

所以需要在消费端调用一下 TextMessage.acknowledge() 方法才会签收,否则会重复消费。

image

6.学习ActiveMQ-消息生产者(主题模式)

2020-05-10   龙德   ActiveMQ   ActiveMQ  

主题模式和队列模式非常类似,只需要将 createQueue 改成 createTopic 就行了。

下面是代码:

public class TopicProducer {

    public static void main(String[] args) {

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

            // 2.使用连接工厂创建一个连接对象
            connection = connectionFactory.createConnection();

            // 3.开启连接
            connection.start();

            // 4.使用连接对象创建会话对象。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5.使用会话对象创建目标对象,这里我们创建的是 topic,也就是一对多模式
            Destination destination = session.createTopic("topic-test-01");

            // 6.使用会话对象创建生产者对象
            producer = session.createProducer(destination);

            // 7.使用会话对象创建一个文本消息对象
            TextMessage textMessage = session.createTextMessage();

            String text = "Hello ActiveMQ - " + new Date().getTime();

            // 8.设置消息内容
            textMessage.setText(text);

            // 9.发送消息
            producer.send(textMessage);
            System.out.println("已生产消息:" + text);

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 10.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

主题模式下,ActiveMQ 是不保存消息的,它是无状态的。假如无人订阅,那么生产出来的消息就是一条废消息,所以,一般情况下,需要先启动消费者,再启动生产者。

上述内容可以联系微信里的关注公众号来理解,我关注了公众号之后,公众号会给我推送消息,我没有关注,自然不会收到微信公众号发来的消息。并且,这个推送是自我关注之后,才收到的,之前微信公众号发布过的消息,不会给我补发和补推送。

5.学习ActiveMQ-队列模式的特点以及消费者消费消息的3种情况

2020-05-09   龙德   ActiveMQ   ActiveMQ  

队列模式的特点

1.生产者将消息发布到队列中,每个消息只能被一个消费者消费,属于一对一的关系。

2.生产者和消费者不存在时间上的相关性,先启动生产者还是消费者都是一样的。

3.生产者生产时,队列模式默认是会持久化消息的,所以先启动生产者,再启动消费者,消费者还是能消费到消息,也就是说队列中的消息不会丢弃。

消费者消费消息的3种情况

1.先生产,然后启动1号消费者,1号消费者能消费到消息吗?

答:可以,因为队列模式默认会持久化数据。

2.先生产,然后启动1号消费者,再启动2号消费者,1号消费者能消费吗?2号消费者能消费吗?

答:1号可以消费,2号不能消费。因为队列模式下的消息只允许一个消费者消费,不允许重复消费。又因为消息已经被1号消费者消费完了,没有待消费的消息,所以2号不能消息到消息。

3.先启动2个消费者,然后再生产消息,消费情况如何?

答:经过多次测试,可以发现,消费者是轮询消费消息的,这样可以让消息平均分配,不至于一个消费者特别忙,一个消费者特别闲,也不会出现抢占资源的情况。

4.学习ActiveMQ-消息消费者(队列模式)

2020-05-08   龙德   ActiveMQ   ActiveMQ  

消费者消费消息有两种方式,第一种方式是使用 receive() 方法,这种方式是同步阻塞的,会阻塞等待生产者生产消息。第二种方式是使用 MessageListener,这种方式是异步非阻塞的,当有消息到达的时候,会调用它的 onMessage() 方法。

第一种方式:receive()

public class QueueConsumer {

    public static void main(String[] args) {
        
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        
        try {
            // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

            // 2.使用连接工厂创建一个连接对象
            connection = connectionFactory.createConnection();

            // 3.开启连接
            connection.start();

            // 4.使用连接对象创建会话对象。
            // 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。与生产者保持一致,并且名称也要一致。
            Destination destination = session.createQueue("queue-test-01");

            // 6.使用会话对象创建消费者对象
            consumer = session.createConsumer(destination);

            // 7.使用 receive() 方法消费消息,当没有消息的时候会阻塞在这,等待提供者提供消息
            TextMessage msg = (TextMessage) consumer.receive();
            System.out.println("已接收消息:" + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 8.关闭资源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

运行消费者,查看控制台,可以看到消费了 1 条消息,有 0 条消息等待消费。

image

由于队列中已经存在一条消息了,所以看不到阻塞的效果。我们重新启动生产者,就可以看到阻塞在了 receive() 处。

image

此时如果没有消息进入队列中,消费者就会一直阻塞着。

如何做到一直接收消息呢,总不能接收到一条消息就关了吧。我们可以使用 while() 循环一直接收消息

while (true) {
    TextMessage msg = (TextMessage) consumer.receive();
    System.out.println("已接收消息:" + msg.getText());
}

但这种方式毕竟会阻塞当前线程,如果使用这种方式,那么后面的代码也就别执行了。所以我们可以另起一条线程来解决阻塞的问题。

public class QueueConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        // 3.开启连接
        connection.start();

        // 4.使用连接对象创建会话对象。
        // 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。与生产者保持一致,并且名称也要一致。
        Destination destination = session.createQueue("queue-test-01");

        // 6.使用会话对象创建消费者对象
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.使用 receive() 方法消费消息,当没有消息的时候会阻塞,我们可以开一个线程解决阻塞的问题
        new Thread("My-ActiveMQ-Thread") {

            @Override
            public void run() {

                try {
                    while (true) {
                        TextMessage msg = (TextMessage) consumer.receive();
                        System.out.println("已接收消息:" + msg.getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        }.start();
    }
}

第二种方式:MessageListener

public class QueueConsumer {

    public static void main(String[] args) throws JMSException {

        // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);

        // 2.使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();

        // 3.开启连接
        connection.start();

        // 4.使用连接对象创建会话对象。
        // 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。与生产者保持一致,并且名称也要一致。
        Destination destination = session.createQueue("queue-test-01");

        // 6.使用会话对象创建消费者对象
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.通过监听器的方式来接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("已接收消息:" + ((TextMessage) message).getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

3.学习ActiveMQ-消息生产者(队列模式)

2020-05-07   龙德   ActiveMQ   ActiveMQ  

ActiveMQ 消息传送有两种模型:

  • 点对点(P2P),即队列模式
  • 发布订阅(PUB/SUB)模式,即主题模式

队列模式的特点:

1.生产者将消息发布到队列中,每个消息只能被一个消费者消费,属于一对一的关系。

2.生产者和消费者不存在时间上的相关性,先启动生产者还是消费者都是一样的。

3.生产者生产时,队列模式默认是会持久化消息的,所以先启动生产者,再启动消费者,消费者还是能消费到消息,也就是说队列中的消息不会丢弃。

在 Java 中使用 ActiveMQ,跟使用 JDBC、JPA 一样,都是有套路的。下面我们通过一个 Hello World 级别的例子,来感受一下在 Java 中如何使用 ActiveMQ。

先引入依赖

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>
public class QueueProducer {

    public static void main(String[] args) {
        
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        
        try {
            // 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
            
            // 2.使用连接工厂创建一个连接对象
            connection = connectionFactory.createConnection();
            
            // 3.开启连接
            connection.start();
            
            // 4.使用连接对象创建会话对象。
            // 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。除此之外还有一对多模式:createTopic()
            Destination destination = session.createQueue("queue-test-01");
            
            // 6.使用会话对象创建生产者对象
            producer = session.createProducer(destination);
            
            // 7.使用会话对象创建一个文本消息对象
            TextMessage textMessage = session.createTextMessage();
            
            String text = "Hello ActiveMQ - " + new Date().getTime();
            
            // 8.设置消息内容
            textMessage.setText(text);
            
            // 9.发送消息
            producer.send(textMessage);
            
            System.out.println("已发送消息:" + text);
            
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 10.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

运行生产者,查看控制台,点击 “Queues”,可以看到生产了 1 条消息,有 1 条消息等待消费。

image

上面的表头分别是:

  • Name:队列名称
  • Messages Enqueued:生产条数,记做 P
  • Messages Dequeued:消费条数,记做 C
  • Number Of Consumers:消费者数量
  • Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是 P-C