Kafka Broker

①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳✕✓✔✖

1、安装

新的版本不需要 zookeeper 了,为了便于维护,可以安装一个 kafka-ui。

  • kafka 有那些好用的 UI?

1.1 单例模式

这种模式可以用做开发,不消耗机器的性能。

kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
restart: always
depends_on:
- kafka1
ports:
- 9091:8080
volumes:
- ${DATA_PATH}/kafka_ui/data:/etc/localtime
- /etc/localtime:/etc/localtime:ro
environment:
# 集群名称
- KAFKA_CLUSTERS_0_NAME=local
# 集群地址
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092
kafka1:
image: 'bitnami/kafka'
user: root
ports:
- 9192:9092
- 9193:9093
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- ${DATA_PATH}/kafka1/data:/bitnami/kafka
- /etc/localtime:/etc/localtime:ro

安装完毕后,输入:http://localhost:9091/,可以看到控制页面。

1.2 集群模式

参考了 github 上 docker 作者的文章

version: '2'
services:
kafka-0:
image: docker.io/bitnami/kafka:testing
ports:
- '9092'
- '9093'
environment:
- BRDEBUG=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_CFG_BROKER_ID=0
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- kafka_0_data:/bitnami/kafka
kafka-1:
image: docker.io/bitnami/kafka:testing
ports:
- '9092'
- '9093'
environment:
- BRDEBUG=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_CFG_BROKER_ID=1
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- kafka_1_data:/bitnami/kafka
kafka-2:
image: docker.io/bitnami/kafka:testing
ports:
- '9092'
- '9093'
environment:
- BRDEBUG=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
- KAFKA_CFG_BROKER_ID=2
- ALLOW_PLAINTEXT_LISTENER=yes
volumes:
- kafka_2_data:/bitnami/kafka
volumes:
kafka_0_data:
driver: local
kafka_1_data:
driver: local
kafka_2_data:
driver: local

可以模拟对集群的操作

root@kafka-0:/# /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic mytopic --partitions 3 --replication-factor 3
Created topic "mytopic".
root@kafka-0:/# /opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic mytopic
Topic:mytopic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: mytopic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: mytopic Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: mytopic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

1.3 参数配置

容器会在 /bitnami/kafka/config/ and /bitnami/kafka/config/kraft 目录中,查找 (server.properties, log4j.properties, etc.)配置文件。

step1:可以通过镜像的 volumns 来配置

...
services:
kafka:
...
volumes:
- 'kafka_data:/bitnami'
+ - /path/to/server.properties:/bitnami/kafka/config/server.properties

step2:修改配置文件

vi /path/to/server.properties

step3:重启 kafka

docker-compose restart kafka

2、快速入门

2.1 进入容器

docker-compose exec kafka1 bash
cd /opt/bitnami/kafka

2.2 创建主题以存储事件

https://kafka.apache.org/quickstart

Kafka 是一个分布式事件流平台,可让您读取、写入、存储和处理事件(在文档中也称为记录或消息) 跨多台机器。

示例事件包括付款交易、手机的地理位置更新、运输订单、传感器测量 来自物联网设备或医疗设备等等。这些事件在 topics 中组织和存储。 非常简化,topics 类似于文件系统中的文件夹,事件是该文件夹中的文件。

创建

bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --create

查看主题

# 查看topic列表
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看topic列表详情
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# 查看消费者组
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

修改 topic

# 修改分区,扩分区,不能减少分区
bin/kafka-topics.sh --alter --topic quickstart-events --partitions 3 --bootstrap-server localhost:9092
# 修改过期时间,下面两行都可以
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic quickstart-events --add-config retention.ms=86400000
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-name quickstart-events --entity-type topics --add-config retention.ms=86400000
# 删除topic 看看就可以了,别真删除了。
bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

2.3 将一些事件写入主题(生成者)

Kafka 客户端通过网络与 Kafka 代理通信,用于写入(或读取)事件。 收到后,代理将以持久和容错的方式存储事件,只要您需要——甚至永远存储。

