源码地址:https://github.com/miansen/kafka-example/tree/master/kafka-example-springboot

在 Spring Boot 中使用 Kafka 跟在 Spring MVC 中是差不多的,而且还更简单,不需要各种 xml 配置了,只需要添加注解就可以了。

以下是我写的简单的实例代码,看一下就知道怎么使用了,感觉都是套路。

application.properties

首先是配置文件,这是必不可少的。Spring Boot 会自动加载配置文件的参数,按照你设置的参数初始化好 kafka 相关的 bean,注入到 IOC 容器里。

server.port=8080
spring.kafka.consumer.bootstrap-servers=192.168.197.6:9092
# 确保新的消费者组能获得我们之前发送的消息,为了测试方便(生产配置latest,只获取最新的消息)。
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.bootstrap-servers=192.168.197.6:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 监听的 topic 如果不存在,则不报错
spring.kafka.listener.missing-topics-fatal=false

以上几个参数是比较常用的,但是我们知道 Kafka 客户端提供的参数配置不止这些,那么如何在 Spring Boot 中配置呢?

熟悉 Spring Boot 套路的同学可能就知道了,Spring Boot 一般都会提供一个 xxxProperties 的配置类,你在配置文件里配置的参数都会映射都这个配置类里。

所以直接搜索 KafkaProperties 类查看它有哪些属性就知道 Spring Boot 支持哪些配置参数了。

image

KafkaProperties 类的属性都写了注释,结合 Kafka 官方文档 http://kafka.apachecn.org/documentation.html#producerapi 就可以配置了。

生产者

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String value) {
        sendMessage(topic, null, value);
    }
    
    public void sendMessage(String topic, String key, String value) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, value);
        future.addCallback(success -> {
            RecordMetadata metadata = success.getRecordMetadata();
            System.out.println("生产者发送消息成功。topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
        }, failure -> {
            System.out.println("生产者发送消息失败,原因:" + failure.getMessage());
        });
    }
    
}

还记得在 Spring MVC 里,KafkaTemplate 是需要我们手动配置的吗,在 Spring Boot 里是自动配置的,可以直接拿来使用。

消费者

还记得在 Spring MVC 中是怎么配置消费者的吗?首先配置监听类,然后配置监听信息,最后再配置监听容器。

在 Spring Boot 中只需要配置 @KafkaListener 注解就可以了。

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = {"test01"}, groupId = "group01")
    public void onMessage1(ConsumerRecord<String, String> record) {
        System.out.println(String.format("[group01-消费者1]收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    
    @KafkaListener(topics = {"test01"}, groupId = "group01")
    public void onMessage2(ConsumerRecord<String, String> record) {
        System.out.println(String.format("[group01-消费者2]收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
    
    @KafkaListener(topics = {"test01"}, groupId = "group02")
    public void onMessage3(ConsumerRecord<String, String> record) {
        System.out.println(String.format("[group02-消费者3]收到了消息。topic: %s, partition: %s, offset: %s, key: %s, value: %s",
                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    }
}

我配置了 3 个消费者,它们都消费同一个主题。其中前两个消费者位于同一个消费者组下,第 3 个消费者位于另一个消费者组下。

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private KafkaProducerService producerService;

    @Test
    public void sendMessageTest() throws Exception {
        for (int i = 0; i < 10; i++) {
            producerService.sendMessage("test01", Integer.toString(i), "hello kafka-" + i + "-" + new Date().getTime());
        }
        System.in.read(); // 让 main 线程阻塞一下,否则消费者线程可能会来不及消费就死了
    }
    
}

运行测试类,可以看到有三条消费者线程在监听消息,分别对应 KafkaConsumerService 类的 3 个 @KafkaListener 注解方法。

image

控制台输出如下:

生产者发送消息成功。topic: test01, partition: 2, offset: 773
生产者发送消息成功。topic: test01, partition: 0, offset: 922
生产者发送消息成功。topic: test01, partition: 2, offset: 774
生产者发送消息成功。topic: test01, partition: 2, offset: 775
生产者发送消息成功。topic: test01, partition: 1, offset: 740
生产者发送消息成功。topic: test01, partition: 0, offset: 923
生产者发送消息成功。topic: test01, partition: 1, offset: 741
生产者发送消息成功。topic: test01, partition: 0, offset: 924
生产者发送消息成功。topic: test01, partition: 0, offset: 925
生产者发送消息成功。topic: test01, partition: 2, offset: 776
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 773, key: 0, value: hello kafka-0-1593143698305
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 773, key: 0, value: hello kafka-0-1593143698305
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 922, key: 1, value: hello kafka-1-1593143713542
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 774, key: 2, value: hello kafka-2-1593143744094
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 775, key: 3, value: hello kafka-3-1593143746141
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 922, key: 1, value: hello kafka-1-1593143713542
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 774, key: 2, value: hello kafka-2-1593143744094
[group01-消费者2]收到了消息。topic: test01, partition: 1, offset: 740, key: 4, value: hello kafka-4-1593143746861
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 923, key: 5, value: hello kafka-5-1593143746893
[group01-消费者2]收到了消息。topic: test01, partition: 1, offset: 741, key: 6, value: hello kafka-6-1593143746964
[group01-消费者1]收到了消息。topic: test01, partition: 2, offset: 776, key: 9, value: hello kafka-9-1593143747124
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 924, key: 7, value: hello kafka-7-1593143747027
[group01-消费者2]收到了消息。topic: test01, partition: 0, offset: 925, key: 8, value: hello kafka-8-1593143747063
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 923, key: 5, value: hello kafka-5-1593143746893
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 924, key: 7, value: hello kafka-7-1593143747027
[group02-消费者3]收到了消息。topic: test01, partition: 0, offset: 925, key: 8, value: hello kafka-8-1593143747063
[group02-消费者3]收到了消息。topic: test01, partition: 1, offset: 740, key: 4, value: hello kafka-4-1593143746861
[group02-消费者3]收到了消息。topic: test01, partition: 1, offset: 741, key: 6, value: hello kafka-6-1593143746964
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 775, key: 3, value: hello kafka-3-1593143746141
[group02-消费者3]收到了消息。topic: test01, partition: 2, offset: 776, key: 9, value: hello kafka-9-1593143747124

可以看到生产者生产了 10 条消息,均匀的分布在 3 个分区里。

“消费者1” 只消费了 2 号分区,而”消费者2” 消费了 0 号和 1 号分区。由于这两个消费者位于同一个消费者组下,所以不会重复消费同一个分区。

而 “消费者3” 位于不同的消费者组下,其它组对它不影响,跟它没关系。并且这个组只有它一个消费者,所以 “消费者3” 可以大饱口福,消费所有的分区。

原文链接:https://miansen.wang/2020/05/23/kafka-springboot/