课程目标
Topic & Partition
消息分发策略
消息消费原理
消息的存储策略
Partition 副本机制
关于 Topic 和 Partition
Topic
在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个类别。物理上来说,不同的 Topic 的消息是分开存储的。
每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
Partition
每个 topic 可以划分多个分区(每个 Topic 至少有一个分区),同一 Topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka只保证在同一个分区内的消息是有序的。
分区可以理解为数据库层面上的分表操作
下图中,对于名字为 test 的 topic,做了 3 个分区,分别是
p0、p1、p2
➢ 每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition 。如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的 partition 中, 这样就有点类似数据库的分库分表的概念,把数据做了分片处理。
每一个分区里的数字就是一个 offset ,它是一个 类似于游标的概念。这个数字不是数据,是一个 offset,通过 offset 找到对应的数据内容。每一个 分区的内容是追加的。一个顺序写入的规则。顺序递增。Kafka 可以保证它每一个 topic 里的每一个 分区 的数据都是顺序的。跨分区是不保证顺序的。这是 partition 的改变。
Topic & Partition 的存储
Partition 是以文件的形式存储在文件系统中,比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录, firstTopic-0~3, 命名规则是<topic_name>-<partition_id> ,每一个 Topic 的存储是以 Partition 的存储。
[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181 --replication-factor=1 --partitions 3 --topic dariantest
Created topic "dariantest".
[root@Darian1 bin]# cd /software/zookeeper-3.4.10/bin/
[root@Darian1 bin]# sh zkCli.sh
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics
[dariantest]
[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181 --replication-factor=1 --partitions 3 --topic dariantest
Created topic "dariantest".
[root@Darian1 bin]# cd /software/zookeeper-3.4.10/bin/
[root@Darian1 bin]# sh zkCli.sh
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics
[dariantest]
2
3
4
5
6
7
8
9
192.168.40.129
[root@Darian3 bin]# ls /tmp/kafka-logs/
... dariantest-0 ...
[root@Darian3 bin]# ls /tmp/kafka-logs/
... dariantest-0 ...
2
192.168.40.131
192.168.40.131
[root@Darian1 bin]# ls /tmp/kafka-logs/
... dariantest-1 ...
[root@Darian1 bin]# ls /tmp/kafka-logs/
... dariantest-1 ...
2
192.168.40.128
[root@Darian1 bin]# ls /tmp/kafka-logs/
... dariantest-2 ....
[root@Darian1 bin]# ls /tmp/kafka-logs/
... dariantest-2 ....
2
关于消息分发
kafka 消息分发策略
消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由 key、value 两部分构成,在发送一条消息时,我们可以指定这个 key,那么 producer 会根据 key 和 partition 机制来判断当前这条消息应该发送并存储到哪个 partition 中。我们可以根据需要进行扩展 producer 的 partition 机制。
自定义分区策略代码演示
默认的 Kafka 会根据 Key 去计算,我们也可以去扩展自己的分区策略。
/***
* 自定义分区策略
*/
public class MyParitition implements Partitioner {
private final Random random = new Random();
/***
* 重写发送的策略
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 根据消息拿到具体的分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionNum = 0;
if (key == null) {
partitionNum = random.nextInt(partitionInfos.size()); // 随机的分区
} else {
partitionNum = Math.abs(key.hashCode() % partitionInfos.size()); // Hash 取模运算
}
System.err.println("[key]:\t" + key + "[partitionNum]:\t" + partitionNum + "[value]:\t" + value);
return partitionNum;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
/***
* 自定义分区策略
*/
public class MyParitition implements Partitioner {
private final Random random = new Random();
/***
* 重写发送的策略
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 根据消息拿到具体的分区列表
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionNum = 0;
if (key == null) {
partitionNum = random.nextInt(partitionInfos.size()); // 随机的分区
} else {
partitionNum = Math.abs(key.hashCode() % partitionInfos.size()); // Hash 取模运算
}
System.err.println("[key]:\t" + key + "[partitionNum]:\t" + partitionNum + "[value]:\t" + value);
return partitionNum;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyParitition.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyParitition.class.getName());
消息默认的分发机制
默认情况下,kafka 采用的是 hash 取模的分区算法。如果Key 为 null,则会随机分配一个分区。这个随机是在这个参数 metadata.max.age.ms
的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区。这个值默认情况下是 10 分钟更新一次。会保存在内存里边。
关于 Metadata ,这个之前没讲过,简单理解就是 Topic/Partition
和 broker
的映射关系,每一个 topic 的每一个 partition,需要知道对应的 broker 列表是什么,leader是谁、follower 是谁。这些信息都是存储在 Metadata 这个类里面。他的 broker 的 partition 的状态可能发生变化,意味着它要更新他的状态信息。
消费端如何消费指定的分区
通过下面的代码,就可以消费指定该 topic 下的 0 号分区。其他分区的数据就无法接收。
// 消费指定分区的时候,不需要再订阅
// kafkaConsumer.subscribe(Collections.singleto nList(topic));
// 消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
// 消费指定分区的时候,不需要再订阅
// kafkaConsumer.subscribe(Collections.singleto nList(topic));
// 消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
2
3
4
5
6
7
他也是可以消费多个分区的消息的。
kafka 消息消费原理演示
在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的同时,在上一节课,我们讲了,kafka 存在 consumer group的概念, 也就是 group.id
一样的 consumer ,这些 consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费,那么同一个consumer group 里面的 consumer 是怎么去分配该消费哪个分区里的数据的呢?如下图所示,3 个分区,3 个消费者,那么哪个消费者消分哪个分区?
对于上面这个图来说,这 3 个消费者会分别消费 test 这个topic 的 3 个分区, 也就是每个 consumer 消费一个partition。
如果有三个分区,有四个消费者,会有一个消费者消费不到。
如果有三个分区,有两个消费者,会有一个消费者消费两个分区。
如果消费者比 partition 多的话浪费,所以我们不建议去设置多的消费者。
- 我们实际过程使用过程中,consumer 如果比 partition 数量多的话,实际上是浪费的。所以我们不建议去设置比较多的消费者。因为 Kafka 的设计是在一个 partition 上是不允许并发的。
- 如果 consumer 比 partition 数量少的话,就会有 consumer 消费多个 partition。如果,我们的消费者的能力本身就比较强的话,我就可以去合理的做一个负载。我一个消费者可以消费两个到三个。
- consumer 最好是 partition 的整数倍。整数倍,意味着我们的消费者能够合理的分发。
- 如果我们的 consumer 消费了多个 partition ,那么它是不保证顺序性的。他只能说对一个分区保证顺序性,但是跨分区,它是不保证顺序性。
增减 consumer 、broker、partition 会导致 Rebalance。重新负载。
什么是分区分配策略
通过前面的案例演示,我们应该能猜到,同一个 group 中的消费者对于一个 topic 中的多个 partition,存在一定的分区分配策略。
在 kafka 中,存在两种分区分配策略,一种是 Range ( 默认 ) 、另 一 种 还 是 RoundRobin( 轮 询 )。通过 partition.assignment.strategy
这个参数来设置。
Range strategy(范围分区)
Range 策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假设我们有 10 个分区,3 个消费者,排完序的分区将会是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是 C1-0 , C2-0 , C3-0 。然后将 partitions 的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,
我们有 10 个分区,3 个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
- C1-0 将消费 0, 1, 2, 3 分区
- C2-0 将消费 4, 5, 6 分区
- C3-0 将消费 7, 8, 9 分区
假如我们有 11 个分区,那么最后分区分配的结果看起来是这样的:
- C1-0 将消费 0, 1, 2, 3 分区
- C2-0 将消费 4, 5, 6, 7 分区
- C3-0 将消费 8, 9, 10 分区
假如我们有 2 个主题 ( T1 和 T2 ) ,分别有 10 个分区,那么最后分区分配的结果看起来是这样的:
- C1-0 将消费 T1 主题的 0, 1, 2, 3 分区以及 T2 主题的 0, 1, 2, 3 分区
- C2-0 将消费 T1 主题的 4, 5, 6 分区以及 T2 主题的 4, 5, 6 分区
- C3-0 将消费 T1 主题的 7, 8, 9 分区以及 T2 主题的 7, 8, 9 分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了 2 个分区,这就是 Range strategy 的一个很明显的弊端
RoundRobin strategy(轮询分区)
轮询分区策略是把所有 partition 和所有 consumer 线程都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给消费线程。如果所有 consumer 实例的订阅是相同的,那么 partition 会均匀分布。
在我们的例子里面,假如按照 hashCode 排序完的 topic-partitions 组依次为 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4,T1-7, T1-6, T1-9,我们的消费者线程排序为 C1-0, C1-1, C2- 0, C2-1,最后分区分配的结果为:
- C1-0 将消费 T1-5, T1-2, T1-6 分区;
- C1-1 将消费 T1-3, T1-1, T1-9 分区;
- C2-0 将消费 T1-0, T1-4 分区;
- C2-1 将消费 T1-8, T1-7 分区;
使用轮询分区策略必须满足两个条件
每个主题的消费者实例具有相同数量的流
每个消费者订阅的主题必须是相同的
什么时候会触发这个策略呢?
当出现以下几种情况时,kafka 会进行一次分区分配操作, 也就是 kafka consumer 的 rebalance
同一个 consumer group 内新增了消费者
消费者离开当前所属的 consumer group,比如主动停机或者宕机
Topic 新增了分区(也就是分区数量发生了变化)
消费者主动取消订阅 Topic。
kafka consumer 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。而具体如何执行分区策略,就是前面提到过的两种内置的分区策略。而 kafka 对于分配策略这块,提供了可插拔的实现方式, 也就是说,除了这两种之外,我们还可以创建自己的分配机制。
谁来执行 Rebalance 以及管理 consumer 的 group 呢?
Kafka 提供了一个角色:coordinator 来执行对于 consumer group 的管理,当 consumer group 的第一个 consumer 启动的时候,它会去和 kafka server 确定谁是它们组的 coordinator。之后该 group 内的所有成员都会和该 coordinator 进行协调通信。
如何确定 coordinator
consumer group 如何确定自己的 coordinator 是谁呢, 消费者向 kafka 集 群 中 的 任 意 一 个 broker 发 送 一 个GroupCoordinatorRequest 请求,服务端会返回一个负载最 小 的 broker 节 点 的 id , 并 将 该 broker 设 置 为coordinator
JoinGroup 的过程
在 rebalance 之前,需要保证 coordinator 是已经确定好了的,整个 rebalance 的过程分为两个步骤,Join 和 Sync
join: 表示加入到 consumer group 中,在这一步中,所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会选择一个 consumer 担任 leader 角色,并把组成员信息和订阅信息发送消费者。
protocol_metadata
: 序列化后的消费者的订阅信息leader_id
: 消费组中的消费者,coordinator 会选择一个座位 leader,对应的就是 member_idmember_metadata
对应消费者的订阅信息members
:consumer group 中全部的消费者的订阅信息 ,只有 leader 才会受到 members 的信息。generation_id
: 年代信息,类似于之前讲解 zookeeper 的时候的 epoch 是一样的, 对于每一轮 rebalance ,generation_id 都会递增。主要用来保护 consumer group。隔离无效的 offset 提交。也就是上一轮的 consumer 成员无法提交 offset 到新的 consumer group 中。
建立好连接以后,会发送心跳。
Synchronizing Group State 阶段
完成分区分配之后,就进入了 Synchronizing Group State 阶段 ,主要逻辑是向 GroupCoordinator 发 送 SyncGroupRequest 请求,并且处理 SyncGroupResponse 响应,简单来说,就是 leader 将消费者对应的 partition 分配方案同步给 consumer group 中的所有 consumer。
每个消费者都会向 coordinator 发送 syncgroup 请求,不过只有 leader 节点会发送分配方案,其他消费者只是打打酱油而已。当 leader 把方案发给 coordinator 以后,coordinator 会把结果设置到 SyncGroupResponse 中。这样所有成员都知道自己应该消费哪个分区。
- consumer group 的分区分配方案是在客户端执行的!Kafka 将这个权利下放给客户端主要是因为这样做可以有更好的灵活性
一开始是在分区分配方案是在 zookeeper 执行,后来都是 客户端执行。
如何保存消费端的消费位置
什么是 offset
前面在讲解 partition 的时候,提到过 offset, 每个 topic可以划分多个分区(每个 Topic 至少有一个分区),同一 topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它 是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka 只保证在同一个分区内的消息是有序的; 对于应用层的消费来说, 每次消费一个消息并且提交以后,会保存当前消费到的最 近的一个 offset。那么 offset 保存在哪里?
offset 在哪里维护?
在 kafka 中,提供了一个 consumer_offsets_* 的一个topic , 把 offset 信 息 写 入 到 这 个 topic 中 。 consumer_offsets——按保存了每个 consumer group 某一时刻提交的 offset 信息。 consumer_offsets 默认有50 个分区。
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[test, __consumer_offsets, dariantest]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]
[zk: localhost:2181(CONNECTED) 5]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[test, __consumer_offsets, dariantest]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]
[zk: localhost:2181(CONNECTED) 5]
2
3
4
5
6
7
[root@Darian1 ~]# clear
[root@Darian1 ~]# ls /tmp/kafka-logs/
cleaner-offset-checkpoint __consumer_offsets-13 __consumer_offsets-22 __consumer_offsets-31 __consumer_offsets-4 __consumer_offsets-46 dariantest-2 recovery-point-offset-checkpoint
__consumer_offsets-1 __consumer_offsets-16 __consumer_offsets-25 __consumer_offsets-34 __consumer_offsets-40 __consumer_offsets-49 log-start-offset-checkpoint replication-offset-checkpoint
__consumer_offsets-10 __consumer_offsets-19 __consumer_offsets-28 __consumer_offsets-37 __consumer_offsets-43 __consumer_offsets-7 meta.properties
[root@Darian1 ~]# clear
[root@Darian1 ~]# ls /tmp/kafka-logs/
cleaner-offset-checkpoint __consumer_offsets-13 __consumer_offsets-22 __consumer_offsets-31 __consumer_offsets-4 __consumer_offsets-46 dariantest-2 recovery-point-offset-checkpoint
__consumer_offsets-1 __consumer_offsets-16 __consumer_offsets-25 __consumer_offsets-34 __consumer_offsets-40 __consumer_offsets-49 log-start-offset-checkpoint replication-offset-checkpoint
__consumer_offsets-10 __consumer_offsets-19 __consumer_offsets-28 __consumer_offsets-37 __consumer_offsets-43 __consumer_offsets-7 meta.properties
2
3
4
5
根 据 前 面 我 们 演 示 的 案 例 , 我 们 设 置 了 一 个 KafkaConsumerDemo 的 groupid。首先我们需要找到这个 consumer_group 保存在哪个分区中
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
- 计算公式
- java
Math.abs(“groupid”.hashCode())%groupMetadataTopi cPartitionCount ; // Math.abs("consumerDemo".hashCode % 50 );
Math.abs(“groupid”.hashCode())%groupMetadataTopi cPartitionCount ; // Math.abs("consumerDemo".hashCode % 50 );
1
2由于默认情况下
groupMetadataTopicPartitionCount
有 50 个分区,计算得到的结果为:35, 意味着当前的consumer_group
的位移信息保存在consumer_offsets
的第 35 个分区 执行如下命令,可以查看当前
consumer_goup
中的 offset 位移信息csh kafka-simple-consumer-shell.sh --topic consumer_offsets --partition 5 --broker-list 192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
sh kafka-simple-consumer-shell.sh --topic consumer_offsets --partition 5 --broker-list 192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
1c[root@Darian3 bin]# sh kafka-consumer-groups.sh --bootstrap-server 192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092 --describe --group KafkaConsumerDemo Consumer group 'KafkaConsumerDemo' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 115 115 0 - - -
[root@Darian3 bin]# sh kafka-consumer-groups.sh --bootstrap-server 192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092 --describe --group KafkaConsumerDemo Consumer group 'KafkaConsumerDemo' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 115 115 0 - - -
1
2
3
4
5c[root@Darian3 bin]# sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092' --topic 'test' --time -1 test:0:115
[root@Darian3 bin]# sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092' --topic 'test' --time -1 test:0:115
1
2从输出结果中,我们就可以看到 test 这个 topic 的 offset的位移日志。
192.168.40.129 | 192.168.40.130 | 192.168.40.131 |
---|---|---|
消息的存储
消息的保存路径
消息发送端发送消息到 broker 上以后,消息是如何持久化的呢?那么接下来去分析下消息的存储
首先我们需要了解的是,kafka 是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,Log 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录, 这个目录的明明规则是<topic_name>_<partition_id>
比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3
多个分区在集群中的分配
如果我们对于一个 topic,在集群中创建多个 partition,那么 partition 是如何分布的呢?
将所有 N Broker 和待分配的 i 个 Partition 排序
将第 i 个 Partition 分配到第(i mod n)个 Broker 上
了解到这里的时候,大家再结合前面讲的消息分发策略, 就应该能明白消息发送到 broker 上,消息会保存到哪个分区中,并且消费端应该消费哪些分区的数据了。
消息写入的性能
我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。即使是这样,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka 还有一个性能策略
零拷贝
消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单, 但实际上经历了很多步骤。
▪ 操作系统将数据从磁盘读入到内核空间的页缓存
▪ 应用程序将数据从内核空间读入到用户空间缓存中
▪ 应用程序将数据写回到内核空间到 socket 缓存中
▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。
通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket; 在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API
使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中, 只有最后一步将数据拷贝到网卡缓存中是需要的。
https://www.cnblogs.com/dadonggg/p/8205302.html kafka 管理工具。