一、为什么使用消息队列
1. 使用同步的通信方式来解决多个服务之间的通信
同步的通行方式存在性能和稳定性的问题。
2. 使用异步的通信方式
针对于同步的方式来说,异步的方式,可以让上游快速成功,极大提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务,也能保障业务执行之后的最终一致性。
消息队列解决具体的是什么问题 -> 通信问题。
二、关于消息队列的流派
目前消息队列的中间件选型有很多种:
- rabbitMQ:比较简单,功能强大
- rocketMQ:阿里根据Kafka内部原理,开源的一款消息中间件。性能与kafka相比肩,功能比kafka更多(顺序消费等)
- kafka:全球消息处理性能最快
- zeroMQ:
这些消息队列中间件有什么区别?
1. 有broker
-
重Topic:kafka、rocketmq、activemq
整个broker,依据topic来进行消息的中转,在重topic的消息队列里必然需要topic的存在
-
轻Topic:rabbitmq
topic只是一种中转模式
2. 无broker
在生产者和消费者之间没有使用broker,例如zeromq,直接使用socket进行通信。
三、Kafka的基本知识
1. Kafka的安装
-
安装JDK
yum install java-1.8.0-openjdk*
-
部署zookeeper
- 下载zookeeper : Apache ZooKeeper
- 进入
bin
目录启动zookeeper./zkServer.sh start
- 进入
bin
目录查看zookeeper状态./zkServer.sh status
-
部署Kafka
-
下载kafka:Apache Kafka
-
修改
conf
下的server.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://192.168.171.131:9092 # A comma separated list of directories under which to store log files log.dirs=/opt/kafka/data/kafka-logs # root directory for all kafka znodes. zookeeper.connect=192.168.171.131:2181
-
进入
bin
目录启动kafka./kafka-server-start.sh -daemon ../config/server.properties
-
-
检查Kafka启动状态
-
进入
zookeeper/bin
目录,执行./zkCli.sh
,查看ls /brokers/ids
[zk: localhost:2181(CONNECTED) 3] ls / [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper] [zk: localhost:2181(CONNECTED) 4] ls /brokers [ids, seqid, topics] [zk: localhost:2181(CONNECTED) 5] ls /brokers/ids [0] [zk: localhost:2181(CONNECTED) 6]
-
2. kafka中的一些基本概念
kafka中有这么些复杂的概念
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,一个kafka节点就是一个broker,一个或者多个broker可以组成一个kafka集群 |
Topic | kafka根据topic对消息进行归类,发布到kafka集群的每条消息都需要指定一个topic |
Producer | 消息生产者,向broker发送消息的客户端 |
Consumer | 消息消费者,向broker获取消息的客户端 |
3. 创建Tocpic
-
通过kafka命令在zk中创建一个主题
./kafka-topics.sh --create --zookeeper 192.168.171.131:2181 --replication-factor 1 --partitions 1 --topic test
-
查看当前zk中所有的主题
./kafka-topics.sh --list --zookeeper 192.168.171.131:2181
4. 发送消息
把消息发送给broker中的某个topic
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.171.131:9092 --topic test
5. 消费消息
- 方式一:从当前主题的最后一条消息的offset(偏移量) +1开始消费
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --topic test
- 方式二:从当前主题从头开始消费消息
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --from-beginning --topic test
6. 关于消息的细节
- 生产者将消息发送给broker,broker会将消息保存到本地日志文件中
/opt/kafka/data/kafka-logs/test-0/00000000000000000000.log
- 消息的保存是有序的,通过 offset 偏移量来描述消息的有序性
- 消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置
7. 单播消息
在一个kafka的topic中,启动两个消费者,一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?
如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的 topic中的消息。换言之,同一个消费组中只能有一个消费者收到一个topic中的消息。
./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --consumer-property group.id=testGroup --topic test
8. 多播消息
不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息。
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --consumer-property group.id=testGroup --topic test
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.171.131:9092 --consumer-property group.id=testGroup1 --topic test
9. 查看消费组的详细信息
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.171.131:9092 --describe --group testGroup1
重点关注几个信息:
- CURRENT-OFFSET :最后被消费的消息的偏移量
- LOG-END-OFFSET :消息总量(最后一条消息的偏移量)
- LAG:积压了多少条消息
四、Kafka中主题和分区的概念
1. 主题Topic
主题topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费者消费。
但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中。为了解决这个文件过大的问题,kafka提出了Partition分区的概念。
2. 分区partition
1) 分区的概念
通过partition将一个topic中的消息分区来存储。这样的好处有多个:
- 分区存储,可以解决统一存储文件过大的问题
- 提升了读写的吞吐量:读和写可以同时在多个分区中进行
3) 创建多分区的主题
.\kafka-topics.bat --create --bootstrap-server 10.88.40.40:9092,10.88.40.40:9093,10.88.40.40:9094 --partitions 2 --topic test1
注意:Kafka1.0版本以后,如果不指定 --replication-factor
参数, 则默认为集群默认值:1
3. Kafka中消息日志文件中保存的内容
-
0000000000.log:这个文件中保存的就是消息
-
__consumer_offsets-49:
kafka内部自己创建了 consumer_offsets主题包含了50个分区。这个主题用来存放消费者消息某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量。也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区。
- 至于提交到哪个分区,通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数。
- 提交到该主题中的内容是:key是consumerGroupId + topic + 分区号,value是当前offset的值。
-
文件中保存的消息,默认保存7天。7天后消息会被删除。
五、kafka集群操作
1. 搭建kafka集群(三个broker)
- 创建三个server.properties文件
# 0 / 1 / 2
broker.id=0
# 9092 / 9093 / 9094
listeners=PLAINTEXT://192.168.18.139:9092
# kafka-logs / kafka-logs-1 / kafka-logs-2
log.dirs=/opt/kafka/data/kafka-logs
- 通过命令启动三台broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
- 校验是否启动成功
进入zk中查看 ls /brokers/ids 中是否有三个znode(0 / 1 / 2)
2. 副本的概念
创建一个主题、两个分区、三个副本
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.139:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
Created topic my-replicated-topic.
[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 192.168.18.139:2181 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
在创建主题时,除了指明主题的分区数以外,还指明了副本数,那么副本是一个什么概念呢?
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower。
- leader:负责kafka的读写操作,并把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader。
- follower:接收leader的同步数据。
- isr:可以同步和已同步的节点会被存入到isr集合中。这里有一个细节:如果isr中的节点性能较差,会被踢出isr集合。
重点:集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里。
3. kafka集群消息发送
./kafka-console-producer.sh --broker-list 192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094 --topic my-replicated-topic
4. kafka集群消息消费
./kafka-console-consumer.sh --bootstrap-server 192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094 --consumer-property group.id=testGroup1 --from-beginning --topic my-replicated-topic
5. kafka集群消费
- 分区消费组的集群消费中的细节
- 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partition的多个消费者消费的总的顺序性是得不到保证的,那么怎么做到消费的总顺序性呢?
- partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息。
- 如果消费者挂了,会触发rebalance机制,会让其他消费者来消费该分区。
六、kafka在java客户端-生产者的实现
1. Java客户端生产者实现
-
引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
具体代码
package com.example.kafka.test;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MyProducer01 {
private static Producer<String, String> createProducer() {
// 设置 Producer 的属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094"); // 设置 Broker 的地址
properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 发送失败时,重试发送的次数
// properties.put("batch.size", 16384);
// properties.put("linger.ms", 1);
// properties.put("client.id", "DemoProducer");
// properties.put("buffer.memory", 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 key 的序列化方式
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 value 的序列化方式
// 创建 KafkaProducer 对象
// 因为我们消息的 key 和 value 都使用 String 类型,所以创建的 Producer 是 <String, String> 的泛型。
return new KafkaProducer<>(properties);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建 KafkaProducer 对象
Producer<String, String> producer = createProducer();
// 创建消息。传入的三个参数,分别是 Topic ,消息的 key ,消息的 message 。
ProducerRecord<String, String> message = new ProducerRecord<>("my-replicated-topic", "1111", "yudaoyuanma");
// 同步发送消息
RecordMetadata metadata = producer.send(message).get();
System.out.println("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());
}
}
2. 生产者的同步发送消息
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试次数3次。
3. 生产者的异步发消息
异步发送,生产者发送完消息就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法。
package com.example.kafka.test;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MyProducer01 {
private static Producer<String, String> createProducer() {
// 设置 Producer 的属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094"); // 设置 Broker 的地址
properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 发送失败时,重试发送的次数
// properties.put("batch.size", 16384);
// properties.put("linger.ms", 1);
// properties.put("client.id", "DemoProducer");
// properties.put("buffer.memory", 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 key 的序列化方式
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 消息的 value 的序列化方式
// 创建 KafkaProducer 对象
// 因为我们消息的 key 和 value 都使用 String 类型,所以创建的 Producer 是 <String, String> 的泛型。
return new KafkaProducer<>(properties);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建 KafkaProducer 对象
Producer<String, String> producer = createProducer();
// 创建消息。传入的三个参数,分别是 Topic ,消息的 key ,消息的 message 。
ProducerRecord<String, String> message = new ProducerRecord<>("my-replicated-topic", 1,"1111", "yudaoyuanma");
// 异步发送消息
producer.send(message, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
System.err.println("发送消息失败:" + exception.getMessage());
}
if (metadata != null){
System.out.println("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());
}
}
});
Thread.sleep(10000L);
}
}
4. 生产者中的ack的配置
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有3个配置:
-
ack = 0 :kafka-cluster不需要任何的broker收到消息,就会立即返回ack给生产者,最容易丢消息,效率最高。
-
ack = 1 (默认):多副本之间的leader已经收到消息,并把消息写入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的。
-
ack = -1/all :里面有默认的配置 min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但是性能最差。
关于ack 和 重试(如果没有收到ack,就开启重试)的配置
// 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 发送失败时,重试发送的次数 properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试间隔设置 properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
5. 关于消息发送的缓冲区
-
kafka默认会创建一个小希缓冲区,用来存放要发送的消息,默认32MB
properties.put("buffer.memory", 33554432);
-
kafka本地线程会去缓冲区中一次拉16k的数据,发送到broker
properties.put("batch.size", 16384);
-
如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发送到broker
properties.put("linger.ms", 10);
七、Java客户端消费者的实现细节
1. 消费者的基本实现
package com.example.kafka.test;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer01 {
private static Consumer<String, String> createConsumer() {
// 设置 Producer 的属性
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.18.139:9092,192.168.18.139:9093,192.168.18.139:9094"); // 设置 Broker 的地址
properties.put("group.id", "test-consumer-group"); // 消费者分组
properties.put("auto.offset.reset", "earliest"); // 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
properties.put("enable.auto.commit", true); // 是否自动提交消费进度
properties.put("auto.commit.interval.ms", "1000"); // 自动提交消费进度频率
properties.put("key.deserializer", StringDeserializer.class.getName()); // 消息的 key 的反序列化方式
properties.put("value.deserializer", StringDeserializer.class.getName()); // 消息的 value 的反序列化方式
// 创建 KafkaProducer 对象
// 因为我们消息的 key 和 value 都使用 String 类型,所以创建的 Producer 是 <String, String> 的泛型。
return new KafkaConsumer<>(properties);
}
public static void main(String[] args) {
// 创建 KafkaConsumer 对象
Consumer<String, String> consumer = createConsumer();
// 订阅消息
consumer.subscribe(Collections.singleton("my-replicated-topic"));
// 拉取消息
while (true) {
// 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。
ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
// 遍历处理消息
records.forEach(new java.util.function.Consumer<ConsumerRecord>() {
@Override
public void accept(ConsumerRecord record) {
System.out.println(record.key() + "\t" + record.value());
}
});
}
}
}
2. 关于消费者的自动提交和手动提交offset
1. 提交的内容
消费者无论是自动提交还是手动提交,都需哟把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。
2. 自动提交
消费者poll消息下来以后就会自动提交offset
properties.put("enable.auto.commit", true); // 是否自动提交消费进度
properties.put("auto.commit.interval.ms", "1000"); // 自动提交消费进度频率
注意:自动提交会丢消息。因为消费者在消费前提交offset,有可能提交完后还没消费时消费者挂了。
3. 手动提交
需要把自动提交的配置改成false
properties.put("enable.auto.commit", false); // 是否自动提交消费进度
手动提交又分成了两种:
-
手动同步提交
在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
// 拉取消息 while (true) { // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。 ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); // 遍历处理消息 records.forEach(new java.util.function.Consumer<ConsumerRecord>() { @Override public void accept(ConsumerRecord record) { System.out.println(record.key() + "\t" + record.value()); } }); if (records.count() > 0) { consumer.commitSync(); } }
-
手动异步提交
在消息消费完后提交,不需要等到集群的ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
// 拉取消息 while (true) { // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。 ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); // 遍历处理消息 records.forEach(new java.util.function.Consumer<ConsumerRecord>() { @Override public void accept(ConsumerRecord record) { System.out.println(record.key() + "\t" + record.value()); } }); if (records.count() > 0) { consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null){ System.err.println(offsets); System.err.println(exception.getMessage()); } } }); } }
3. 长轮询poll消息
- 默认情况下, 消费者一次会poll 500条消息
// 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
-
代码中设置了长轮询的时间是1000毫秒
while (true) { // 拉取消息。如果拉取不到消息,阻塞等待最多 10 秒,或者等待拉取到消息。 ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); // 遍历处理消息 records.forEach(new java.util.function.Consumer<ConsumerRecord>() { @Override public void accept(ConsumerRecord record) { System.out.println(record.key() + "\t" + record.value()); } }); }
意味着:
- 如果一次poll到500条,直接执行forEach循坏
- 如果这一次没有poll到500条,且时间在10秒内,那么长轮询继续poll,要么到500条,要么到10s。
- 如果多次poll都没到达500条,且10秒时间到了,那么直接执行forEach循坏。
-
如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制, rebalance机制会造成性能开销,可以通过设置参数,让一次poll的消息条数少一些。
// 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 如果两次poll的时间超过30s的时间间隔,kafka会认为消费者消费能力过弱,将其踢出消费组,将分区分配给其他消费者。 -rebalance properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
4. 消费者的健康状态检查
消费者每个1s向kafka集群发送心跳进行续约,集群发现如果超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组的其他消费者进行消费。
// consumer给broker发送心跳的间隔时间
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
5. 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition("my-replicated-topic",0)));
6. 消息的回溯消费
consumer.assign(Arrays.asList(new TopicPartition("my-replicated-topic", 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition("my-replicated-topic",0)));
7. 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition("my-replicated-topic", 0)));
consumer.seek(new TopicPartition("my-replicated-topic",0),10);
8. 从指定时间点消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。
List<PartitionInfo> partitionInfos = consumer.partitionsFor("my-replicated-topic");
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : partitionInfos) {
map.put(new TopicPartition("my-replicated-topic", par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) {
continue;
}
long offset = value.offset();
System.out.println("partiton-" + key.partition() + "|offset-" + offset);
System.out.println();
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
9. 新消费组的消费offset规则
新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新的消息)。可以通过以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
- latest(默认):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
参考:org.apache.kafka.clients.consumer.ConsumerConfig
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
10. Springboot中使用Kafka
1. 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 编写配置文件
server:
port: 8081
spring:
kafka:
bootstrap-servers: 10.88.40.40:9092,10.88.40.40:9093,10.88.40.40:9094 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
batch-size: 16384 # 每次批量发送消息的最大数量
buffer-memory: 33554432 # 每次批量发送消息的最大内存
properties:
spring.json.add.type.headers: false
linger.ms: 30000 # 批处理延迟时间上限。这里配置为 30 * 1000 ms 过后,不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求。
consumer:
auto-offset-reset: latest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #org.springframework.kafka.support.serializer.JsonDeserializer
fetch-max-wait: 1000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
fetch-min-size: 10 # poll 一次消息拉取的最小数据量,单位:字节
max-poll-records: 100 # poll 一次消息拉取的最大数量
properties:
spring.json.trusted.packages: com.example.kafka.message
enable-auto-commit: false
# Kafka Consumer Listener 监听器配置
listener:
type: SINGLE # 监听器类型,默认为 SINGLE ,只监听单条消息。这里我们配置 BATCH ,监听多条消息,批量消费
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
ack-mode: manual_immediate
logging:
level:
org:
springframework:
kafka: ERROR # spring-kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
apache:
kafka: ERROR # kafka INFO 日志太多了,所以我们限制只打印 ERROR 级别
3. 编写消息生产者
package com.example.kafka.controller;
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.example.kafka.entity.UserEntity;
/**
* @author Li Zemin
* @since 2024/2/1 9:17
*/
@RestController
@RequestMapping
public class SendController {
@Resource
private KafkaTemplate kafkaTemplate;
@GetMapping("/send")
public String send() {
UserEntity user = UserEntity.builder()
.id(System.currentTimeMillis()+"")
.name(generateRandomString(3))
.age(20)
.build();
this.kafkaTemplate.send("my-replicated-topic", user.getId(), JSON.toJSONString(user));
return "ok";
}
private static String generateRandomString(int len) {
StringBuilder str = new StringBuilder();
for (int i = 0; i < len; i++) {
char result = (char) (0x4e00 + (int) (Math.random() * (0x9fa5 - 0x4e00 + 1)));
str.append(result);
}
return str.toString();
}
}
4. 编写消息消费者
package com.example.kafka.controller;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import com.example.kafka.message.Demo01Message;
import lombok.extern.slf4j.Slf4j;
/**
* @author Li Zemin
* @since 2024/2/1 9:28
*/
@Slf4j
@Component
public class ConsumerController {
@KafkaListener(topics = "my-replicated-topic",groupId = "demo01-A-consumer-group-" + "my-replicated-topic")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("demo01-A-consumer-group-" + "my-replicated-topic");
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
ack.acknowledge();
}
}
5. 消费者中配置消费主题、分区、offset
@KafkaListener(groupId = "demo01-A-consumer-group-" + "my-replicated-topic", topicPartitions = {
@TopicPartition(topic = "my-replicated-topic", partitions = {"0", "1"}),
@TopicPartition(topic = Demo01Message.TOPIC, partitions = "0", partitionOffsets =
@PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "3")
public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("demo01-A-consumer-group-" + "my-replicated-topic");
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), record);
ack.acknowledge();
}
6. 消费者提交模式
九、kafka集群中的controller、rebalance、HW
1. controller
- 集群中谁来充当controller?
每个broker启动时会向zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller,负责这么几件事:- 当集群中有一个副本的leader挂掉,需要在集群中选举一个新的leader,选举的规则是从isr集合中最左边获得。
- 当集群中有broker新增或减少,controller会同步信息给其他broker。
- 当集群中有分区新增或者减少,controller会同步信息给其他broker。
2. rebalance机制
- 前提:消费组中的消费者没有指明分区来消费
- 触发的条件:当消费组中的消费者和分区的关系发生变化的时候
- 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
- range:根据公示计算得到每个消费者消费哪几个分区:前面的消费者是分区总数/消费者数量+1,之后的消费者是分区总数/消费者数量。
- 轮询:大家轮着来
- sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启。
3. HW和LEO
LEO是某个副本最后消息的消息位置(log-end-offset)
HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后,hw才会变化。在这之前消费者是消费不带这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失。
十、Kafka线上问题优化
1. 如何防止消息丢失
- 生产者:1) 使用同步发送 2) 把ack设成1或者all,并且设置同步的分区数>=2
- 消费者:自动提交变成手动提交
2. 如何防止重复消费
在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。
此时生产者会进行重试,于是broker就会收到多条相同的消息,而造成消费者的重复消费。
怎么解决:
-
生产者关闭重试:造成消息丢失(不建议)
-
消费者解决幂等性消费问题:
- 所谓的幂等性:多次访问的结果是一样的。对于rest的请求(get(幂等)、post(非幂等)、put(幂等)、delete(幂等))
解决方案:-
在数据库中创建联合主键,防止相同的主键创建多条记录
-
使用分布式锁,以业务ID作为锁,保证只有一条记录可以创建成功
-
- 所谓的幂等性:多次访问的结果是一样的。对于rest的请求(get(幂等)、post(非幂等)、put(幂等)、delete(幂等))
3. 如何做到消息的顺序消费
- 生产者:保证消息按顺序消费,且消息不丢失——使用同步的发送方式,ack设置成非0的值
- 消费者:主题只能设置一个分区,消费组中只能有一个消费者
- kafka的顺序消费使用场景不多,因为牺牲了性能,但是比如rocketmq已有现成的能力。
4. 如何解决消息积压问题
1) 消息积压问题的出现
消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问变慢,造成服务雪崩。
2) 消息积压的解决方案
- 在这个消费者中,使用多线程,充分利用机器的性能进行消息消费。
- 通过业务的架构设计,提升业务层面消费的性能。
- 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度。
- 创建一个消费者,该消费者在kafka另建一个主题,配上对个分区,多个分区再配上多个消费者。该消费者将poll下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。——不常用
5. 延迟队列
1)应用场景
订单创建后,超过30分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现
2)具体方案
-
kafka中创建相应的主题
-
消费者消费该主题的消息(轮询)
-
消费者消费消息时判断消息的创建时间,是否超过30分钟(前提是订单没支付)
- 如果是:数据库修改订单状态为已取消
- 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及后面的消息,继续进行判断,以此反复。
十一、Kafka-eagle监控平台
1. 搭建
-
去kafka-eagle官网下载压缩包:EFAK (kafka-eagle.org)
-
配置环境变量
/etc/profile
JAVA_HOME#set java environment export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.402.b06-1.el7_9.x86_64/ export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar export PATH=$PATH:$JAVA_HOME/bin
-
配置环境变量
/etc/profile
export KE_HOME=/opt/kafka-eagle/kafka-eagle-bin-3.0.1/efak-web-3.0.1 export PATH=$PATH:$KE_HOME/bin
-
修改配置文件
system-config.properties
,zk地址及MySQL地址###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### efak.zk.cluster.alias=cluster1 cluster1.zk.list=192.168.18.139:2181 ###################################### # kafka mysql jdbc driver address ###################################### efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://192.168.18.139:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=root
-
启动
kafka-eagle
,进入/opt/kafka-eagle/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin
目录,执行./ke.sh start
评论区