侧边栏壁纸
  • 累计撰写 39 篇文章
  • 累计创建 51 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Kafka学习笔记

叶子
2024-04-30 / 0 评论 / 0 点赞 / 335 阅读 / 8,594 字

配套代码:https://gitee.com/benbenyezi/kafka

一、为什么使用消息队列

1. 使用同步的通信方式来解决多个服务之间的通信

image-20240127092528953

同步的通行方式存在性能和稳定性的问题。

2. 使用异步的通信方式

image-20240127093501545

针对于同步的方式来说,异步的方式,可以让上游快速成功,极大提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务,也能保障业务执行之后的最终一致性。

消息队列解决具体的是什么问题 -> 通信问题。

二、关于消息队列的流派

目前消息队列的中间件选型有很多种:

  • rabbitMQ:比较简单,功能强大
  • rocketMQ:阿里根据Kafka内部原理,开源的一款消息中间件。性能与kafka相比肩,功能比kafka更多(顺序消费等)
  • kafka:全球消息处理性能最快
  • zeroMQ:

这些消息队列中间件有什么区别?

1. 有broker

  • 重Topic:kafka、rocketmq、activemq

    整个broker,依据topic来进行消息的中转,在重topic的消息队列里必然需要topic的存在

  • 轻Topic:rabbitmq

    topic只是一种中转模式

2. 无broker

在生产者和消费者之间没有使用broker,例如zeromq,直接使用socket进行通信。

三、Kafka的基本知识

1. Kafka的安装

  • 安装JDK

    yum install java-1.8.0-openjdk*
    
  • 部署zookeeper

    • 下载zookeeper : Apache ZooKeeper
    • 进入bin目录启动zookeeper ./zkServer.sh start
    • 进入bin目录查看zookeeper状态 ./zkServer.sh status
  • 部署Kafka

    • 下载kafka:Apache Kafka

    • 修改conf下的server.properties

      # The id of the broker. This must be set to a unique integer for each broker.
      broker.id=0
      
      #   EXAMPLE:
      #     listeners = PLAINTEXT://your.host.name:9092
      listeners=PLAINTEXT://192.168.171.131:9092
      
      # A comma separated list of directories under which to store log files
      log.dirs=/opt/kafka/data/kafka-logs
      
      # root directory for all kafka znodes.
      zookeeper.connect=192.168.171.131:2181
      
    • 进入bin目录启动kafka

      ./kafka-server-start.sh -daemon ../config/server.properties
      
  • 检查Kafka启动状态

    • 进入zookeeper/bin目录,执行./zkCli.sh,查看ls /brokers/ids

      [zk: localhost:2181(CONNECTED) 3] ls /
      [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
      [zk: localhost:2181(CONNECTED) 4] ls /brokers
      [ids, seqid, topics]
      [zk: localhost:2181(CONNECTED) 5] ls /brokers/ids
      [0]
      [zk: localhost:2181(CONNECTED) 6] 
      

      image-20240127104142387

2. kafka中的一些基本概念

kafka中有这么些复杂的概念

名称 解释
Broker 消息中间件处理节点,一个kafka节点就是一个broker,一个或者多个broker可以组成一个kafka集群
Topic kafka根据topic对消息进行归类,发布到kafka集群的每条消息都需要指定一个topic
Producer 消息生产者,向broker发送消息的客户端
Consumer 消息消费者,向broker获取消息的客户端

image-20240127104841645

3. 创建Tocpic

  • 通过kafka命令在zk中创建一个主题

    ./kafka-topics.sh --create --zookeeper 192.168.171.131:2181 --replication-factor 1 --partitions 1 --topic test
    
  • 查看当前zk中所有的主题

    ./kafka-topics.sh --list --zookeeper 192.168.171.131:2181
    

4. 发送消息

把消息发送给broker中的某个topic

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.171.131:9092 --topic test

5. 消费消息

  • 方式一:从当前主题的最后一条消息的offset(偏移量) +1开始消费
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --topic test
  • 方式二:从当前主题从头开始消费消息
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --from-beginning --topic test

6. 关于消息的细节

image-20240127112238656

  • 生产者将消息发送给broker,broker会将消息保存到本地日志文件中
/opt/kafka/data/kafka-logs/test-0/00000000000000000000.log
  • 消息的保存是有序的,通过 offset 偏移量来描述消息的有序性
  • 消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置

7. 单播消息

在一个kafka的topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?

如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的 topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息。

 ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --consumer-property group.id=testGroup  --topic test

8. 多播消息

不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息。

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --consumer-property group.id=testGroup --topic test


[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --consumer-property group.id=testGroup1  --topic test

image-20240127121842248

9. 查看消费组的详细信息

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.171.131:9092 --describe --group testGroup1

image-20240127122425134

重点关注几个信息:

  • CURRENT-OFFSET :最后被消费的消息的偏移量
  • LOG-END-OFFSET :消息总量(最后一条消息的偏移量)
  • LAG:积压了多少条消息

四、Kafka中主题和分区的概念

1. 主题Topic

主题topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费者消费。
但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中。为了解决这个文件过大的问题,kafka提出了Partition分区的概念。

2. 分区partition

1) 分区的概念

通过partition将一个topic中的消息分区来存储。这样的好处有多个:

  • 分区存储,可以解决统一存储文件过大的问题
  • 提升了读写的吞吐量:读和写可以同时在多个分区中进行

3) 创建多分区的主题