运行控制台创建者客户端,将一些事件写入主题。 默认情况下,您输入的每一行都会导致将单独的事件写入主题。

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic quickstart-events
my first event
my second event

您可以随时停止创建者客户端。Ctrl-C

2.4 阅读事件(消费者)

打开另一个终端会话并运行控制台使用者客户端以读取刚刚创建的事件:

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
my first event
my second event

您可以随时停止使用者客户端。Ctrl-C

随意尝试:例如,切换回您的生产者终端(上一步)进行写入 其他事件,并查看事件如何立即显示在您的消费者终端中。

由于事件持久存储在 Kafka 中,因此可以根据需要多次读取它们,并被任意数量的使用者读取。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。

#指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --partition 0 --offset 100

消费组

消费组(Consumer Group)是一组独立消费者的集合,它们共同消费一个或多个 Topic 中的数据。消费组内的消费者协同工作,通过分摊该 Topic 中的所有分区,以实现消息的消费和处理。

消费组在 Kafka 消息队列中起到了至关重要的作用。它可以提供如下功能:

  • 并发消费:消费组内的每个消费者都可以独立地消费消息,可以实现高并发处理。
  • 自动负载均衡:消费组内的消费者会自动协作,将消费任务均分到所有消费者上,使得每个消费者都能处理相同数量的消息。
  • 提高可用性:当消费组内的一个或多个消费者故障退出时,消息会自动分配到其他消费者上,保证消费任务的不间断执行。
  • 支持多租户:可以通过 Consumer Group 来对不同的租户进行消息隔离,不同的 Consumer Group 可以读取同一个 Topic 的不同副本,或者读取不同 Topic 的不同分区,实现多个实例共享同一 Topic 或分散处理不同 Topic。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --group mygroup

通常情况下,消费组中的消费者都运行在不同的机器上,这样就可以实现分布式消费,以提高消息处理性能和可用性。Kafka 对消费组的实现也非常简单,通过在消费者在订阅 Topic 时,接受一个 Group ID 参数,就可以自动加入到一个消费组中。Kafka 会将 Group ID 相同的消费者映射到同一个 Consumer Group 中,以实现协同消费和分摊消费任务的目的。

查看数据积压

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup

kafka 数据积压处理方法

在 Kafka 中,由于消息的生产和消费速度可能不一致,导致消息会积压在 Kafka 的分区中,如果这些积压的消息处理不及时,会导致 Kafka 系统的性能下降和可用性降低等问题。因此,需要采取一些处理方法来解决数据积压问题:

增加消费者:增加消费者可以使消费任务并行执行,加快消息的处理速度。可以通过增加消费者的方式将积压的消息消费掉,提高系统处理速度和效率。

调整消费者组:当一个消费组中的消费者无法处理所有的消息时,可以考虑调整消费者组。可以增加消费者的数量或者更换消费者组,以适应消息处理的速度和大小。

调整消息分区:Kafka 中 Topic 的分区数也会影响数据积压的情况。可以调整分区数以改善数据读取和分发的情况,或者对热点 Topic 进行分区处理,以实现更好的性能和可用性。

调整消费 offset:若积压的消息都已经被处理过了,却还在 Kafka 中存在,可能是消费者消费 offset 设置错误导致的。可以通过 Kafka 的 offset 操作,重置消费 offset,跳过已经处理过的消息,减少数据积压的问题。

执行消息清洗:在消费 Kafka 消息时,可以额外执行一些消息清洗处理操作,将无用的数据过滤出去,或者将数据进行清理和格式化处理,减少中间处理环节,提高数据消费的效率和可用性。

2.5 使用 CONNECT

使用 KAFKA CONNECT 将数据导入/导出为事件流。

您可能在现有系统(如关系数据库或传统消息传递系统)中拥有大量数据, 以及许多已经使用这些系统的应用程序。Kafka Connect 允许您持续摄取 数据从外部系统进入 KAFKA,反之亦然。

