课程目标
- Kafka 产生的背景
- Kafka 的架构
- Kafka 的安装部署和集群部署
- Kafka 的基本操作
- Kafka 的应用
Kafka 的简介
- 高性能
- 高吞吐量
什么是 Kafka
Kafka 是一款分布式消息发布和订阅系统,具有高性能、高吞吐量的特点而被广泛应用与大数据传输场景。它是由 LinkedIn公 司开发,使用 Scala 语言编写,之后成为 Apache 基金会的一个顶级项目。 kafka 提供了类似 JMS 的特性,但是在设计和实现上是完全不同的,而且他也不是 JMS 规范的实现。
Kafka 产生的背景
kafka作为一个消息系统,早起设计的目的是用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)。活动流数据是所有的网站对用户的使用情况做分析的时候要用到的最常规的部分,活动数据包括页面的访问量(PV)、被查看内容方面的信息以及搜索内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性的对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等)。
Kafka 的应用场景
- 内置分区
- 实现集群
spring cloud stream
也有 Kafka 的实现。
由于 kafka 具有更好的吞吐量、内置分区、冗余及容错性的优点 ( kafka 每秒可以处理几十万消息 ) ,让 kafka 成为了一个很好的大规模消息处理应用的解决方案。所以在企业级应用长,主要会应用于如下几个方面:
- 行为跟踪:kafka可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的topic中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控。
- 日志收集:日志收集方面,有很多比较优秀的产品,比如 Apache Flume,很多公司使用kafka 代理日志聚合。日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。在实际应用开发中,我们应用程序的 log 都会输出到本地的磁盘上, 排查问题的话通过 linux 命令来搞定,如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题,就是很麻烦的事情了。所以一般都会做一个日志统一收集平台管理 log 日志用来快速查询重要应用的问题。所以很多公司的套路都是把应用日志集中到 kafka 上,然后分别导入到 es 和 hdfs 上,用来做实时检索分析和离线统计数据备份等。而另一方面,kafka 本身又提供了很好的 api 来集成日志并且做日志收集。
Kafka 本身的架构
一个典型的 kafka 集群包含若干 Producer(可以是应用节点产生的消息,也可以是通过Flume 收集日志产生的事件),若干个 Broker(kafka 支持水平扩展)、若干个 Consumer 、Group,以及一个 zookeeper 集群。kafka 通过 zookeeper 管理集群配置及服务协同。
Producer 使用 push 模式将消息发布到 broker,consumer 通过监听使用 pull 模式从broker 订阅并消费消息。
多个 broker 协同工作, producer 和 consumer 部署在各个业务逻辑中。三者通过zookeeper 管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。图上有一个细节是和其他 mq 中间件不同的点,producer 发送消息到 broker 的过程是 push,而 consumer 从 broker 消费消息的过程是 pull,主动去拉数据。而不是 broker 把数据主动发送给 consumer
Topic 主题
partion 数据分区
group
kafka 的安装部署
下载安装包
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz
kafka_2.11-1.1.0.tgx: http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
安装过程
- tar -zxvf 解压安装包
c[root@Darian1 software]# tar -zxvf kafka_2.11-2.0.0.tgz
[root@Darian1 software]# tar -zxvf kafka_2.11-2.0.0.tgz
1
kafka 目录介绍
/bin
操作 kafka 的可执行脚本/config
配置文件/libs
依赖库目录/logs
日志数据目录
启动 /停止 kafka
- 需要先启动 zookeeper ,如果没有搭建 zookeeper 环境,可以直接运行kafka内嵌的zookeeper 启动命令: bin/zookeeper-server-start.sh config/zookeeper.properties &
- 进入kafka目录,运行 bin/kafka-server-start.sh {-daemon 后台启动} config/server.properties &
- 进入kafka目录,运行bin/kafka-server-stop.sh config/server.properties
运行的外部的 Zookeeper 集群的 Kafka
[root@Darian1 bin]# vim ../config/server.properties
zookeeper.connect=168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties
[root@Darian1 bin]# vim ../config/server.properties
zookeeper.connect=168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties
2
3
4
5
如果超时时间连接较长可以延长时间。
c[root@Darian1 bin]# vim ../config/server.properties zookeeper.connect=192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=30000
[root@Darian1 bin]# vim ../config/server.properties zookeeper.connect=192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=30000
1
2
3
4
5
6
Kafka 的基本操作
前提:
先需要启动 zookeeper
启动 kafka
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties
后台启动kafka
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
创建一个 Topic
[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --replication-factor 1 --partitions 1 --topic darianTest
Created topic "darianTest".
[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --replication-factor 1 --partitions 1 --topic darianTest
Created topic "darianTest".
2
3
Replication-factor 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份
Partitions 分区数
查看所有的 Topic
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper localhost:2181
darianTest
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper localhost:2181
darianTest
2
查看 topic 属性
[root@Darian1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --topic darianTest
Topic:darianTest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: darianTest Partition: 0 Leader: 1 Replicas: 1 Isr: 1
[root@Darian1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --topic darianTest
Topic:darianTest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: darianTest Partition: 0 Leader: 1 Replicas: 1 Isr: 1
2
3
4
创建一个控制台发送端
broker-list 不是 zookeeper
[root@Darian1 bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic darianTest
>hello
>helloworld
>dsfsdf
>sdfsd
>
[root@Darian1 bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic darianTest
>hello
>helloworld
>dsfsdf
>sdfsd
>
2
3
4
5
6
创建一个控制台接收端
[root@Darian1 bin]# sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic darianTest --from-beginning
hello
helloworld
dsfsdf
sdfsd
[root@Darian1 bin]# sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic darianTest --from-beginning
hello
helloworld
dsfsdf
sdfsd
2
3
4
5
6
安装集群环境
zookeeper 能够完成 kafka 的集群
修改 server.properties
配置
修改server.properties. broker.id=0 / 1
修改server.properties 修改成本机IP
advertised.listeners=PLAINTEXT://192.168.11.153:9092
当 Kafka broker 启动时,它会在ZK上注册自己的IP和端口号,客户端就通过这个IP和端口号来连接。Kafka 的 listeners 如果需要配置集群的话,需要把自己机器的 IP 配置上去。
192.168.40.128
[root@Darian1 bin]# vim ../config/server.properties
# 在集群里边必须是唯一的,默认情况下 broker.id 都是 0 ,需要标志它的唯一性
broker.id=1 / 2 / 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.128:9092
[root@Darian1 bin]# vim ../config/server.properties
# 在集群里边必须是唯一的,默认情况下 broker.id 都是 0 ,需要标志它的唯一性
broker.id=1 / 2 / 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.128:9092
2
3
4
5
6
7
8
9
192.168.40.129
# 在集群里边必须是唯一的,默认情况下 broker.id 都是 0 ,需要标志它的唯一性
broker.id= 2
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.129:9092
# 在集群里边必须是唯一的,默认情况下 broker.id 都是 0 ,需要标志它的唯一性
broker.id= 2
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.129:9092
2
3
4
192.168.40.131
# 在集群里边必须是唯一的,默认情况下 broker.id 都是 0 ,需要标志它的唯一性
broker.id= 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.131:9092
# 在集群里边必须是唯一的,默认情况下 broker.id 都是 0 ,需要标志它的唯一性
broker.id= 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.131:9092
2
3
4
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
Kafka 启动不好的话,多启动两次。
启动成功以后,看 zookeeper 的节点的变化:
[root@Darian1 bin]# sh zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 12] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 15] get /controller
{"version":1,"brokerid":2,"timestamp":"1548340180496"}
cZxid = 0x100000147
ctime = Thu Jan 24 22:29:40 CST 2019
mZxid = 0x100000147
mtime = Thu Jan 24 22:29:40 CST 2019
pZxid = 0x100000147
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1687fbe81fe000a
dataLength = 54
numChildren = 0
[root@Darian1 bin]# sh zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 12] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 15] get /controller
{"version":1,"brokerid":2,"timestamp":"1548340180496"}
cZxid = 0x100000147
ctime = Thu Jan 24 22:29:40 CST 2019
mZxid = 0x100000147
mtime = Thu Jan 24 22:29:40 CST 2019
pZxid = 0x100000147
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1687fbe81fe000a
dataLength = 54
numChildren = 0
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
每一个节点都有它相应的语义。我们的写请求回落到 Master
节点上,读请求可以走其它节点去读组。他们都是热备可以工作的。他的选举规则是最小的节点也就是最早注册的节点。
中间件都是成熟的商业化的东西。
JAVA API 的使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
2
3
4
5
KafKaConsumer
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer kafkaConsumer;
public KafkaConsumerDemo(String topic) {
// 设置属性
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
// 消费组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 间隔时间
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 从最早的开始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); // 超时时间
consumerRecords.forEach(consumerRecord -> {
System.err.println("[message receive]:" + consumerRecord.value());
kafkaConsumer.commitSync();
});
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test").start();
}
}
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer kafkaConsumer;
public KafkaConsumerDemo(String topic) {
// 设置属性
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
// 消费组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 间隔时间
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 从最早的开始
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); // 超时时间
consumerRecords.forEach(consumerRecord -> {
System.err.println("[message receive]:" + consumerRecord.value());
kafkaConsumer.commitSync();
});
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
KafkaProducer
public class KafkaProducerDemo extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final boolean isAsync;
public KafkaProducerDemo(String topic, boolean isAsync) {
// 设置属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int num = 0;
while (num < 50) {
String message = "message_" + num;
System.err.println("[producer message]:" + message);
if (isAsync) { // 异步发送
producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
if (recordMetadata != null) {
System.err.println("[async-offset]:" + recordMetadata.offset() +
"->[partition]:" + recordMetadata.partition());
}
});
} else { // 同步发送 future / callable
try {
RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
System.err.println("[sync-offset]:" + recordMetadata.offset() +
"->[partition]:" + recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
}
}
num++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new KafkaProducerDemo("test", true).start();
}
}
public class KafkaProducerDemo extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final boolean isAsync;
public KafkaProducerDemo(String topic, boolean isAsync) {
// 设置属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int num = 0;
while (num < 50) {
String message = "message_" + num;
System.err.println("[producer message]:" + message);
if (isAsync) { // 异步发送
producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
if (recordMetadata != null) {
System.err.println("[async-offset]:" + recordMetadata.offset() +
"->[partition]:" + recordMetadata.partition());
}
});
} else { // 同步发送 future / callable
try {
RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
System.err.println("[sync-offset]:" + recordMetadata.offset() +
"->[partition]:" + recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
}
}
num++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new KafkaProducerDemo("test", true).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Kafka 有两种 API
- 封装的比较好的
- 非常灵活的
Kafka 的配置信息非常的详细
配置信息分析
发送端的可选配置信息分析
ackacks
acks 配置表示 producer 发送消息到 broker 上以后的确认值。有三个可选项
- 0:表示 producer 不需要等待 broker 的消息确认。这个选项时延最小但同时风险最大(因为当 server 宕机时,数据将会丢失)。
- 1:表示 producer 只需要获得 kafka 集群中的 leader 节点确认即可,这个选择时延较小同时确保了 leader 节点确认接收成功。
- all(-1):需要 ISR 中所有的 Replica 给予接收确认(需要集群中的所有节点确认),速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica ,所以设置参数为 all 并不能一定避免数据丢失,
batch.size
生产者发送多个消息到 broker
上的同一个 分区 时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是 16384byte , 也就是 16kb ,意味着当一批消息大小达到指定的 batch.size
的时候会统一发送。
linger.ms
Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合然后再发送,以此提高吞吐量,而 linger.ms
就是为每次发送到 broker 的请求增加一些 delay,以此来聚合更多的 Message 请求。 这个有点想TCP里面的 Nagle 算法,在 TCP 协议的传输中,为了减少大量小数据包的发送,采用了 Nagle 算法,也就是基于小包的等-停协议。
batch.size
和linger.ms
这两个参数是kafka性能优化的关键参数,很多同学会发现 batch.size 和 linger.ms 这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到 broker 上
max.request.size
设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。
还有重试次数等
消息的同步发送和异步发送:
Kafka 1.0 以后,默认的发送方式都是异步发送消息。
我们的消息去通过 Kafka producer 去 send
以后,这个消息实际上是放到一个后台的发送队列里边,然后通过一个后台的线程,通过不断地从后代的发送队列中不断地取出消息进行发送。发送以后,会进行调用回调方法。就是 Callback
方法进行执行。
同步发送就是用的 future 的时候去阻塞。获得发送信息是进行阻塞。
消费端的可选配置分析
group.id
consumer group
是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例 ( consumer instance ) ,它们共享一个公共的 ID ,即 group ID 。组内的所有消费者协调在一起来消费订阅主题 ( subscribed topics ) 的所有分区 ( partition ) 。当然,每个分区只能由同一个消费组内的一个 consumer 来消费 . 如下图所示,分别有三个消费者,属于两个不同的 group ,那么对于 firstTopic 这个 topic 来说,这两个组的消费者都能同时消费这个 topic 中的消息,对于此事的架构来说,这个 firstTopic 就类似于 ActiveMQ 中的 topic 概念。如右图所示,如果3个消费者都属于同一个 group ,那么此时 firstTopic 就是一个 Queue 的概念。
不同的组消费同一条消息。
同一个组只能有一个 Consumer 能够拿到消息
enable.auto.commit
ENABLE_AUTO_COMMIT_CONFIG
可以设置成 false
。
消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合 auto.commit.interval.ms
控制自动提交的频率。 每一段时间内的消息批量的确认提交。
当然,我们也可以通过 consumer.commitSync()
的方式实现手动提交
{
// 异步 Commit
kafkaConsumer.commitAsync();
// 同步 commit
kafkaConsumer.commitSync();
//
}
// 可以选择不同的 Topic
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
// ...
}
// 可以选择不同的回调接口 callback 接口
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// ...
}
{
// 异步 Commit
kafkaConsumer.commitAsync();
// 同步 commit
kafkaConsumer.commitSync();
//
}
// 可以选择不同的 Topic
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
// ...
}
// 可以选择不同的回调接口 callback 接口
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// ...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
auto.offset.reset
这个参数是针对新的 groupid
中的消费者而言的,当有新 groupid
的消费者来消费指定的 topic
时,对于该参数的配置,会有不同的语义
auto.offset.reset=latest
情况下,新的消费者将会从其他消费者最后消费的 offset 处开始消费 Topic 下的消息。auto.offset.reset= earliest
情况下,新的消费者会从该 topic 最早的消息开始消费,(对于新的 groupId 来说,重置 offset )。auto.offset.reset=none
情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
没有消费组,就会抛出异常。
max.poll.records
此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔。
Kafka 工具:
消息都会写道磁盘上,只要磁盘上这条消息可以存在,那么我换不同的 GroupId
,那么就可以一直去消费这条消息。
Spring -kafka 集成
Spring 整合 Kafka 实现注册成功以后去设置抽奖名额(赠送一个抽奖机会)。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定义应用的名称
spring.application.name=spring-cloud-stream-kafka
# 配置 Web 服务端口
server.port=8080
# 失效管理安全
management.security.enabled=false
# 配置需要的 kafka 主题
kafka.topic.test=test
kafka.topic.darian=darian
# 配置 kafka 的 zookeeper 的节点
#spring.cloud.stream.kafka.binder.zk-nodes=192.168.136.128:2181
spring.cloud.stream.kafka.streams.binder.configuration.zk-nodes=192.168.40.128:2181
# 配置 Spring Kafka 配置信息
spring.kafka.bootstrap-servers=192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092
# Kafka 生产者配置
spring.kafka.producer.bootstrap-servers=192.168.40.128:9092
spring.kafka.producer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka 消费者配置
spring.kafka.consumer.group-id=darian-1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 定义应用的名称
spring.application.name=spring-cloud-stream-kafka
# 配置 Web 服务端口
server.port=8080
# 失效管理安全
management.security.enabled=false
# 配置需要的 kafka 主题
kafka.topic.test=test
kafka.topic.darian=darian
# 配置 kafka 的 zookeeper 的节点
#spring.cloud.stream.kafka.binder.zk-nodes=192.168.136.128:2181
spring.cloud.stream.kafka.streams.binder.configuration.zk-nodes=192.168.40.128:2181
# 配置 Spring Kafka 配置信息
spring.kafka.bootstrap-servers=192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092
# Kafka 生产者配置
spring.kafka.producer.bootstrap-servers=192.168.40.128:9092
spring.kafka.producer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka 消费者配置
spring.kafka.consumer.group-id=darian-1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
KafkaConsumerListener
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "${kafka.topic.test}")
public void onMessageTest(String message) {
System.out.println("Kafka test 消费者监听器,接收到消息:" + message);
}
@KafkaListener(topics = "${kafka.topic.darian}")
public void onMessageDarian(String message) {
System.out.println("Kafka darian消费者监听器,接收到消息:" + message);
}
}
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "${kafka.topic.test}")
public void onMessageTest(String message) {
System.out.println("Kafka test 消费者监听器,接收到消息:" + message);
}
@KafkaListener(topics = "${kafka.topic.darian}")
public void onMessageDarian(String message) {
System.out.println("Kafka darian消费者监听器,接收到消息:" + message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
KafkaProducerController
@RequiredArgsConstructor
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/message/send/{topic}")
public TopicMessage sendMessage(
@PathVariable String topic,
@RequestParam String message) {
if ((!"darian".equals(topic)) && (!"test".equals(topic))) {
return new TopicMessage(topic, message, false, "topic【" + topic + "】 不存在");
}
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
return new TopicMessage(topic, message, true, "success");
}
@AllArgsConstructor
@Data
public static class TopicMessage {
private String topic;
private String message;
private boolean send;
private String errorMessage;
}
}
@RequiredArgsConstructor
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/message/send/{topic}")
public TopicMessage sendMessage(
@PathVariable String topic,
@RequestParam String message) {
if ((!"darian".equals(topic)) && (!"test".equals(topic))) {
return new TopicMessage(topic, message, false, "topic【" + topic + "】 不存在");
}
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
return new TopicMessage(topic, message, true, "success");
}
@AllArgsConstructor
@Data
public static class TopicMessage {
private String topic;
private String message;
private boolean send;
private String errorMessage;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28