.\kafka-topics.bat --create --bootstrap-server 10.88.40.40:9092,10.88.40.40:9093,10.88.40.40:9094 --partitions 2 --topic test1

注意:Kafka1.0版本以后,如果不指定 --replication-factor 参数, 则默认为集群默认值:1

3. Kafka中消息日志文件中保存的内容

  • 0000000000.log:这个文件中保存的就是消息

  • __consumer_offsets-49:

    kafka内部自己创建了 consumer_offsets主题包含了50个分区。这个主题用来存放消费者消息某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量。也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。

    • 至于提交到哪个分区,通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数。
    • 提交到该主题中的内容是:key是consumerGroupId + topic + 分区号,value是当前offset的值。
  • 文件中保存的消息,默认保存7天。7天后消息会被删除。

五、kafka集群操作

1. 搭建kafka集群(三个broker)

  • 创建三个server.properties文件
# 0 / 1 / 2 
broker.id=0

# 9092 / 9093 / 9094
listeners=PLAINTEXT://192.168.18.139:9092

# kafka-logs / kafka-logs-1 / kafka-logs-2
log.dirs=/opt/kafka/data/kafka-logs
  • 通过命令启动三台broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
  • 校验是否启动成功

进入zk中查看 ls /brokers/ids 中是否有三个znode(0 / 1 / 2)

image-20240129202406838

2. 副本的概念