它是运行连接器的可扩展工具,连接器实现用于与外部系统交互的自定义逻辑。 因此,将现有系统与 Kafka 集成非常容易。为了使此过程更加容易, 有数百种这样的连接器随时可用。

在本快速入门中,我们将了解如何使用导入数据的简单连接器运行 Kafka Connect 从文件到 Kafka 主题,并将数据从 Kafka 主题导出到文件。

① 背景说明

kafka-connect 有两个核心概念:Source 和 Sink。Source:负责导入数据到 kafka,Sink 负责从 kafka 导出数据,它们统称 Connector,即连接器。

还有两个重要概念:Task 和 Worker,每一个 Connector 都会协调一系列的 task 去执行任务,Connector 把一项工作任务分割成许多的 task,然后把 task 分发到各个 worker 进程中去执行。task 不保存自己的状态信息,而是交给特定的 kafka 主题去保存。

kafka-connect 提供了以下特性:

  • 通用性:规范化其他数据系统与 kafka 的继集成,简化了连接器的开发、部署和管理
  • 支持独立模式(standalone)分布式模式(distributed)
  • REST 接口:使用 Rest API 提交和管理 Connector
  • 自动位移管理:自动管理位移的提交,不需要开发人员干预,降低了开发成本
  • 分布式和可扩展性:Kafka Connect 基于现有的组管理协议来实现扩展 Kafka Connect 集群
  • 流式计算和批处理的集成

kafka 中通过connect-standalone.sh 和 connect-distributed.sh 命令来实现独立模式和分布式模式运行的 Kafka Connect,可以在 kafka 的/bin 目录下看到:

在独立模式中,所有操作都是在一个进程中完成的,它比较适合测试和功能验证的场景,但是无法充分利用 kafka 自身所提供的负载均衡和高容错特性。

下面来演示一下使用独立模式将一个文件中的内容导入到 kafka 中。

配置文件说明

connect-standalone.properties:用于 Work 进程运行的配置文件

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=libs/connect-file-3.4.0.jar

connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

file:该连接器数据源文件路径

topic:设置连接器将数据导入哪个主题,如果该主题不存在则会自动创建,当然也可以自己提前创建好(推荐)

connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

② 修改配置文件

这里主要修改connect-standalone.properties,修改plugin.path

没有 vim 可以安装

apt update
apt install vim

修改connect-standalone.properties

cd /opt/bitnami/kafka/config
more connect-standalone.properties
vim connect-standalone.properties
# 添加下面内容:plugin.path=libs/connect-file-3.4.0.jar

③ 启动独立模式

  • 有三个参数
    • 第一个始终是 Kafka Connect 的 Task 配置。例如要连接到的 Kafka 代理和数据序列化格式。
    • 其他两个是要创建的连接器。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • Kafka 附带的这些示例配置文件会创建两个连接器:
    • 第一个是源连接器,用于从输入文件中读取行并将每个行生成到 Kafka 主题
    • 第二个是接收器连接器,它从 Kafka 主题读取消息,并将每个消息生成为输出文件中的一行。

在启动期间,你将看到许多日志消息,包括一些指示连接器正在实例化的消息。

④ 验证结果

查看输出文件

Kafka 连接过程启动后,可以验证结果,打开test.sink.txt文件

> more test.sink.txt
foo
bar

在 kafka-ui 上看

注意数据存储在 Kafka 主题 ,所以我们也可以运行一个控制台消费者来查看 主题中的数据(或使用自定义使用者代码进行处理):connect-test

使用 kafka 消费者看

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

继续添加数据

连接器继续处理数据,因此我们可以将数据添加到文件中,并查看它在管道中移动:

> echo Another line>> test.txt

应会看到该行出现在控制台使用者输出和接收器文件中。

2.6 使用 Kafka Streams

https://gitee.com/xiaoyu-learning/kafka-demo.git

Kakfa 推出 Stream API 后,支持了事件时间,窗口,聚合函数等等常用流处理中常用功能,流数据处理方面基本已经向 Flink 靠齐了,下面是一些 Kafka 和 Flink 之间最重要的区别。

