spring-kafka使消费者动态订阅新增的topic问题

目录
  • 一、前言
  • 二、需求分析
  • 三、测试运行
    • 3.1 启动消费者服务
    • 3.2 新建topic
    • 3.3 等待topic被分配到消费者
    • 3.4 发送第一条消息
    • 3.5 注意事项
  • 总结

一、前言

在Java中使用kafka,方式很多,例如:

  • 直接使用kafka-clients这类原生的API;
  • 也可以使用Spring对其的包装API,即spring-kafka,同其它包装API一样(如JdbcTemplate、RestTemplate、RedisTemplate等等),KafkaTemplate是其生产者核心类,KafkaListener是其消费者核心注解;
  • 也有包装地更加抽象的SpringCloudStream等。

这里讨论的话题是,如何在spring-kafka中,使得一个消费者可以动态订阅新增的topic?

本文不讨论利用SpringCloudConfig或Apollo等分布式配置中心,利用@RefreshScope的方式来达到目的,这种方式有点杀鸡用牛刀,也会增加系统复杂度和维护成本。

我的环境:jdk 1.8,Spring 2.1.3.RELEASE,kafka_2.12-2.3.0单节点。

二、需求分析

上面已经提到,spring-kafka通过 @KafkaListener 的方式配置订阅的topic,最常用的属性可能是 topics,而要实现本文的需求,就要使用另一个属性 topicPattern,查看它的属性说明:

The topic pattern for this listener. 
The entries can be 'topic pattern', a'property-placeholder key' or an 'expression'. 
The framework will create acontainer that subscribes to all topics matching the specified pattern to getdynamically assigned partitions. 
The pattern matching will be performedperiodically against topics existing at the time of check. 
An expression mustbe resolved to the topic pattern (String or Pattern result types are supported).

将其翻译过来:

此侦听器的主题模式。条目可以是“主题模式”,“属性占位符键”或“表达式”。
该框架将创建一个容器,该容器订阅与指定模式匹配的所有主题以获取动态分配的分区。
模式匹配将针对检查时存在的主题【定期执行】。
表达式必须解析为主题模式(支持字符串或模式结果类型)。

注意:从说明信息来看,topicPattern 已经可以做到定期检查topic列表,然后将新加入的topic分配至某个消费者。

下面列出消费端的核心测试代码:

@Component
public class SinkConsumer {
    @KafkaListener(topicPattern = "test_topic2.*")
    public void listen2(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic2.* = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

代码实现很简洁,就是期待我们新增一个符合 topicPattern 的topic后,spring-kafka能否自动为新建的topic分配到此目标消费者。

三、测试运行

3.1 启动消费者服务

配置文件中,spring该配的配,kafka该配的配,接着启动即可。

3.2 新建topic

新建 test_topic2_3,刚创建完不能立刻分配到目标消费者,从 topicPattern 的注释得知spring-kafka会定期扫描topic列表,我们要给它几分钟等待扫描到新topic,并为它成功分配到目标消费者后,再去发送第一条消息(所以可以先去洗个手,此时19:02)。

3.3 等待topic被分配到消费者

洗手期间的控制台日志提示:已为新建的 test_topic2_3 分配到我们的目标消费者,并将offset设置到起始位置0,日志如下:

2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Revoking previously assigned partitions [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] (Re-)joining group
2019-11-15 19:05:15.757  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Attempt to heartbeat failed since group is rebalancing
2019-11-15 19:05:15.761  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Revoking previously assigned partitions [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] (Re-)joining group
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Setting newly assigned partitions [test_topic-0]
2019-11-15 19:05:16.028  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic-0]
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_3-0 to offset 0.
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]

3.4 发送第一条消息

洗手完毕,看到3.3小节里的日志,然后确认成功分配到目标消费者,且offset被设为0之后,发送第一条消息【我是第1个test_topic2_3的消息】,控制台日志打印出此消息信息,代表成功消费:

topic2.* = test_topic2_3, offset = 0, value = {"date":"2019-11-15 19:11:13","msg":"我是第1个test_topic2_3的消息"}

3.5 注意事项

若不等到offset被设为0之后,过早发送消息,则会在消费端丢失过早发送的消息,并且当spring-kafka自动设置offset的时候,日志提示,offset被设置为1,而不是起始位置0:

INFO o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_1-0 to offset 1.

在上面的3.1至3.4的整个过程中,可能会日志警告,代表暂时不能为新增的topic分配到目标消费者:

WARN o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] The following subscribed topics are not assigned to any members: [test_topic2_3]