创建一个主题、两个分区、三个副本

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.139:2181 --replication-factor 3 --partitions 2 --topic  my-replicated-topic
Created topic my-replicated-topic.
[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.139:2181 --topic my-replicated-topic
Topic: my-replicated-topic      PartitionCount: 2       ReplicationFactor: 3    Configs: 
        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: my-replicated-topic      Partition: 1    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

在创建主题时,除了指明主题的分区数以外,还指明了副本数,那么副本是一个什么概念呢?

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower。

image-20240129203101465

image-20240129204112098

  • leader:负责kafka的读写操作,并把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader。
  • follower:接收leader的同步数据。
  • isr:可以同步和已同步的节点会被存入到isr集合中。这里有一个细节:如果isr中的节点性能较差,会被踢出isr集合。

重点:集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里。

3. kafka集群消息发送

./kafka-console-producer.sh --broker-list 192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094 --topic my-replicated-topic

4. kafka集群消息消费

./kafka-console-consumer.sh --bootstrap-server 192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094 --consumer-property group.id=testGroup1 --from-beginning --topic my-replicated-topic

5. kafka集群消费

  • 分区消费组的集群消费中的细节

image-20240129212002362

  • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partition的多个消费者消费的总的顺序性是得不到保证的,那么怎么做到消费的总顺序性呢?
  • partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息。
  • 如果消费者挂了,会触发rebalance机制,会让其他消费者来消费该分区。

六、kafka在java客户端-生产者的实现

1. Java客户端生产者实现

  • 引入依赖

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
  • 具体代码

package com.example.kafka.test;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer01 {

    private static Producer<String, String> createProducer() {
        // 设置 Producer 的属性
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094"); // 设置 Broker 的地址
        properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 发送失败时,重试发送的次数
//        properties.put("batch.size", 16384);
//        properties.put("linger.ms", 1);
//        properties.put("client.id", "DemoProducer");
//        properties.put("buffer.memory", 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 key 的序列化方式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 value 的序列化方式

        // 创建 KafkaProducer 对象
        // 因为我们消息的 key 和 value 都使用 String 类型,所以创建的 Producer 是 <String, String> 的泛型。
        return new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建 KafkaProducer 对象
        Producer<String, String> producer = createProducer();

        // 创建消息。传入的三个参数,分别是 Topic ,消息的 key ,消息的 message 。
        ProducerRecord<String, String> message = new ProducerRecord<>("my-replicated-topic", "1111", "yudaoyuanma");

        // 同步发送消息
        RecordMetadata metadata = producer.send(message).get();
        System.out.println("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());

    }

}

2. 生产者的同步发送消息

image-20240129215652523

如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试次数3次。

3. 生产者的异步发消息

image-20240129220734267

异步发送,生产者发送完消息就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法。

package com.example.kafka.test;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer01 {

    private static Producer<String, String> createProducer() {
        // 设置 Producer 的属性
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094"); // 设置 Broker 的地址
        properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 发送失败时,重试发送的次数
//        properties.put("batch.size", 16384);
//        properties.put("linger.ms", 1);
//        properties.put("client.id", "DemoProducer");
//        properties.put("buffer.memory", 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 key 的序列化方式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 value 的序列化方式

        // 创建 KafkaProducer 对象
        // 因为我们消息的 key 和 value 都使用 String 类型,所以创建的 Producer 是 <String, String> 的泛型。
        return new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建 KafkaProducer 对象
        Producer<String, String> producer = createProducer();

        // 创建消息。传入的三个参数,分别是 Topic ,消息的 key ,消息的 message 。
        ProducerRecord<String, String> message = new ProducerRecord<>("my-replicated-topic", 1,"1111", "yudaoyuanma");


        // 异步发送消息
        producer.send(message, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null){
                    System.err.println("发送消息失败:" + exception.getMessage());
                }
                if (metadata != null){
                    System.out.println("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());
                }
            }
        });

        Thread.sleep(10000L);
    }

}

4. 生产者中的ack的配置

在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有3个配置:

  • ack = 0 :kafka-cluster不需要任何的broker收到消息,就会立即返回ack给生产者,最容易丢消息,效率最高。

  • ack = 1 (默认):多副本之间的leader已经收到消息,并把消息写入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的。

  • ack = -1/all :里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但是性能最差。

    image-20240130205022895

    关于ack 和 重试(如果没有收到ack,就开启重试)的配置

    // 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
    properties.put(ProducerConfig.ACKS_CONFIG, "1"); 
    // 发送失败时,重试发送的次数
    properties.put(ProducerConfig.RETRIES_CONFIG, 3); 
    // 重试间隔设置 
    properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
    

5. 关于消息发送的缓冲区

image-20240130205359208

  • kafka默认会创建一个小希缓冲区,用来存放要发送的消息,默认32MB

    properties.put("buffer.memory", 33554432);
    
  • kafka本地线程会去缓冲区中一次拉16k的数据,发送到broker

    properties.put("batch.size", 16384);
    
  • 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发送到broker

    properties.put("linger.ms", 10);
    

七、Java客户端消费者的实现细节

1. 消费者的基本实现

package com.example.kafka.test;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer01 {


    private static Consumer<String, String> createConsumer() {
        // 设置 Producer 的属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094"); // 设置 Broker 的地址
        properties.put("group.id", "test-consumer-group"); // 消费者分组
        properties.put("auto.offset.reset", "earliest"); // 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
        properties.put("enable.auto.commit", true); // 是否自动提交消费进度
        properties.put("auto.commit.interval.ms", "1000"); // 自动提交消费进度频率
        properties.put("key.deserializer", StringDeserializer.class.getName()); // 消息的 key 的反序列化方式
        properties.put("value.deserializer", StringDeserializer.class.getName()); // 消息的 value 的反序列化方式

        // 创建 KafkaProducer 对象
        // 因为我们消息的 key 和 value 都使用 String 类型,所以创建的 Producer 是 <String, String> 的泛型。
        return new KafkaConsumer<>(properties);
    }

    public static void main(String[] args) {
        // 创建 KafkaConsumer 对象
        Consumer<String, String> consumer = createConsumer();

        // 订阅消息
        consumer.subscribe(Collections.singleton("my-replicated-topic"));

        // 拉取消息
        while (true) {
            // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。
            ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
            // 遍历处理消息
            records.forEach(new java.util.function.Consumer<ConsumerRecord>() {
                @Override
                public void accept(ConsumerRecord record) {
                    System.out.println(record.key() + "\t" + record.value());
                }

            });
        }
    }
}