对比项FlinkKafka Stream API
部署方式Flink 是一个集群框架,因此应用程序的部署由该框架负责Streams API 是一个任何标准 Java 应用程序都可以嵌入的库,因此不会改变你的部署方式,你基本上可以使用任意方式来部署应用程序。
代码生命周期流处理代码在 Flink 集群中作为作业部署和运行用户的流处理代码嵌入在他们的应用程序中运行
作业由谁负责管理一般由数据基础设施或 BI 团队一般由业务线团队
分布式协调Flink Master(JobManager),流式程序的一部分利用 Kafka 集群进行协调、负载平衡和容错。
流数据来源Kafka、文件系统、其他消息队列等等严格的 Kafka,Kafka 中的 Connect API 服务于解决数据进出 Kafka 的问题
流数据写入Kafka、其他 MQ、文件系统、数据库、K/V 存储、流处理器状态和其他外部系统Kafka、应用程序状态、操作数据库或任何外部系统
有界和无界数据流无界与有界无界
语义保证Flink 内部状态恰好一次;使用选定的输入源和输出源时 exactly once(例如,Kafka 到 Flink 到 HDFS);当 Kafka 为输出源时,at least once。将来很可能与 Kafka 端对端 Exactly-once端到端都是 Kafka,Exactly-once

实时上 Flink 和 Kafka Stream 二者最核心的区别在于 Flink 和 Kafka Stream 的部署和管理模式,以及如何协调分布式处理(包括容错)。

部署和管理模式区别

Flink程序作为独立的作业进行部署和升级,Flink 作业可以自行启动和停止,从所有权的角度看,Flink 作业的运维通常由拥有框架运行的集群的团队负责,例如数据基础设施团队。

Kafka Stream API是一个标准的库,因此应用程序的生命周期由应用开发人员负责。Streams API 并未规定应如何配置、监控或部署应用程序,以及如何与公司现有的打包、部署、监控和运营工具无缝集成。从所有权的角度来看,Streams 应用程序通常由各自的产品团队负责。

分布式协调区别

Flink 有一个专门的主节点进行协调,而 Streams API 通过 Kafka 的消费者组协议依赖 Kafka broker 进行分布式协调和容错。

Flink 的容错、扩展、状态的同步都是由主节点(JobManager)进行全局协调的。主节点基于 ZooKeeper 实现了自己的高可用机制。这种方法可以帮助 Flink 获得高吞吐量,以及 exactly once 语义保证,以及 Flink 的 checkPoint 功能,并且它支持 Flink 的 exactly-once 的输出源(例如 HDFS 和 Cassandra,但不包含 Kafka)。

Kafka Streams API 利用 Kafka 提供容错、保证连续处理和高可用性。用户的应用程序的每个实例都独立运行,所有协调由 kafka broker 完成。容错是内置在 Kafka 协议中的,允许轻量级的集成到用户应用程序中。

总结

Kafka Streams API 使流处理可作为应用程序的一部分运营,由于 Kafka Stream API 与 Kafka 中的核心概念紧密集成,应用程序可以利用并受益于 Kafka 的核心能力——性能、可扩展性、安全性、可靠性 以及端到端恰好一次。

Flink 非常适合能够部署在独立集群中的应用程序,并能让应用程序得到吞吐量,延迟、事件时间语义、保存点和操作特性、应用程序状态的一次性保证、端到端的一次性保证和批处理等功能

可以先选择 Flink

3、开发 Produce

3.1 基本调用方法

配置环境,gradle 引用