所以只需等待日志提示可以成功分配到目标消费者,且offset被设为0之后,即可发送第一条消息。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • kafka 消息队列中点对点与发布订阅的区别说明

    目录 背景知识 1.JMS中定义 2.二者分析与区别 2.1 点对点模式 2.2 发布订阅模式 3.流行的消息队列模型比较 3.1 RabbitMQ 3.2 Kafka 背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM

  • spring 整合kafka监听消费的配置过程

    前言 最近项目里有个需求,要消费kafka里的数据.之前也手动写过代码去消费kafka数据.但是转念一想.既然spring提供了消费kafka的方法.就没必要再去重复造轮子.于是尝试使用spring的API. 项目技术背景,使用springMVC,XML配置和注解相互使用.kafka的配置都是使用XML方式. 整合过程 1. 引入spring-kafka的依赖包 <dependency> <groupId>org.springframework.kafka</groupId&

  • Springboot整合kafka的示例代码

    目录 1. 整合kafka 2. 消息发送 2.1 发送类型 2.2 序列化 2.3 分区策略 3. 消息消费 3.1 消息组别 3.2 位移提交 1. 整合kafka 1.引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 2.设置yml文件 spring:

  • spring-kafka使消费者动态订阅新增的topic问题

    目录 一.前言 二.需求分析 三.测试运行 3.1 启动消费者服务 3.2 新建topic 3.3 等待topic被分配到消费者 3.4 发送第一条消息 3.5 注意事项 总结 一.前言 在Java中使用kafka,方式很多,例如: 直接使用kafka-clients这类原生的API: 也可以使用Spring对其的包装API,即spring-kafka,同其它包装API一样(如JdbcTemplate.RestTemplate.RedisTemplate等等),KafkaTemplate是其生产

  • springboot+kafka中@KafkaListener动态指定多个topic问题

    目录 说明 总结一下大家问的最多的一个问题 终极方法 思路 实现 代码 总结 说明 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个topic. 方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”) 运行程序,console打

  • Docker部署Kafka以及Spring Kafka实现

    这篇文章主要介绍了Docker部署Kafka以及Spring Kafka实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从https://hub.docker.com/查找kafka 第三个活跃并stars数量多 进去看看使用 我们使用docker-compose来构建镜像 查看使用文档中的docker-compose.yml 因为kafka要搭配zookeeper一起使用,所以文档中包含了zookeeper 我修改了一下版本号 以及变量参

  • 详解Spring Kafka中关于Kafka的配置参数

    SpringKafka文档地址:https://docs.spring.io/spring-kafka/reference/htmlsingle kafka文档地址:http://kafka.apache.org/documentation SpringKafka中配置的Java配置实现类:https://github.com/spring-projects/spring-boot/blob/v1.5.4.RELEASE/spring-boot-autoconfigure/src/main/ja

  • Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键.值进行保存. 每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器. 新建一个Maven项目,pom.xml 加入依赖: <dependency> <groupId>org.apache.kafka</gro

  • 基于Nacos实现Spring Cloud Gateway实现动态路由的方法

    简介 该文档主要介绍以Nacos为配置中心,实现Spring Cloud GateWay 实现动态路由的功能.Spring Cloud Gateway启动时候,就将路由配置和规则加载到内存里,无法做到不重启网关就可以动态的对应路由的配置和规则进行增加,修改和删除.通过nacos的配置下发的功能可以实现在不重启网关的情况下,实现动态路由. 集成 Spring Cloud GateWay集成 spring-cloud-starter-gateway:路由转发.请求过滤(权限校验.限流以及监控等) s

  • Spring Security 密码验证动态加盐的验证处理方法

    本文个人博客地址:https://www.leafage.top/posts/detail/21697I2R 最近几天在改造项目,需要将gateway整合security在一起进行认证和鉴权,之前gateway和auth是两个服务,auth是shiro写的一个,一个filter和一个配置,内容很简单,生成token,验证token,没有其他的安全检查,然后让对项目进行重构. 先是要整合gateway和shiro,然而因为gateway是webflux,而shiro-spring是webmvc,所

  • Spring Kafka中如何通过参数配置解决超时问题详解

    目录 背景 思路 过程 步骤一,查询版本特性 步骤二,查源码 步骤三,查自身的代码 总结 背景 这是我们团队负责的一个不太核心的服务.之前与外部交互时应外部要求由普通kafka集群改成加密kafka集群.我们是数据生产端. 改的过程中并跑上线,60%的请求耗时增加了2倍,也还是在百毫秒的量级可以接受.但是每次重启的第一个请求要5s以上,会超过:运行过程中,一两个月也会有一次超时.因为我们有三次重试,整体没有影响成功率. 上线的时候我们问过网络组,还专门请教过公司专业负责kafka的团队.结论是:

  • 利用Spring Cloud Zuul实现动态路由示例代码

    前言 本文主要给大家介绍了关于Spring Cloud Zuul实现动态路由的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. Zuul 是提供动态路由,监控,弹性,安全等的边缘服务.Zuul 相当于是设备和 Netflix 流应用的 Web 网站后端所有请求的前门. Zuul 可以适当的对多个 Amazon Auto Scaling Groups 进行路由请求. 首先新建maven项目,加入如下依赖 <dependencyManagement> <depend

  • Spring + Mybatis 项目实现动态切换数据源实例详解

    项目背景:项目开发中数据库使用了读写分离,所有查询语句走从库,除此之外走主库. 最简单的办法其实就是建两个包,把之前数据源那一套配置copy一份,指向另外的包,但是这样扩展很有限,所有采用下面的办法. 参考了两篇文章如下: http://www.jb51.net/article/111840.htm http://www.jb51.net/article/111842.htm 这两篇文章都对原理进行了分析,下面只写自己的实现过程其他不再叙述. 实现思路是: 第一步,实现动态切换数据源:配置两个D

随机推荐