2. 关于消费者的自动提交和手动提交offset

1. 提交的内容

消费者无论是自动提交还是手动提交,都需哟把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。

2. 自动提交

消费者poll消息下来以后就会自动提交offset

properties.put("enable.auto.commit", true); // 是否自动提交消费进度
properties.put("auto.commit.interval.ms", "1000"); // 自动提交消费进度频率

注意:自动提交会丢消息。因为消费者在消费前提交offset,有可能提交完后还没消费时消费者挂了。

3. 手动提交

需要把自动提交的配置改成false

properties.put("enable.auto.commit", false); // 是否自动提交消费进度

手动提交又分成了两种:

  • 手动同步提交

    在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑

           // 拉取消息
            while (true) {
                // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。
                ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
                // 遍历处理消息
                records.forEach(new java.util.function.Consumer<ConsumerRecord>() {
                    @Override
                    public void accept(ConsumerRecord record) {
                        System.out.println(record.key() + "\t" + record.value());
                    }
    
                });
    
                if (records.count() > 0) {
                    consumer.commitSync();
                }
    
            }
    
  • 手动异步提交

    在消息消费完后提交,不需要等到集群的ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用

    // 拉取消息
            while (true) {
                // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。
                ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
                // 遍历处理消息
                records.forEach(new java.util.function.Consumer<ConsumerRecord>() {
                    @Override
                    public void accept(ConsumerRecord record) {
                        System.out.println(record.key() + "\t" + record.value());
                    }
    
                });
    
                if (records.count() > 0) {
                    consumer.commitAsync(new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if (exception != null){
                                System.err.println(offsets);
                                System.err.println(exception.getMessage());
                            }
                        }
                    });
                }
    
            }
    

    image-20240130214948269

3. 长轮询poll消息

  • 默认情况下, 消费者一次会poll 500条消息
// 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 代码中设置了长轮询的时间是1000毫秒

     while (true) {
                // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。
                ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
                // 遍历处理消息
                records.forEach(new java.util.function.Consumer<ConsumerRecord>() {
                    @Override
                    public void accept(ConsumerRecord record) {
                        System.out.println(record.key() + "\t" + record.value());
                    }
    
                });
     }
    

    意味着:

    • 如果一次poll到500条,直接执行forEach循坏
    • 如果这一次没有poll到500条,且时间在10秒内,那么长轮询继续poll,要么到500条,要么到10s。
    • 如果多次poll都没到达500条,且10秒时间到了,那么直接执行forEach循坏。
  • 如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制, rebalance机制会造成性能开销,可以通过设置参数,让一次poll的消息条数少一些。

    // 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    // 如果两次poll的时间超过30s的时间间隔,kafka会认为消费者消费能力过弱,将其踢出消费组,将分区分配给其他消费者。 -rebalance
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
    

4. 消费者的健康状态检查

消费者每个1s向kafka集群发送心跳进行续约,集群发现如果超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组的其他消费者进行消费。

// consumer给broker发送心跳的间隔时间
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

5. 指定分区消费

consumer.assign(Arrays.asList(new TopicPartition("my-replicated-topic",0)));

6. 消息的回溯消费