dependencies {
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.4.0'
implementation group: 'org.slf4j', name: 'slf4j-simple', version: '2.0.7'
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
  • 简单用例
    • 异步发送:CustomProducer.java
    • 异步发送回调函数:CustomProducerCallback.java
    • 同步发送:CustomProducerSync.java
  • 发送分区规则:如果指定分区,那么发送到分区。如果没有指定分区,那么按照 Key 的 Hash 求余数得到分区号。
    • 例子: 如果让统一表的数据,都发送到一个分区中?用表名作为 key
    • 具体可以看RoundRobinPartitioner.java

3.2 自定义分区器

根据传输的内容,自定义要发往那个分区。

应用场景:

  • 根据数据内容,把数据发送到指定分区。
  • 一些脏数据,发送到某个分区。

具体操作如下:

  • 定义一个分区器:CustomPartitioner

    • public class CustomPartitioner implements Partitioner
  • 关联 Producer

    • properties.setProperty(ProducerConfig.*PARTITIONER_CLASS_CONFIG*,CustomPartitioner.class.getName());

执行完毕,可以看到数据被写入了分区 1.

3.3 提高生产者吞吐量

  • 优化批次大小与等待时间**(重点)**
    • batch.size 批次大小,默认 16K,可以根据实际情况修改成 32K
    • linger.ms 等待时间,默认 0,可以根据实际情况,修改成 5-100ms
  • 采用压缩**(重点)**
    • compression.type: 压缩 snappy
  • 缓冲区大小
    • RecordAccumulator

可以参考程序CustomProducerParameter.java

//缓存区大小
properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, String.valueOf(33554432));
//批次大小
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,String.valueOf(16384));
//间隔时间
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(1));
//压缩
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

3.4 提高数据可靠性

  • acks 参数
    • 0 : 不需要应答,(实际情况用的比较少),效率高
    • 1:Leader 应答,在 Leader 挂掉的情况下,有丢失数据的风险。效率中等
    • -1:所有 Leader+Follower 都应答了。效率低
      • 但是如果某个 Follower 挂了,就会堵塞发送。Kafka 会等待 30s Follower 无反映,就会把这个 Follwer 提掉。
  • acks=0 基本不用
  • acks=1 表示日志相关,允许丢失数据。
  • acks=-1 一般与金钱相关,要求绝对可靠。
    • 必须按照下面设置:acks=-1;分区副本书>=2;ISR 应答的最小副本数量>=2;

acks=-1 数据重复

只有当 Leader 挂了,Follower 还没有数据同步玩。选出新的 Leader 时,会重新发送数据。概率很小,但是也要解决。

具体实现程序看CustomProducerAcks.java

//acks,默认是-1
properties.setProperty(ProducerConfig.ACKS_CONFIG, String.valueOf(1));
//重试参数,默认是int最大值
properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(3));

3.5 解决数据重复

  • 最少一次:acks=-1,副本>=2,ISR 最小副本数数量》=2
    • ISR=所有存活并正常的 leader+flower 数。
  • 最多一次:ack=0
  • 幂等性:无论发多少条,Broker 只会持久化一次。
    • 《PID,Partition,SeqNumber》,PID 是 kafka 每次重启会话都会分配一个。SeqNumber 是单调递增的。所以幂等性只会保证单分区单会话不重复。

如何精确保证一次?幂等性+最少一次

enable.idempotence=true,默认就是幂等性

3.6 添加事务

事务协调器

实际代码中很简单

public class CustomProducerTransaction {
public static void main(String[] args){
//1. 创建kafka生产对象
Properties properties=CustomProducer.getBaseProperties();
// 指定一个事务ID,这个ID不重复就可以了,可以自动得到一个雪花ID
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"123456");
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(properties);
//1.1初始化事务
kafkaProducer.initTransactions();
//1.2开启事务
kafkaProducer.beginTransaction();
try{
//2. 发送数据
for(int i=0;i<5;i++){
kafkaProducer.send(new ProducerRecord<>("quickstart-events","1CustomProducerTransaction"+i));
}
//设置一个错误,
//int i=2/0;
//1.3提交事务
kafkaProducer.commitTransaction();
}catch (Exception e){
//1.4回撤事务
kafkaProducer.abortTransaction();
}finally {
//3. 关闭资源
kafkaProducer.close();
System.out.println("--------------------");
}
}
}

3.7 让数据有序

在开启幂等性后max.inn.flight.requests.per.connection<=5都可以让数据有序。

3.8 关闭自动创建主题

担心菜鸟在服务器上创建好多主题。

4、开发消费者

