前面讲的签收是没有事务的,如果有事务,签收又是怎么样的呢?来看下面的实验。
将connection.createSession()的第一个参数改为true,表示消费者采用事务的方式来消费消息。
同时将第二个参数改为Session.CLIENT_ACKNOWLEDGE,表示手动签收消息。
写 session.commit() 方法,不写 TextMessage.acknowledge() 方法。
依次启动生产者和消费者,这时候,消费者可以拿到消息。
把消费者关了再启动,却不会出现重复消费的情况,这是为什么呢?
我们可以理解为执行了 session.commit() 操作,也就告知了 MQ,消费者已经完成了签收,这里的 CLIENT_ACKNOWLEDGE 也就相当于 AUTO_ACKNOWLEDGE 了。
不写 session.commit() 方法,写 TextMessage.acknowledge() 方法。
依次启动生产者和消费者,这时候,消费者可以拿到消息。
把消费者关了再启动,发现消费者重复消费了消息。
通过上面两种情况,我们可以得出结论:事务的作用 > 签收的作用。
在事务模式下,当一个事务被成功提交,则消息被自动签收,如果事务回滚,消息会被再次传送。
在非事务模式下,消息何时被确认取决于创建会话的签收模式。如果是 AUTO_ACKNOWLEDGE 模式,则自动签收。如果是 CLIENT_ACKNOWLEDGE 模式,则调用 TextMessage.acknowledge() 方法签收。
事务主要是偏向生产者,签收主要是偏向消费者。
之前我们说过,在创建 session 的时候,我们传了两个参数,第一个是事务,第二个是签收,在这篇博客中,我们来说说签收。
签收模式主要有以下4种:
自动签收消息。只要有消息就消费。
对于这种模式,可以理解为快递员自动给你签收了并放到了快递柜中。
手动签收消息。需要在消费端调用一下 TextMessage.acknowledge() 方法才会签收。
对于这种模式,可以理解为见到快递员,开箱验货,再签收。
表示 session 不必确保对传送消息的签收,它可能会引起消息的重复,但是降低了 session 开销,所以消费者需要能容忍重复的消息。在第二次重新传递消息的时候,消息头的 JmsDelivered 会被置为 true 标示当前消息已经传送过一次,消费者需要进行消息的重复处理控制
表示消息生产者提供了消息发送的事务处理方式。也就是指,消息生产者发送消息给 broker 后,broker 只是暂存该消息,只有当生产者给 broker 进行事务确认消息后,broker 才把消息加入到待发送队列中,换言之,如果消息发送者进行了事务回滚,消息会直接从 broker 中删除
第一种前面已经见过了,就不介绍了。后面那两种用得比较少,也不介绍了。我们来看看第二种。
将connection.createSession()的第二个参数改为Session.CLIENT_ACKNOWLEDGE,表示手动签收消息。
然后启动生产者生产1条消息。
接着启动消费者消费消息。
可以看到消费者消费了1条消息。
但是注意看控制台,这时候队列中还存在1条消息等待消息,有0条消息已消费。
这时候如果我们把消费者关了再启动,发现又做了一次消费,也就是重复消费了。
所以需要在消费端调用一下 TextMessage.acknowledge() 方法才会签收,否则会重复消费。
主题模式和队列模式非常类似,只需要将 createQueue 改成 createTopic 就行了。
下面是代码:
public class TopicProducer {
public static void main(String[] args) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.使用连接对象创建会话对象。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用会话对象创建目标对象,这里我们创建的是 topic,也就是一对多模式
Destination destination = session.createTopic("topic-test-01");
// 6.使用会话对象创建生产者对象
producer = session.createProducer(destination);
// 7.使用会话对象创建一个文本消息对象
TextMessage textMessage = session.createTextMessage();
String text = "Hello ActiveMQ - " + new Date().getTime();
// 8.设置消息内容
textMessage.setText(text);
// 9.发送消息
producer.send(textMessage);
System.out.println("已生产消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 10.关闭资源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
主题模式下,ActiveMQ 是不保存消息的,它是无状态的。假如无人订阅,那么生产出来的消息就是一条废消息,所以,一般情况下,需要先启动消费者,再启动生产者。
上述内容可以联系微信里的关注公众号来理解,我关注了公众号之后,公众号会给我推送消息,我没有关注,自然不会收到微信公众号发来的消息。并且,这个推送是自我关注之后,才收到的,之前微信公众号发布过的消息,不会给我补发和补推送。
1.生产者将消息发布到队列中,每个消息只能被一个消费者消费,属于一对一的关系。
2.生产者和消费者不存在时间上的相关性,先启动生产者还是消费者都是一样的。
3.生产者生产时,队列模式默认是会持久化消息的,所以先启动生产者,再启动消费者,消费者还是能消费到消息,也就是说队列中的消息不会丢弃。
1.先生产,然后启动1号消费者,1号消费者能消费到消息吗?
答:可以,因为队列模式默认会持久化数据。
2.先生产,然后启动1号消费者,再启动2号消费者,1号消费者能消费吗?2号消费者能消费吗?
答:1号可以消费,2号不能消费。因为队列模式下的消息只允许一个消费者消费,不允许重复消费。又因为消息已经被1号消费者消费完了,没有待消费的消息,所以2号不能消息到消息。
3.先启动2个消费者,然后再生产消息,消费情况如何?
答:经过多次测试,可以发现,消费者是轮询消费消息的,这样可以让消息平均分配,不至于一个消费者特别忙,一个消费者特别闲,也不会出现抢占资源的情况。
消费者消费消息有两种方式,第一种方式是使用 receive() 方法,这种方式是同步阻塞的,会阻塞等待生产者生产消息。第二种方式是使用 MessageListener,这种方式是异步非阻塞的,当有消息到达的时候,会调用它的 onMessage() 方法。
public class QueueConsumer {
public static void main(String[] args) {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.使用连接对象创建会话对象。
// 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。与生产者保持一致,并且名称也要一致。
Destination destination = session.createQueue("queue-test-01");
// 6.使用会话对象创建消费者对象
consumer = session.createConsumer(destination);
// 7.使用 receive() 方法消费消息,当没有消息的时候会阻塞在这,等待提供者提供消息
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("已接收消息:" + msg.getText());
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 8.关闭资源
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行消费者,查看控制台,可以看到消费了 1 条消息,有 0 条消息等待消费。
由于队列中已经存在一条消息了,所以看不到阻塞的效果。我们重新启动生产者,就可以看到阻塞在了 receive() 处。
此时如果没有消息进入队列中,消费者就会一直阻塞着。
如何做到一直接收消息呢,总不能接收到一条消息就关了吧。我们可以使用 while() 循环一直接收消息
while (true) {
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("已接收消息:" + msg.getText());
}
但这种方式毕竟会阻塞当前线程,如果使用这种方式,那么后面的代码也就别执行了。所以我们可以另起一条线程来解决阻塞的问题。
public class QueueConsumer {
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.使用连接对象创建会话对象。
// 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。与生产者保持一致,并且名称也要一致。
Destination destination = session.createQueue("queue-test-01");
// 6.使用会话对象创建消费者对象
MessageConsumer consumer = session.createConsumer(destination);
// 7.使用 receive() 方法消费消息,当没有消息的时候会阻塞,我们可以开一个线程解决阻塞的问题
new Thread("My-ActiveMQ-Thread") {
@Override
public void run() {
try {
while (true) {
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("已接收消息:" + msg.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}.start();
}
}
public class QueueConsumer {
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.使用连接对象创建会话对象。
// 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。与生产者保持一致,并且名称也要一致。
Destination destination = session.createQueue("queue-test-01");
// 6.使用会话对象创建消费者对象
MessageConsumer consumer = session.createConsumer(destination);
// 7.通过监听器的方式来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
System.out.println("已接收消息:" + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
ActiveMQ 消息传送有两种模型:
队列模式的特点:
1.生产者将消息发布到队列中,每个消息只能被一个消费者消费,属于一对一的关系。
2.生产者和消费者不存在时间上的相关性,先启动生产者还是消费者都是一样的。
3.生产者生产时,队列模式默认是会持久化消息的,所以先启动生产者,再启动消费者,消费者还是能消费到消息,也就是说队列中的消息不会丢弃。
在 Java 中使用 ActiveMQ,跟使用 JDBC、JPA 一样,都是有套路的。下面我们通过一个 Hello World 级别的例子,来感受一下在 Java 中如何使用 ActiveMQ。
先引入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
public class QueueProducer {
public static void main(String[] args) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
// 1.创建连接工厂,需要传入ip和端口号,这里我们使用默认的
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
// 2.使用连接工厂创建一个连接对象
connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.使用连接对象创建会话对象。
// 这里有两个参数:第一个参数是开启事务,第二个参数表示签收模式,后面再详细介绍。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用会话对象创建目标对象,这里我们创建的是 queue,也就是点对点的模式。除此之外还有一对多模式:createTopic()
Destination destination = session.createQueue("queue-test-01");
// 6.使用会话对象创建生产者对象
producer = session.createProducer(destination);
// 7.使用会话对象创建一个文本消息对象
TextMessage textMessage = session.createTextMessage();
String text = "Hello ActiveMQ - " + new Date().getTime();
// 8.设置消息内容
textMessage.setText(text);
// 9.发送消息
producer.send(textMessage);
System.out.println("已发送消息:" + text);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 10.关闭资源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
运行生产者,查看控制台,点击 “Queues”,可以看到生产了 1 条消息,有 1 条消息等待消费。
上面的表头分别是:
从官网 http://activemq.apache.org 可以下载 ActiveMQ,现在最新版本是 5.x,我们下载 Windows 平台的。
ActiveMQ 各个目录的作用:
bin:ActiveMQ的启动脚本activemq.bat,注意分32、64位。
conf:配置文件,重点关注的是activemq.xml、jetty.xml、jetty-realm.properties。在登录ActiveMQ Web控制台需要用户名、密码信息;在JMS CLIENT和ActiveMQ进行何种协议的连接、端口是什么等这些信息都在上面的配置文件中可以体现。
data:ActiveMQ进行消息持久化存放的地方,默认采用的是kahadb,当然我们可以采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。
webapps:ActiveMQ自带Jetty提供Web管控台。
lib:ActiveMQ为我们提供了分功能的JAR包,当然也提供了activemq-all-5.14.4.jar。
进入 bin/win64 目录,直接运行 activemq.bat,就可以启动 ActiveMQ。
访问首页:http://localhost:8161/index.html
看到这个页面就说明已经成功启动 ActiveMQ 了。
访问控制台:http://localhost:8161/admin/index.jsp
访问控制台需要输入账号和密码,账号和密码配置在 conf 目录下的 jetty-realm.properties 文件里。
端口号配置在 jetty.xml 文件里。
输入账号密码就可以访问控制台了。
至此,ActiveMQ 的安装和启动就完成了。
这是我学习 ActiveMQ 的笔记,里面的知识点和代码都是经过我测试过的,源码可以在这里下载:https://github.com/miansen/activemq-example
5.学习ActiveMQ-队列模式的特点以及消费者消费消息的3种情况
8.学习ActiveMQ-主题模式的特点以及消费者消费消息的3种情况
16.学习ActiveMQ-在SpringMVC中使用ActiveMQ
17.学习ActiveMQ-在SpringBoot中使用ActiveMQ
百度百科的描述:
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
看到 “接口” 两个字,下意识就想到了规范。没错,JMS 和 JDBC、JPA 一样,只是一个规范,也就是只给出了接口,具体的内容由各个厂商去实现。
从上图可以看出,JMS 的组成有4个部分,它们分别是:
MQ 产品有很多种,以下是最常见的 4 种:
我们接下来学习的 ActiveMQ 就是 apache 出品的一个非常流行的消息中间件。
在 HTTP 协议的请求头中,有一个很重要的属性:Content-Type,它的作用是:
作为请求:告诉服务器,客户端发送的数据类型。
作为响应:告诉客户端,服务器响应的数据类型。
客户端或者服务器拿到数据后,通过该数据类型,就可以正确的解析数据。
由于 GET 请求不存在请求体部分,它的参数都是拼接在 URL 尾部,浏览器把数据转换成一个字符串(key1=value1&key2=value2…),然后把这个字符串追加到 URL 后面,用 ? 分割,因此 GET 请求的请求头不需要设置 Content-Type 字段。
所以下面我们所讲的都是针对 POST 请求而言。
Content-Type 属性的值有很多种,下面列举几种最常见的:
这是最常见的数据类型,前端表单默认使用的就是这种类型。
比如有这么一个登录表单:
<form action="login.do" enctype="application/x-www-form-urlencoded" method="post">
<input type="text" name="username" />
<input type="password" name="password" />
<button type="submit">登录</button>
</form>
注意 “enctype” 属性,该属性规定在发送到服务器之前,浏览器应该如何对表单数据进行编码。默认是 “application/x-www-form-urlencoded”。
当我们点击 “登录” 按钮时,浏览器会将所有内容进行编码,拼接成 “key=value” 的格式,也就是键值对的格式。多个键值对之间用 “&” 分割。
当我们提交时,在 Chrome 浏览器中,可以看到请求体的数据格式是这样子的:
可以看到浏览器帮我们拼接成了键值对的格式,key 就是输入框的 name,而 value 就是我们输入的内容。
当服务器接收到数据后,根据 Content-Type 属性获取到数据类型,就能正确的解析数据。
接下来是后台代码:
@RequestMapping(value = "/login.do", method = RequestMethod.POST)
public String login(@RequestParam(value = "username") String username,
@RequestParam(value = "password") String password) {
return "index.jsp";
}
当 Spring MVC 收到客户端发送来的数据时,根据 Content-Type 声明的数据类型,通过使用 HandlerAdapter 配置的 HttpMessageConverters 来解析请求体中的数据,然后绑定到相应的 bean 上,bean 的类型、个数和顺序跟我们方法声明的参数是一样的。这样后台就接收到了数据。
这也是一种很常见的数据类型,这种类型的数据是序列化后的 JSON 字符串。
比如有这么一个登录表单:
<form>
<input type="text" name="username" />
<input type="password" name="password" />
<button type="button">登录</button>
</form>
后台代码如下:
@RequestMapping(value = "/login.do", method = RequestMethod.POST)
@ResponseBody
public String login(@RequestBody User user) {
return "login success.";
}
处理 application/json 编码的数据时,必须使用 @RequestBody 注解,Spring MVC 通过使用 HandlerAdapter 配置的 HttpMessageConverters 来解析请求体中的数据,然后绑定到 @RequestBody 注解的对象上。
AJAX 代码如下:
var username = $("input[name='username']").val();
var password = $("input[name='password']").val();
var data = {
username: username,
password: password
}
$.ajax({
type: "post",
url: "/login.do",
contentType: "application/json",
data: JSON.stringify(data),
success: function(data){
},
error: function(data){
}
});
JQuery 的 AJAX 默认传递的是 application/x-www-form-urlencoded 类型的数据,如果想传递 JSON 类型,则需指定 contentType: application/json。
由于 application/json 类型的数据是序列化后的 JSON 字符串,所以我们也必须手动把 JSON 对象序列化成字符串,我们可通过 JSON.stringify(data) 方法进行序列化。
通过 Chrome 浏览器可以看到请求体的数据格式是这样子的:
在最初的 http 协议中,没有上传文件方面的功能。后来为了支持文件上传,提高二进制文件的传输效率,新增了 multipart/form-data 数据类型。
比如有这么一个登录表单:
<form action="login.do" enctype="multipart/form-data" method="post">
<input type="text" name="username" />
<input type="password" name="password" />
<input type="file" name="avatar">
<button type="submit">登录</button>
</form>
表单的 enctype 属性必须指定为 multipart/form-data,否则不能上传文件。
后台代码如下:
@RequestMapping(value = "/login.do", method = RequestMethod.POST)
public String login(String username, String password, MultipartFile avatar) {
return "index.jsp";
}
MultipartFile 这个类是用来接受前台传过来的文件。
重点是 multipart/form-data 数据类型的请求体,这种数据类型的请求体跟前面两个完全不一样。
通过 Chrome 浏览器我们可以看到请求体的数据格式,请看下图:
首先先看 Content-Type 属性,它的值是:multipart/form-data; boundary=—-WebKitFormBoundaryxJ5HRAtPAwUo1RsG
multipart/form-data 是我们在表单里指定的数据类型,这个是必须的。
那么这个 boundary=—-WebKitFormBoundaryxJ5HRAtPAwUo1RsG 又是什么呢?
boundary 是边界符,作用是用来分割多个表单项和文件。它是浏览器自动帮我们加上的,边界符的值不是固定的,可以随便取,边界符也是必须的。
通过上图,我们来分析一下请求体的数据结构。
首先先看一个公式:
分割符 = "--" + 边界符
结束符 = "--" + 边界符 + "--"
根据上面的公式,我们可以得出分割符和结束符的值:
分割符:------WebKitFormBoundaryxJ5HRAtPAwUo1RsG
结束符:------WebKitFormBoundaryxJ5HRAtPAwUo1RsG--
先分析第一个表单项:
------WebKitFormBoundary8vr5fnZ5TkqDW6kZ
Content-Disposition: form-data; name="username"
zhangsan
第一行是分割符,但是仅仅有分割符是不够的,这样的数据格式还是不对,服务器还是无法解析数据。通过查资料得知,分割符必须以回车符(\r)+换行符(\n)结尾。
也就是说,第一行的内容其实是:分割符 + 回车符 + 换行符。
------WebKitFormBoundaryxJ5HRAtPAwUo1RsG\r\n
接下来看第二行,第二行的作用是告诉服务器对应字段(表单)的相关信息,第二行也必须以 “回车符 + 换行符” 结尾。所以第二行的内容其实是:
Content-Disposition: form-data; name="username"\r\n
接下来看第三行,第三行虽然没有显示任何内容,但第三行的内容其实是 “回车符 + 换行符”,只是不显示而已。
也就是说第三行的内容其实是:
\r\n
接下来看第四行,第四行是我们输入的表单值了,同样必须以 “回车符 + 换行符” 结尾。
也就是说第四行的内容其实是:
zhangsan\r\n
综上所述,一个完整的文本格式的数据结构应该是这样的:
------WebKitFormBoundary8vr5fnZ5TkqDW6kZ\r\nContent-Disposition: form-data; name="username"\r\n\r\nzhangsan\r\n
接下来再分析一下文件的内容:
------WebKitFormBoundary8vr5fnZ5TkqDW6kZ
Content-Disposition: form-data; name="avatar"; filename=""
Content-Type: application/octet-stream
文件只比文本多了一行 “Content-Type: application/octet-stream” 内容,但其实两者的数据结构其实是一样的:
------WebKitFormBoundary8vr5fnZ5TkqDW6kZ\r\nContent-Disposition: form-data; name="avatar"; filename=""\r\nContent-Type: application/octet-stream\r\n\r\n(文件的二进制数据)\r\n
最后一行是结束符,结束符也必须以 “回车符 + 换行符” 结尾。
所以最后一行的内容其实是:
------WebKitFormBoundary8vr5fnZ5TkqDW6kZ--\r\n
综上所述,请求体的完整数据结构如下图: