做快照网站和推广 哪个效果好广州企业网站建设
Kafka基本概念
kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独 特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确并没有完全遵循JMS规范。
Kafka中一些基本概念。

安装并配置kafka
安装和配置的步骤,可以参考下面的官网
http://kafka.apache.org/quickstart
下载,解压
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -zxvf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0/
cd config/
vi server.properties
修改配置,注意修改如下,
broker.id=0 #当前broker的一个唯一身份标志
listeners=PLAINTEXT://192.168.0.101:9092 #生产者消费者通过下面的端口就行通信
log.dirs=/home/allen/packages/kafka_2.13-2.8.0/log/kafka-logs #log地址
zookeeper.connect=localhost:2181 #zookeeper的地址和端口
启动kafka
启动zookeeper
cd /home/allen/packages/apache-zookeeper-3.5.8-bin/bin
./zkServer.sh start
./zkCli.sh
启动kafka:
cd /home/allen/packages/kafka_2.13-2.8.0
./bin/kafka-server-start.sh -daemon config/server.properties
查看启动结果:
在zookeeper客户端查看zookeeper的变化:
创建主题
创建一个主题名字为topic名字为AllenTest1
bin/kafka-topics.sh --create --zookeeper 192.168.0.101:2181 --replication-factor 1 --partitions 1 --topic AllenTest1#zookeeper 192.168.0.101:2181 #zookeeper的通信地址
#replication-factor 1 #复制因子为1
#partitions 1 #一个partitions
#AllenTest1 #名字为AllenTest1
创建结果:
删除主题:
bin/kafka‐topics.sh ‐‐delete ‐‐topic test ‐‐zookeeper 192.168.0.101:2181
发布消息:
bin/kafka-topics.sh --create --zookeeper 192.168.0.101:2181 --replication-factor 1 --partitions 1 --topic AllenTest1
bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092 --topic AllenTest1
>Alle
>Allen1
>Allen2
>Allen3
消费消息:
如下kafka里面的消息是存在磁盘的,所以可以被重复消费,如下,可以被消费多次。
默认这些消息是保留一周的,可以配置修改。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --from-beginning --topic AllenTest1
cd kafka_2.13-2.8.0/
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --from-beginning --topic AllenTest1
Alle
Allen1
Allen2
Allen3
[allen@localhost kafka_2.13-2.8.0]$
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --from-beginning --topic AllenTest1
Alle
Allen1
Allen2
Allen3
默认是消费最新的消息
bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.0.101:9092 ‐‐topic AllenTest1
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --topic AllenTest1
Allen4
Allen5
Kafka 消费组的概念
单播模式
类似Queue模式一个消费组里面可以有一个或多个消费者, 一个消费组里面只有一个消费者能消费到一个topic的消息。
所以一个topic如果只接了一个消费组,那么这个topic上的一条消息只能被该消费组的某一个消费者消费,这种模式,类似queue模式。也称之为单播模式。
创建消费者1,group.id=AllenGroup1
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --consumer-property group.id=AllenGroup1 --topic AllenTest1
创建消费者2,group.id=AllenGroup1
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --consumer-property group.id=AllenGroup1 --topic AllenTest1
producer中发布消息
bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092 --topic AllenTest1
如上图所示,只有消费者1才能消费到消息。
多播模式
Kafka的发布订阅模式
一个topic接了多个消费组, 每个组里面的消费者都可以收到相同的消息。类似于发布订阅模式。
创建消费者1, group.id=AllenGroup1
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --consumer-property group.id=AllenGroup1 --topic AllenTest1
创建消费者2, group.id=AllenGroup2
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.101:9092 --consumer-property group.id=AllenGroup2 --topic AllenTest1
发布消息:
如上图示可以看出, 不同消费组的2个消费者都是可以接受到相同消息的。
查看当前broker上有多少个消费组:
[allen@localhost kafka_2.13-2.8.0]$ bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.101:9092 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
AllenGroup1
AllenGroup2
[allen@localhost kafka_2.13-2.8.0]$
理解消费组中的偏移量
在kafka中,记录消费相关的消息都是记录在消费组里面的, 和消费者没什么关系,如下,命令查看消费组AllenGroup1的相关信息,
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.101:9092 --describe --group AllenGroup1
Kafka分区的概念Partition
一个主题下面有多个分区,真正的message是存在对应的分区下面,主题只是一个逻辑的概念。
分布式存储,将message的消息以文件的形式,存储在不同的分区上, 把分区放到分布式系统中去。
Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。
每个partition中的 消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。
每个partition,都对应一个commit log文件。
一个partition中的message的offset都是唯一的,但是不同的partition 中的message的offset可能是相同的。
kafka一般不会删除消息,不管这些消息有没有被消费。只会根据配置的日志保留时间(log.retention.hours)确认消息多 久被删除,默认保留最近一周的日志消息。
kafka的性能与保留的消息数据量大小没有关系,因此保存大量的数据消息日 志信息不会有什么影响。
每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自 己来维护;一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息, 或者跳过某些消息。 这意味kafka中的consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对于集群或者其他consumer 来说,都是没有影响的,因为每个consumer维护各自的消费offset。
创建一个多分区的topic
bin/kafka-topics.sh --create --zookeeper 192.168.0.101:2181 --replication-factor 1 --partitions 2 --topic AllenPartitionTopic1
查看这个topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.0.101:2181 --topic AllenPartitionTopic1