4.1 基本概念

  • 消费者组中消费者数量=Topic 分区数量,那么刚好一个分区对应一个消费者。

消费者初始化流程

消费者消费流程

4.2 基本功能

  • 订阅一个 topic 中的信息
  • 订阅一个 topic 中指定 partition 中的信息
  • 通过一个 group 消费组,订阅一个订阅一个 topic 中的信息,组最好与分区相同。
    • 发送一个 3W 的 128K,只用 0.5 秒。

4.3 提交 offset

自动提交 offset

kafka 会自动提交

  • enable.auto.commit:默认 true
  • aaauto.commit.interaval.ms: 默认 5s,自动提交间隔。

例如:可以修改自动提交参数,修改成 3s。下面是给的一个代码

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,3000);

手动提交 offset

提交方法

  • 同步提交:commitSync 必须等到提交 offset 完毕后,才读取下一批数据。
  • 异步提交:commitAsync

同步提交案例

  • 把自动提交关闭

    • properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
  • 手工撰写提交函数

    • while (true) {
      ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String,String> consumerRecord:consumerRecords){
      System.out.println(consumerRecord);
      }
      kafkaConsumer.commitSync();
      }

异步提交案例(再实际情况中,这种用的很多,为了提高效率)

  • 与同步的区别就是改成kafkaConsumer.commitAsync();

4.4 指定 offset 消费

auto.offset.reset=earliest|latest|none,默认是latest

  • earliest: 最早的偏移量,--from-beginning
  • latest:自动将偏移量设置为最新偏移量
  • none:如果找不到,就报错。

以上是特殊情况,如果想指定位置呢? 按照下面的代码执行,但是要等一段时间,才能看到结果

//指定位置,首先要拿到所有的分区,然后从分区信息中指定offset
Set<TopicPartition> assignment = kafkaConsumer.assignment();
//担心没有拿到分区方案,这里又等待了一下。
while (assignment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment=kafkaConsumer.assignment();
}
for(TopicPartition topicPartition:assignment){
kafkaConsumer.seek(topicPartition,19470);
}

4.5 指定时间消费

如果发现近来几小时有问题,就按照时间重新消费一下。 消费端启动到开始读取数据,大概有 1 秒钟的时间,所以要耐心等待。

public class CustomConsumerSeekByTime {
public static void main(String[] args) throws ParseException {
//0 配置
Properties properties = CustomConsumer.getBaseProperties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ConsumerSeekByTime");
//1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//2 订阅一个主题
Collection<String> topics=new ArrayList<>();
topics.add("quickstart-events");
kafkaConsumer.subscribe(topics);
//指定位置,首先要拿到所有的分区,然后从分区信息中指定offset
Set<TopicPartition> assignment = kafkaConsumer.assignment();
//担心没有拿到分区方案,这里又等待了一下。
while (assignment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment=kafkaConsumer.assignment();
}
SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date dateTime = sdf1.parse("2023-6-4 15:20:24");
//把时间抓换成offset
Map<TopicPartition, Long> timestampsToSearch=new HashMap<>();
// 填充timestampsToSearch
for (TopicPartition topicPartition:assignment){
timestampsToSearch.put(topicPartition,dateTime.getTime());
}
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap
= kafkaConsumer.offsetsForTimes(timestampsToSearch);
for(TopicPartition topicPartition:assignment){
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}
//3 消费数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> consumerRecord:consumerRecords){
System.out.println(consumerRecord);
}
}
}
}

4.6 漏消费与重复消费

  • 漏消费:已经消费了数据,但是没有提交 offset
  • 重复消费:提交了 offset,但是没有消费成功。

怎么解决上面的问题,需要添加事务处理。

4.7 消费者事务

前提是后面的处理,支持事务处理,例如 MySql。

4.8 解决数据积压

消费者如何提高吞吐量?

  • 增加分区,同时增加消费者。
  • 批次大小从 500 条增加到 1000 条。同时根据 1000 条的限制,适当增加缓存空间的大小。

5、硬件选择