consumer.assign(Arrays.asList(new TopicPartition("my-replicated-topic", 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition("my-replicated-topic",0)));

7. 指定offset消费

consumer.assign(Arrays.asList(new TopicPartition("my-replicated-topic", 0)));
consumer.seek(new TopicPartition("my-replicated-topic",0),10);

8. 从指定时间点消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。

		List<PartitionInfo> partitionInfos = consumer.partitionsFor("my-replicated-topic");
		long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
		Map<TopicPartition, Long> map = new HashMap<>();
		for (PartitionInfo par : partitionInfos) {
			map.put(new TopicPartition("my-replicated-topic", par.partition()), fetchDataTime);
		}
		Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
		for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
			TopicPartition key = entry.getKey();
			OffsetAndTimestamp value = entry.getValue();
			if (key == null || value == null) {
				continue;
			}
			long offset = value.offset();
			System.out.println("partiton-" + key.partition() + "|offset-" + offset);
			System.out.println();
			if (value != null) {
				consumer.assign(Arrays.asList(key));
				consumer.seek(key, offset);
			}
		}

9. 新消费组的消费offset规则

新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新的消息)。可以通过以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

  • latest(默认):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    参考:org.apache.kafka.clients.consumer.ConsumerConfig
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

10. Springboot中使用Kafka

1. 引入依赖

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

2. 编写配置文件

server:
  port: 8081
spring:
  kafka:
    bootstrap-servers: 10.88.40.40:9092,10.88.40.40:9093,10.88.40.40:9094 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
    producer:
      acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
      retries: 3 # 发送失败时,重试发送的次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
      batch-size: 16384 # 每次批量发送消息的最大数量
      buffer-memory: 33554432 # 每次批量发送消息的最大内存
      properties:
        spring.json.add.type.headers: false
        linger.ms: 30000 # 批处理延迟时间上限。这里配置为 30 * 1000 ms 过后,不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求。
    consumer:
      auto-offset-reset: latest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #org.springframework.kafka.support.serializer.JsonDeserializer
      fetch-max-wait: 1000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
      fetch-min-size: 10 # poll 一次消息拉取的最小数据量,单位:字节
      max-poll-records: 100 # poll 一次消息拉取的最大数量
      properties:
        spring.json.trusted.packages: com.example.kafka.message
      enable-auto-commit: false
    # Kafka Consumer Listener 监听器配置
    listener:
      type: SINGLE # 监听器类型,默认为 SINGLE ,只监听单条消息。这里我们配置 BATCH ,监听多条消息,批量消费
      missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
      ack-mode: manual_immediate

logging:
  level:
    org:
      springframework:
        kafka: ERROR # spring-kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
      apache:
        kafka: ERROR # kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别

3. 编写消息生产者

package com.example.kafka.controller;

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSON;
import com.example.kafka.entity.UserEntity;

/**
 * @author Li Zemin
 * @since 2024/2/1 9:17
 */
@RestController
@RequestMapping
public class SendController {

	@Resource
	private KafkaTemplate kafkaTemplate;

	@GetMapping("/send")
	public String send() {
		UserEntity user = UserEntity.builder()
				.id(System.currentTimeMillis()+"")
				.name(generateRandomString(3))
				.age(20)
				.build();
		this.kafkaTemplate.send("my-replicated-topic", user.getId(), JSON.toJSONString(user));
		return "ok";
	}

	private static String generateRandomString(int len) {
		StringBuilder str = new StringBuilder();
		for (int i = 0; i < len; i++) {
			char result = (char) (0x4e00 + (int) (Math.random() * (0x9fa5 - 0x4e00 + 1)));
			str.append(result);
		}
		return str.toString();
	}


}

4. 编写消息消费者

package com.example.kafka.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import com.example.kafka.message.Demo01Message;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Li Zemin
 * @since 2024/2/1 9:28
 */
@Slf4j
@Component
public class ConsumerController {


	@KafkaListener(topics = "my-replicated-topic",groupId = "demo01-A-consumer-group-" + "my-replicated-topic")
	public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
		log.info("demo01-A-consumer-group-" + "my-replicated-topic");
		log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
		ack.acknowledge();
	}

}

5. 消费者中配置消费主题、分区、offset

	@KafkaListener(groupId = "demo01-A-consumer-group-" + "my-replicated-topic", topicPartitions = {
			@TopicPartition(topic = "my-replicated-topic", partitions = {"0", "1"}),
			@TopicPartition(topic = Demo01Message.TOPIC, partitions = "0", partitionOffsets =
			@PartitionOffset(partition = "1", initialOffset = "100"))
	}, concurrency = "3")
	public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment ack) {
		log.info("demo01-A-consumer-group-" + "my-replicated-topic");
		log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
		ack.acknowledge();
	}

6. 消费者提交模式

