①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳✕✓✔✖
新的版本不需要 zookeeper 了,为了便于维护,可以安装一个 kafka-ui。
这种模式可以用做开发,不消耗机器的性能。
kafka-ui:image: provectuslabs/kafka-uicontainer_name: kafka-uirestart: alwaysdepends_on:- kafka1ports:- 9091:8080volumes:- ${DATA_PATH}/kafka_ui/data:/etc/localtime- /etc/localtime:/etc/localtime:roenvironment:# 集群名称- KAFKA_CLUSTERS_0_NAME=local# 集群地址- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092kafka1:image: 'bitnami/kafka'user: rootports:- 9192:9092- 9193:9093environment:- ALLOW_PLAINTEXT_LISTENER=yesvolumes:- ${DATA_PATH}/kafka1/data:/bitnami/kafka- /etc/localtime:/etc/localtime:ro
安装完毕后,输入:http://localhost:9091/
,可以看到控制页面。
version: '2'services:kafka-0:image: docker.io/bitnami/kafka:testingports:- '9092'- '9093'environment:- BRDEBUG=1- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093- KAFKA_CFG_BROKER_ID=0- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv- ALLOW_PLAINTEXT_LISTENER=yesvolumes:- kafka_0_data:/bitnami/kafkakafka-1:image: docker.io/bitnami/kafka:testingports:- '9092'- '9093'environment:- BRDEBUG=1- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093- KAFKA_CFG_BROKER_ID=1- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv- ALLOW_PLAINTEXT_LISTENER=yesvolumes:- kafka_1_data:/bitnami/kafkakafka-2:image: docker.io/bitnami/kafka:testingports:- '9092'- '9093'environment:- BRDEBUG=1- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv- KAFKA_CFG_BROKER_ID=2- ALLOW_PLAINTEXT_LISTENER=yesvolumes:- kafka_2_data:/bitnami/kafkavolumes:kafka_0_data:driver: localkafka_1_data:driver: localkafka_2_data:driver: local
可以模拟对集群的操作
root@kafka-0:/# /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic mytopic --partitions 3 --replication-factor 3Created topic "mytopic".root@kafka-0:/# /opt/bitnami/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic mytopicTopic:mytopic PartitionCount:3 ReplicationFactor:3 Configs:Topic: mytopic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1Topic: mytopic Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2Topic: mytopic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
容器会在 /bitnami/kafka/config/
and /bitnami/kafka/config/kraft
目录中,查找 (server.properties, log4j.properties, etc.)配置文件。
step1:可以通过镜像的 volumns 来配置
...services:kafka:...volumes:- 'kafka_data:/bitnami'+ - /path/to/server.properties:/bitnami/kafka/config/server.properties
step2:修改配置文件
vi /path/to/server.properties
step3:重启 kafka
docker-compose restart kafka
docker-compose exec kafka1 bashcd /opt/bitnami/kafka
https://kafka.apache.org/quickstart
Kafka 是一个分布式事件流平台,可让您读取、写入、存储和处理事件(在文档中也称为记录或消息) 跨多台机器。
示例事件包括付款交易、手机的地理位置更新、运输订单、传感器测量 来自物联网设备或医疗设备等等。这些事件在 topics 中组织和存储。 非常简化,topics 类似于文件系统中的文件夹,事件是该文件夹中的文件。
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic quickstart-events --create
# 查看topic列表bin/kafka-topics.sh --bootstrap-server localhost:9092 --list# 查看topic列表详情bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092# 查看消费者组bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 修改分区,扩分区,不能减少分区bin/kafka-topics.sh --alter --topic quickstart-events --partitions 3 --bootstrap-server localhost:9092# 修改过期时间,下面两行都可以bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic quickstart-events --add-config retention.ms=86400000bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-name quickstart-events --entity-type topics --add-config retention.ms=86400000# 删除topic 看看就可以了,别真删除了。bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092
Kafka 客户端通过网络与 Kafka 代理通信,用于写入(或读取)事件。 收到后,代理将以持久和容错的方式存储事件,只要您需要——甚至永远存储。
运行控制台创建者客户端,将一些事件写入主题。 默认情况下,您输入的每一行都会导致将单独的事件写入主题。
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic quickstart-eventsmy first eventmy second event
您可以随时停止创建者客户端。Ctrl-C
打开另一个终端会话并运行控制台使用者客户端以读取刚刚创建的事件:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092my first eventmy second event
您可以随时停止使用者客户端。Ctrl-C
随意尝试:例如,切换回您的生产者终端(上一步)进行写入 其他事件,并查看事件如何立即显示在您的消费者终端中。
由于事件持久存储在 Kafka 中,因此可以根据需要多次读取它们,并被任意数量的使用者读取。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。
#指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --partition 0 --offset 100
消费组
消费组(Consumer Group)是一组独立消费者的集合,它们共同消费一个或多个 Topic 中的数据。消费组内的消费者协同工作,通过分摊该 Topic 中的所有分区,以实现消息的消费和处理。
消费组在 Kafka 消息队列中起到了至关重要的作用。它可以提供如下功能:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --group mygroup
通常情况下,消费组中的消费者都运行在不同的机器上,这样就可以实现分布式消费,以提高消息处理性能和可用性。Kafka 对消费组的实现也非常简单,通过在消费者在订阅 Topic 时,接受一个 Group ID 参数,就可以自动加入到一个消费组中。Kafka 会将 Group ID 相同的消费者映射到同一个 Consumer Group 中,以实现协同消费和分摊消费任务的目的。
查看数据积压
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup
kafka 数据积压处理方法
在 Kafka 中,由于消息的生产和消费速度可能不一致,导致消息会积压在 Kafka 的分区中,如果这些积压的消息处理不及时,会导致 Kafka 系统的性能下降和可用性降低等问题。因此,需要采取一些处理方法来解决数据积压问题:
增加消费者:增加消费者可以使消费任务并行执行,加快消息的处理速度。可以通过增加消费者的方式将积压的消息消费掉,提高系统处理速度和效率。
调整消费者组:当一个消费组中的消费者无法处理所有的消息时,可以考虑调整消费者组。可以增加消费者的数量或者更换消费者组,以适应消息处理的速度和大小。
调整消息分区:Kafka 中 Topic 的分区数也会影响数据积压的情况。可以调整分区数以改善数据读取和分发的情况,或者对热点 Topic 进行分区处理,以实现更好的性能和可用性。
调整消费 offset:若积压的消息都已经被处理过了,却还在 Kafka 中存在,可能是消费者消费 offset 设置错误导致的。可以通过 Kafka 的 offset 操作,重置消费 offset,跳过已经处理过的消息,减少数据积压的问题。
执行消息清洗:在消费 Kafka 消息时,可以额外执行一些消息清洗处理操作,将无用的数据过滤出去,或者将数据进行清理和格式化处理,减少中间处理环节,提高数据消费的效率和可用性。
使用 KAFKA CONNECT 将数据导入/导出为事件流。
您可能在现有系统(如关系数据库或传统消息传递系统)中拥有大量数据, 以及许多已经使用这些系统的应用程序。Kafka Connect 允许您持续摄取 数据从外部系统进入 KAFKA,反之亦然。
它是运行连接器的可扩展工具,连接器实现用于与外部系统交互的自定义逻辑。 因此,将现有系统与 Kafka 集成非常容易。为了使此过程更加容易, 有数百种这样的连接器随时可用。
在本快速入门中,我们将了解如何使用导入数据的简单连接器运行 Kafka Connect 从文件到 Kafka 主题,并将数据从 Kafka 主题导出到文件。
kafka-connect 有两个核心概念:Source 和 Sink。Source:负责导入数据到 kafka,Sink 负责从 kafka 导出数据,它们统称 Connector,即连接器。
还有两个重要概念:Task 和 Worker,每一个 Connector 都会协调一系列的 task 去执行任务,Connector 把一项工作任务分割成许多的 task,然后把 task 分发到各个 worker 进程中去执行。task 不保存自己的状态信息,而是交给特定的 kafka 主题去保存。
kafka-connect 提供了以下特性:
kafka 中通过connect-standalone.sh 和 connect-distributed.sh 命令来实现独立模式和分布式模式运行的 Kafka Connect,可以在 kafka 的/bin 目录下看到:
在独立模式中,所有操作都是在一个进程中完成的,它比较适合测试和功能验证的场景,但是无法充分利用 kafka 自身所提供的负载均衡和高容错特性。
下面来演示一下使用独立模式将一个文件中的内容导入到 kafka 中。
配置文件说明
connect-standalone.properties:用于 Work 进程运行的配置文件
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=truevalue.converter.schemas.enable=trueoffset.storage.file.filename=/tmp/connect.offsetsoffset.flush.interval.ms=10000plugin.path=libs/connect-file-3.4.0.jar
connect-file-source.properties
name=local-file-sourceconnector.class=FileStreamSourcetasks.max=1file=test.txttopic=connect-test
file:该连接器数据源文件路径
topic:设置连接器将数据导入哪个主题,如果该主题不存在则会自动创建,当然也可以自己提前创建好(推荐)
connect-file-sink.properties
name=local-file-sinkconnector.class=FileStreamSinktasks.max=1file=test.sink.txttopics=connect-test
这里主要修改connect-standalone.properties
,修改plugin.path
。
没有 vim 可以安装
apt updateapt install vim
修改connect-standalone.properties
cd /opt/bitnami/kafka/configmore connect-standalone.propertiesvim connect-standalone.properties# 添加下面内容:plugin.path=libs/connect-file-3.4.0.jar
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
在启动期间,你将看到许多日志消息,包括一些指示连接器正在实例化的消息。
查看输出文件
Kafka 连接过程启动后,可以验证结果,打开test.sink.txt
文件
> more test.sink.txtfoobar
在 kafka-ui 上看
注意数据存储在 Kafka 主题 ,所以我们也可以运行一个控制台消费者来查看 主题中的数据(或使用自定义使用者代码进行处理):connect-test
使用 kafka 消费者看
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning{"schema":{"type":"string","optional":false},"payload":"foo"}{"schema":{"type":"string","optional":false},"payload":"bar"}
继续添加数据
连接器继续处理数据,因此我们可以将数据添加到文件中,并查看它在管道中移动:
> echo Another line>> test.txt
应会看到该行出现在控制台使用者输出和接收器文件中。
https://gitee.com/xiaoyu-learning/kafka-demo.git
Kakfa 推出 Stream API 后,支持了事件时间,窗口,聚合函数等等常用流处理中常用功能,流数据处理方面基本已经向 Flink 靠齐了,下面是一些 Kafka 和 Flink 之间最重要的区别。
对比项 | Flink | Kafka Stream API |
---|---|---|
部署方式 | Flink 是一个集群框架,因此应用程序的部署由该框架负责 | Streams API 是一个任何标准 Java 应用程序都可以嵌入的库,因此不会改变你的部署方式,你基本上可以使用任意方式来部署应用程序。 |
代码生命周期 | 流处理代码在 Flink 集群中作为作业部署和运行 | 用户的流处理代码嵌入在他们的应用程序中运行 |
作业由谁负责管理 | 一般由数据基础设施或 BI 团队 | 一般由业务线团队 |
分布式协调 | Flink Master(JobManager),流式程序的一部分 | 利用 Kafka 集群进行协调、负载平衡和容错。 |
流数据来源 | Kafka、文件系统、其他消息队列等等 | 严格的 Kafka,Kafka 中的 Connect API 服务于解决数据进出 Kafka 的问题 |
流数据写入 | Kafka、其他 MQ、文件系统、数据库、K/V 存储、流处理器状态和其他外部系统 | Kafka、应用程序状态、操作数据库或任何外部系统 |
有界和无界数据流 | 无界与有界 | 无界 |
语义保证 | Flink 内部状态恰好一次;使用选定的输入源和输出源时 exactly once(例如,Kafka 到 Flink 到 HDFS);当 Kafka 为输出源时,at least once。将来很可能与 Kafka 端对端 Exactly-once | 端到端都是 Kafka,Exactly-once |
实时上 Flink 和 Kafka Stream 二者最核心的区别在于 Flink 和 Kafka Stream 的部署和管理模式,以及如何协调分布式处理(包括容错)。
Flink程序作为独立的作业进行部署和升级,Flink 作业可以自行启动和停止,从所有权的角度看,Flink 作业的运维通常由拥有框架运行的集群的团队负责,例如数据基础设施团队。
Kafka Stream API是一个标准的库,因此应用程序的生命周期由应用开发人员负责。Streams API 并未规定应如何配置、监控或部署应用程序,以及如何与公司现有的打包、部署、监控和运营工具无缝集成。从所有权的角度来看,Streams 应用程序通常由各自的产品团队负责。
Flink 有一个专门的主节点进行协调,而 Streams API 通过 Kafka 的消费者组协议依赖 Kafka broker 进行分布式协调和容错。
Flink 的容错、扩展、状态的同步都是由主节点(JobManager)进行全局协调的。主节点基于 ZooKeeper 实现了自己的高可用机制。这种方法可以帮助 Flink 获得高吞吐量,以及 exactly once 语义保证,以及 Flink 的 checkPoint 功能,并且它支持 Flink 的 exactly-once 的输出源(例如 HDFS 和 Cassandra,但不包含 Kafka)。
Kafka Streams API 利用 Kafka 提供容错、保证连续处理和高可用性。用户的应用程序的每个实例都独立运行,所有协调由 kafka broker 完成。容错是内置在 Kafka 协议中的,允许轻量级的集成到用户应用程序中。
Kafka Streams API 使流处理可作为应用程序的一部分运营,由于 Kafka Stream API 与 Kafka 中的核心概念紧密集成,应用程序可以利用并受益于 Kafka 的核心能力——性能、可扩展性、安全性、可靠性 以及端到端恰好一次。
Flink 非常适合能够部署在独立集群中的应用程序,并能让应用程序得到吞吐量,延迟、事件时间语义、保存点和操作特性、应用程序状态的一次性保证、端到端的一次性保证和批处理等功能
可以先选择 Flink
配置环境,gradle 引用
dependencies {implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.4.0'implementation group: 'org.slf4j', name: 'slf4j-simple', version: '2.0.7'testImplementation(platform("org.junit:junit-bom:5.9.1"))testImplementation("org.junit.jupiter:junit-jupiter")}
CustomProducer.java
CustomProducerCallback.java
CustomProducerSync.java
RoundRobinPartitioner.java
根据传输的内容,自定义要发往那个分区。
应用场景:
具体操作如下:
定义一个分区器:CustomPartitioner
public class CustomPartitioner implements Partitioner
关联 Producer
properties.setProperty(ProducerConfig.*PARTITIONER_CLASS_CONFIG*,CustomPartitioner.class.getName());
执行完毕,可以看到数据被写入了分区 1.
可以参考程序CustomProducerParameter.java
//缓存区大小properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, String.valueOf(33554432));//批次大小properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,String.valueOf(16384));//间隔时间properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(1));//压缩properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
acks=-1 数据重复
只有当 Leader 挂了,Follower 还没有数据同步玩。选出新的 Leader 时,会重新发送数据。概率很小,但是也要解决。
具体实现程序看CustomProducerAcks.java
//acks,默认是-1properties.setProperty(ProducerConfig.ACKS_CONFIG, String.valueOf(1));//重试参数,默认是int最大值properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(3));
如何精确保证一次?幂等性+最少一次
enable.idempotence=true
,默认就是幂等性
事务协调器
实际代码中很简单
public class CustomProducerTransaction {public static void main(String[] args){//1. 创建kafka生产对象Properties properties=CustomProducer.getBaseProperties();// 指定一个事务ID,这个ID不重复就可以了,可以自动得到一个雪花IDproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"123456");KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(properties);//1.1初始化事务kafkaProducer.initTransactions();//1.2开启事务kafkaProducer.beginTransaction();try{//2. 发送数据for(int i=0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("quickstart-events","1CustomProducerTransaction"+i));}//设置一个错误,//int i=2/0;//1.3提交事务kafkaProducer.commitTransaction();}catch (Exception e){//1.4回撤事务kafkaProducer.abortTransaction();}finally {//3. 关闭资源kafkaProducer.close();System.out.println("--------------------");}}}
在开启幂等性后max.inn.flight.requests.per.connection<=5
都可以让数据有序。
担心菜鸟在服务器上创建好多主题。
消费者初始化流程
消费者消费流程
kafka 会自动提交
例如:可以修改自动提交参数,修改成 3s。下面是给的一个代码
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,3000);
提交方法
同步提交案例
把自动提交关闭
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
手工撰写提交函数
while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}kafkaConsumer.commitSync();}
异步提交案例(再实际情况中,这种用的很多,为了提高效率)
kafkaConsumer.commitAsync();
auto.offset.reset=earliest|latest|none
,默认是latest
--from-beginning
以上是特殊情况,如果想指定位置呢? 按照下面的代码执行,但是要等一段时间,才能看到结果
//指定位置,首先要拿到所有的分区,然后从分区信息中指定offsetSet<TopicPartition> assignment = kafkaConsumer.assignment();//担心没有拿到分区方案,这里又等待了一下。while (assignment.size()==0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment=kafkaConsumer.assignment();}for(TopicPartition topicPartition:assignment){kafkaConsumer.seek(topicPartition,19470);}
如果发现近来几小时有问题,就按照时间重新消费一下。 消费端启动到开始读取数据,大概有 1 秒钟的时间,所以要耐心等待。
public class CustomConsumerSeekByTime {public static void main(String[] args) throws ParseException {//0 配置Properties properties = CustomConsumer.getBaseProperties();properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ConsumerSeekByTime");//1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2 订阅一个主题Collection<String> topics=new ArrayList<>();topics.add("quickstart-events");kafkaConsumer.subscribe(topics);//指定位置,首先要拿到所有的分区,然后从分区信息中指定offsetSet<TopicPartition> assignment = kafkaConsumer.assignment();//担心没有拿到分区方案,这里又等待了一下。while (assignment.size()==0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment=kafkaConsumer.assignment();}SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date dateTime = sdf1.parse("2023-6-4 15:20:24");//把时间抓换成offsetMap<TopicPartition, Long> timestampsToSearch=new HashMap<>();// 填充timestampsToSearchfor (TopicPartition topicPartition:assignment){timestampsToSearch.put(topicPartition,dateTime.getTime());}Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap= kafkaConsumer.offsetsForTimes(timestampsToSearch);for(TopicPartition topicPartition:assignment){OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}//3 消费数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> consumerRecord:consumerRecords){System.out.println(consumerRecord);}}}}
怎么解决上面的问题,需要添加事务处理。
前提是后面的处理,支持事务处理,例如 MySql。
消费者如何提高吞吐量?
物联网端
有 3w 台设备,每台每秒发 1 条,会连续不断发送 8 小时。
30000x8x60x60=864,000,000 条数据=8.64 亿条数据
1 条大小 128 字节。
每秒中发送 24.4MB 数据,8 小时发送数据 702,720MB =686.25G 数据
按照 365 天,那么应该存储 244.6T 数据
服务器选择
磁盘选择
内存选择
jstat -gc kafka进程号 ls 10
,看 YGC 的数量,如果几十次就不用修改了。jmap -heap 2321
G1 Heap 中看比例CPU 选择
非常占用线程,建议选择一个 32CPU (IO=12,副本 4,数据传输 12) 所以要修改下面的默认值
网络带宽选择
至少有 3 个节点,副本至少>=2
假设有 broke={0,1,2},现在要添加 3
如何把老节点的数据,负载均衡到新节点?
kafka-reassign-partitions.sh --broker-list "0,1,2,3" --generate
kafka-reassign-partitions.sh --execute
--verify
kafka-reassign-partitions.sh --broker-list "0,1,2" --generate
kafka-reassign-partitions.sh --execute
kafka-reassign-partitions.sh --verify
执行 bin/kafka-server-stop
副本的好处是为了提高可靠性,生产环境,一般做两个副本。
有 4 个分区,2 个副本,想让副本放在前两个分区上
如果 leader 分配在某几个机器上,那么会引发机器的不平衡。 系统默认就能把 leader 平衡了。
原先副本是 1,后来发现比较重要,想增加到 2。应该怎么做?
https://www.kafka-eagle.org/ ,可以观察一下新的版本,是否支持 kraft 模式
KafkaSink
可将数据流写入一个或多个 Kafka topic。
public class KafkaFlinkProducer {public static void main(String[] args) throws Exception {//1.获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//2.准备数据源//模拟一个集合作为数据源ArrayList<String> wordList = new ArrayList<>();for(int i=0;i<100;i++){wordList.add("test:"+i);}DataStreamSource<String> stream=env.fromCollection(wordList);//创建kafka sinkKafkaSink<String> kafkaSink=KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("quickstart-events").setValueSerializationSchema(new SimpleStringSchema()).build())//.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//3.添加数据源stream.sinkTo(kafkaSink).name("myKafkaSink").setParallelism(1);env.execute("myJob");}}
Kafka Source 提供了构建类来创建 KafkaSource
的实例。以下代码片段展示了如何构建 KafkaSource
来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:
为了显示队列中的内容,可以手工操作SingleOutputStreamOperator
,也可以使用PrintSink
public class KafkaFlinkConsumer {public static void main(String[] args) throws Exception {//1.获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//2.创建消费者KafkaSource<String> source=KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("quickstart-events").setGroupId("my-group").setValueOnlyDeserializer(new SimpleStringSchema())//.setStartingOffsets(OffsetsInitializer.earliest()).build();//3.关联消费者与flink流DataStreamSource<String> dataStreamSource= env.fromSource(source, WatermarkStrategy.noWatermarks(),"Kafka Source", TypeInformation.of(String.class));//4.执行// SingleOutputStreamOperator<String> myCounter = dataStreamSource.process(new ProcessFunction<>() {// @Override// public void processElement(String s, Context context, Collector<String> collector) {// System.out.println(s);// }// });PrintSink<String> printSink = new PrintSink<>(true);dataStreamSource.sinkTo(printSink).name("printSink").setParallelism(3);env.execute("Flink DataSource");}}
配置文件如下
spring:kafka:bootstrap-servers: localhost:9092
代码如下:
@Servicepublic class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public String send(String value){assert value!=null;CompletableFuture<SendResult<String,String>> completableFuture=kafkaTemplate.send("quickstart-events",value);completableFuture.thenAccept(result->{System.out.println("send ok:"+value);});completableFuture.exceptionally(e->{System.out.println(e.toString());return null;});return "ok";}}
其中group-id
一定要配置。
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: 'springGroup'
代码
@Configurationpublic class KafkaConsumer {@KafkaListener(topics = "quickstart-events")public void consumer(String value){System.out.println("收到kafka:"+value);}}
kafka 自带的有生产与消费测试脚本,可以执行一下就可以了。
测试完毕后,就可以模拟调优了。
创建一个测试的 test topic,设置三个分区,三个副本。
bin/kafka-topics.sh --bootstrap-server localhost:9002 --create --topic test --rtitions 3 --replication-factor 3
进行测试
bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-recorder 1000000 --throughput 30000 --producer-props bootstrap.servers=localhost:9092 batch.size=16384 linger.ms=0
参数说明:
执行后看结果
可以看每秒的执行效率:recods/sec =9.76
提高 record-size:32768 看执行结果
可以看每秒的执行效率:recods/sec =9.76
record-size 变小,会发现执行的慢,所以要慢慢的调试。
调整 linger.ms
调整压缩方式
调整缓存大小
下面是主要的结果
不同的机器不一样,要根据自己的数据情况与机器情况进行测试。
bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test --messages 10000000 ---consumer.config config/consumer.properties
consumer.properties:下面定义每次拉的条数
max.poll.records=500
消费者问题不大。