物联网端

  • 有 3w 台设备,每台每秒发 1 条,会连续不断发送 8 小时。

  • 30000x8x60x60=864,000,000 条数据=8.64 亿条数据

  • 1 条大小 128 字节。

  • 每秒中发送 24.4MB 数据,8 小时发送数据 702,720MB =686.25G 数据

  • 按照 365 天,那么应该存储 244.6T 数据

服务器选择

  • 每秒 3w 条,1 条大小 128 字节,每秒 24.4MB
  • 服务器台数=2 x (每秒峰值 x 副本数 x )/100 +1
    • 2 x 24.4 x 2 / 100 + 1 =3

磁盘选择

  • 顺序读写,机械硬盘
  • 大小:(每天 x 副本数 x 保存几天)/ 0.7
    • (686 x 2 x 7) / 0.7 =13720G = 13T

内存选择

  • 内存=堆内存 + 页缓存。 正常系统一般在 10G-15G
  • 堆内存:默认 2G ,
    • 查看默认值:kafka-server-start.sh 中有,默认 1G
    • 查看 GC:jstat -gc kafka进程号 ls 10,看 YGC 的数量,如果几十次就不用修改了。
    • 查看内存使用效率:jmap -heap 2321 G1 Heap 中看比例
  • 页缓存 segment(1G)
    • (分区数 x segment x 25% )/3

CPU 选择

非常占用线程,建议选择一个 32CPU (IO=12,副本 4,数据传输 12) 所以要修改下面的默认值

  • io =8 写磁盘,这个要占总参数的 50%
  • 副本=1 50%的 1/3
  • 数据传输=3 50%的 2/3

网络带宽选择

  • 24.4MB * 8 就可以了。

6、服务器运维

6.1 集群安装

至少有 3 个节点,副本至少>=2

6.2 新增节点与退役节点

假设有 broke={0,1,2},现在要添加 3

新增节点

如何把老节点的数据,负载均衡到新节点?

  • 创建一个 json 脚本
  • 执行负载均衡计划:kafka-reassign-partitions.sh --broker-list "0,1,2,3" --generate
  • 得到新脚本,执行计划,kafka-reassign-partitions.sh --execute
  • 验证执行计划 --verify

退役节点

  • 创建一个 json 脚本
  • 执行负载均衡计划:kafka-reassign-partitions.sh --broker-list "0,1,2" --generate
  • 得到新脚本,执行计划,kafka-reassign-partitions.sh --execute
  • 验证执行计划 kafka-reassign-partitions.sh --verify
  • 执行 bin/kafka-server-stop

6.3 手工分配副本存储

副本的好处是为了提高可靠性,生产环境,一般做两个副本。

有 4 个分区,2 个副本,想让副本放在前两个分区上

6.4 Leader 自动平衡

如果 leader 分配在某几个机器上,那么会引发机器的不平衡。 系统默认就能把 leader 平衡了。

  • 在实际项目中,不建议频繁出发这个操作,因为会浪费资源。
  • 副本有 leader 与 follower,生产者与消费者只与 leader 关联。
  • controller 负责选举副本的 leader

6.5 增加副本

原先副本是 1,后来发现比较重要,想增加到 2。应该怎么做?

6.6 文件存储与清除策略

  • 超期默认时间:7 天。
    • log.retention.hours =
  • 超期后处理方法
    • 压缩 compact 只保留 key 的最后一个数值,其他的删除,并不是真正的压缩。
      • 应用场景:只有在最新状态下才使用
    • 删除 delete

6.7 kafka 高效写的诀窍

  • 分布,分区,并行度高
  • 顺序写磁盘
  • 稀疏索引
  • 页缓存与零拷贝:把工作都交给生产者与消费者。

6.8 数据备份

6.9 模拟灾难恢复

7、Kafka-Eagle

https://www.kafka-eagle.org/ ,可以观察一下新的版本,是否支持 kraft 模式

8.1 生产者

KafkaSink 可将数据流写入一个或多个 Kafka topic。