image-20240201193746711

九、kafka集群中的controller、rebalance、HW

1. controller

  • 集群中谁来充当controller?
    每个broker启动时会向zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller,负责这么几件事:
    • 当集群中有一个副本的leader挂掉,需要在集群中选举一个新的leader,选举的规则是从isr集合中最左边获得。
    • 当集群中有broker新增或减少,controller会同步信息给其他broker。
    • 当集群中有分区新增或者减少,controller会同步信息给其他broker。

2. rebalance机制

  • 前提:消费组中的消费者没有指明分区来消费
  • 触发的条件:当消费组中的消费者和分区的关系发生变化的时候
  • 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
    • range:根据公示计算得到每个消费者消费哪几个分区:前面的消费者是分区总数/消费者数量+1,之后的消费者是分区总数/消费者数量。
    • 轮询:大家轮着来
    • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。

3. HW和LEO

LEO是某个副本最后消息的消息位置(log-end-offset)

HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后,hw才会变化。在这之前消费者是消费不带这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失。

image-20240201200047941

十、Kafka线上问题优化

1. 如何防止消息丢失

  • 生产者:1) 使用同步发送 2) 把ack设成1或者all,并且设置同步的分区数>=2
  • 消费者:自动提交变成手动提交

2. 如何防止重复消费

在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。
此时生产者会进行重试,于是broker就会收到多条相同的消息,而造成消费者的重复消费。
怎么解决:

  • 生产者关闭重试:造成消息丢失(不建议)

  • 消费者解决幂等性消费问题:

    • 所谓的幂等性:多次访问的结果是一样的。对于rest的请求(get(幂等)、post(非幂等)、put(幂等)、delete(幂等))
      解决方案:
      • 在数据库中创建联合主键,防止相同的主键创建多条记录

      • 使用分布式锁,以业务ID作为锁,保证只有一条记录可以创建成功

        image-20240202191610932

3. 如何做到消息的顺序消费

  • 生产者:保证消息按顺序消费,且消息不丢失——使用同步的发送方式,ack设置成非0的值
  • 消费者:主题只能设置一个分区,消费组中只能有一个消费者
  • kafka的顺序消费使用场景不多,因为牺牲了性能,但是比如rocketmq已有现成的能力。

image-20240202201016689

4. 如何解决消息积压问题

image-20240202202542191

1) 消息积压问题的出现

消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问变慢,造成服务雪崩。

2) 消息积压的解决方案

  • 在这个消费者中,使用多线程,充分利用机器的性能进行消息消费。
  • 通过业务的架构设计,提升业务层面消费的性能。
  • 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度。
  • 创建一个消费者,该消费者在kafka另建一个主题,配上对个分区,多个分区再配上多个消费者。该消费者将poll下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。——不常用

image-20240202204825266

5. 延迟队列

1)应用场景

订单创建后,超过30分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现

2)具体方案

image-20240202210432993

  • kafka中创建相应的主题

  • 消费者消费该主题的消息(轮询)

  • 消费者消费消息时判断消息的创建时间,是否超过30分钟(前提是订单没支付)

    • 如果是:数据库修改订单状态为已取消
    • 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及后面的消息,继续进行判断,以此反复。

十一、Kafka-eagle监控平台

1. 搭建

  • 去kafka-eagle官网下载压缩包:EFAK (kafka-eagle.org)

  • 配置环境变量/etc/profile JAVA_HOME

    #set java environment  
    export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.402.b06-1.el7_9.x86_64/
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
    export PATH=$PATH:$JAVA_HOME/bin
    
  • 配置环境变量 /etc/profile

    export KE_HOME=/opt/kafka-eagle/kafka-eagle-bin-3.0.1/efak-web-3.0.1
    export PATH=$PATH:$KE_HOME/bin
    
  • 修改配置文件system-config.properties,zk地址及MySQL地址

    ######################################
    # multi zookeeper & kafka cluster list
    # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
    ######################################
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=192.168.18.139:2181
    
    ######################################
    # kafka mysql jdbc driver address
    ######################################
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://192.168.18.139:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=root
    efak.password=root
    
    
  • 启动kafka-eagle,进入/opt/kafka-eagle/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin目录,执行 ./ke.sh start

其他

image-20240127112644526

0

评论区