public class KafkaFlinkProducer {
public static void main(String[] args) throws Exception {
//1.获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
//2.准备数据源
//模拟一个集合作为数据源
ArrayList<String> wordList = new ArrayList<>();
for(int i=0;i<100;i++){
wordList.add("test:"+i);
}
DataStreamSource<String> stream=env.fromCollection(wordList);
//创建kafka sink
KafkaSink<String> kafkaSink=KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("quickstart-events")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
//.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
//3.添加数据源
stream.sinkTo(kafkaSink)
.name("myKafkaSink")
.setParallelism(1)
;
env.execute("myJob");
}
}

8.2 消费者

Kafka Source 提供了构建类来创建 KafkaSource 的实例。以下代码片段展示了如何构建 KafkaSource 来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:

为了显示队列中的内容,可以手工操作SingleOutputStreamOperator,也可以使用PrintSink

public class KafkaFlinkConsumer {
public static void main(String[] args) throws Exception {
//1.获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
//2.创建消费者
KafkaSource<String> source=KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("quickstart-events")
.setGroupId("my-group")
.setValueOnlyDeserializer(new SimpleStringSchema())
//.setStartingOffsets(OffsetsInitializer.earliest())
.build();
//3.关联消费者与flink流
DataStreamSource<String> dataStreamSource= env.fromSource(source
, WatermarkStrategy.noWatermarks()
,"Kafka Source"
, TypeInformation.of(String.class)
);
//4.执行
// SingleOutputStreamOperator<String> myCounter = dataStreamSource.process(new ProcessFunction<>() {
// @Override
// public void processElement(String s, Context context, Collector<String> collector) {
// System.out.println(s);
// }
// });
PrintSink<String> printSink = new PrintSink<>(true);
dataStreamSource.sinkTo(printSink)
.name("printSink")
.setParallelism(3)
;
env.execute("Flink DataSource");
}
}

9、SpringBoot 与 Kafkas

9.1 生产者

配置文件如下

spring:
kafka:
bootstrap-servers: localhost:9092

代码如下:

@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public String send(String value){
assert value!=null;
CompletableFuture<SendResult<String,String>> completableFuture=
kafkaTemplate.send("quickstart-events",value);
completableFuture.thenAccept(result->{
System.out.println("send ok:"+value);
});
completableFuture.exceptionally(e->{
System.out.println(e.toString());
return null;
});
return "ok";
}
}

9.2 消费者

其中group-id一定要配置。

spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: 'springGroup'

代码

@Configuration
public class KafkaConsumer {
@KafkaListener(topics = "quickstart-events")
public void consumer(String value){
System.out.println("收到kafka:"+value);
}
}

10、压力测试

kafka 自带的有生产与消费测试脚本,可以执行一下就可以了。

测试完毕后,就可以模拟调优了。

10.1 使用 kafka 压测脚本

生产者压力测试

创建一个测试的 test topic,设置三个分区,三个副本。

bin/kafka-topics.sh --bootstrap-server localhost:9002 --create --topic test --rtitions 3 --replication-factor 3

进行测试

bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-recorder 1000000 --throughput 30000 --producer-props bootstrap.servers=localhost:9092 batch.size=16384 linger.ms=0

参数说明:

  • record-size:一条信息多达,单位是字节,这里是 1k
  • num-recorder:总共发送多少条 ,这里是 100 万
  • throughput:每秒多少条,-1 表示不限流尽快发完,这里是 3 万条。
  • producer-props 跟生产者相关参数

执行后看结果

可以看每秒的执行效率:recods/sec =9.76

提高 record-size:32768 看执行结果

可以看每秒的执行效率:recods/sec =9.76

record-size 变小,会发现执行的慢,所以要慢慢的调试。

调整 linger.ms

调整压缩方式

调整缓存大小

下面是主要的结果

不同的机器不一样,要根据自己的数据情况与机器情况进行测试。

消费者压力测试

bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test --messages 10000000 ---consumer.config config/consumer.properties

consumer.properties:下面定义每次拉的条数

max.poll.records=500

消费者问题不大。