Kafka笔记

企业中离线业务场景实时业务场景都需要使用到kafka

Kafka具备数据的计算能力和存储能力,但是两个能力相对(MR/SPARK,HDFS)较弱.

Kafka角色的角色与hbase比较像,层级关系比较多。

 

消息:应用之间传送的数据,或点与点之间,点与多点之间传递的数据,传递的信息。

消息队列:是一种应用间的通信方式以队列的形式传递。

 

消息队列的应用场景

应用解耦合:多应用间通过消息队列对同一消息进行处理

异步处理:多应用对消息队列中同一消息进行处理

限流削峰:用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;

 

消息驱动的系统:有前面消息传递到后,后面的才会触发动作。

在学习MR时做的游戏,后面的同学在没有接收到前面同学的信息之前是不允许操作的,这个就是消息系统。

 

消息系统的方式

  • 点对点的方式

消息队列

发送者 (生产者):生产数据的程序/人/对象

接收者(消费者):处理队列内的数据的程序/人/对象

每个消息只有一个接收者,发送者和接收者间没有依赖性,接收者在成功接收消息之后需向队列应答成功。

 

  • 发布-订阅的方式

角色主题(Topic):消息的分类,分组

发布者(Publisher):生产者

订阅者(Subscriber):消费者

每个消息可以有多个订阅者,发布者和订阅者之间有时间上的依赖性,订阅者需要提前订阅该角色主题。

 

 

 

 

Kafka:是一个分布式的(可以多节点),分区的,多副本的,多订阅者的消息发布订阅系统。

Kafka对消息分类使用topic(一个分类,一个类别)

生产者:Producer(制造数据、生产数据的,将消息推送到队列的)

消费者:Consumer(读取数据的,浏览数据的,在队列中获取数据)

服务器:Broker

 

 

Kafka的好处

可靠性高:分布式的—>处理能力快,分区的—>数据读取速度快,副本机制à 防止数据丢失。

可扩展性:动态扩展,添加节点。

耐用性:kafka讲数据写入磁盘

高性能:对于发布和订阅消息都具有高吞吐量。保证零停机和零数据丢失。

 

[info]

Kafka的补充说明

kafka消息保留在磁盘上,并在集群内复制以防止数据丢失(不能提高数据的读取效率)。

消费端为拉模型来主动拉取数据。

 

Consumer Group:每一个Consumer属于一个特定的Consumer Group(可以为每个Consumer指定 groupName)

Broker:kafka集群中包含一个或者多个服务实例

Topic:每条发布到kafka集群的消息都有一个类别,分类

Partition:Partition是一个物理上的概念,每个Topic包含一个或者多个Partition

 

segment:一个partition当中存在多个segment文件段,每个segment分为两部分,.log文件和.index文件,

其中.index文件是索引文件,主要用于快速查询.log文件当中数据的偏移量位置

.log存放数据文件

[/info]

 

Kafka的适用场景

  • 指标分析,通常用于操作监控数据辅助分析。
  • 日志的汇聚,在多节点,多业务生产数据的系统直接调用kafka Producer的api发送数据

  • 流式处理,kafka可以实时传递数据到实时计算框架中(sparkstreaming   flink)。

 

 

 

Kafka架构

  • Producers:生产数据到topic, topic 可以是一个或多个。
  • Consumers:在一个或多个topic读取数据,消费数据。
  • Connectors:把kafka主题连接到现有的应用程序或数据系统

  • StreamProcessors:应用充当处理器。

架构细节图

 

 

 

 

 

 

Kafka主要组件的说明

生产者(Producer):主要是用于生产消息,是kafka当中的消息生产者,生产的消息通过topic进行归类,保存到kafka的broker里面去

主题(Topic):可以实是0个也可以是多个,每条数据必须有一个所属的topic。Topic的数量可以多个。

分区(Partition):一个topic内部可以有多个分区,分区内部的数据是有序的。多个分区时无序的.分区数在创建topic时设置,并后期可以修改。Partition数量决定了每个Consumer group中并发消费者的最大数量(效率最高的情况)。

Partition = 并发度:  刚刚好,效率最高

Partition > 并发度:有部分消费者消费多个分区的数据。

Partition < 并发度 :有部分消费者闲置(任意时刻一个分区内的一条数据只能被消费组中的一个消费者消费s)

副本(Replicas):副本数量一般情况下小于等于broker的个数,每个分区都有各自的主副本和从副本。副本有主(在哪里复制的)-从(复制出来的)概念,副本是通过拉取的方式进行的复制。消费者和生产者都是从leader读写数据,不与follower交互。

一个分区有三个副本因子,主数据丢失,会在剩余的两个中将一个从变为主,不会去做丢失数据的恢复。

lsr表示:当前可用的副本(是一个列表,表示数据副本在那几个节点上)

 

segment:一个分区内有一个或多个segment,每个segment由一个.log文件和一个.index文件。Log存储数据,index保存数据(log里面数据)的索引

 

 

 

 

Kafka 常用命令

添加topic

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic 18BD34_1

查看topic

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic 18BD34-1

查看topic结构

bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

删除topic

bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --delete --topic 18BD34_1

模拟生产者

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic 18BD34-1

模拟消费者

bin/kafka-console-consumer.sh --from-beginning --topic 18BD34-1  --zookeeper node01:2181,node02:2181,node03:2181

添加配置

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1

删除配置

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages

修改分区

bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

 

 

 

kafka生产数据到topic的分区的原则

1、当没有指定分区号和key时使用轮训(轮训分区)策略存储数据

2、当制定了key时,分区策略为对key求hash,  hash后的值角色存在哪个分区。

3、当指定分区编号时,所有的数据全部打入该分区

4、自定义分区

 

 

[info]

为什么要手动提交offset?????

自动提交不能offset 更新的及时性。

例如:10S 提交一次

  • 提交了一次      offset    100

第10秒  宕机      offset    700  (100-700已经消费)下次在100开始消费就会造成重复消费(100-700重复)

11  提交第二次     offset     800

 

手动提交风险更低,offset更新更及时。

 

常用的方案:将offset接入redis或hbase

[/info]

 

 

Consumer消费数据的流程

  • Consumer连接指定的Topic partition所在leader broker
  • 采用pull方式从kafkalogs中获取消息。

 

高阶API: 隐藏Consumer与Broker细节,封装好的接口。工程师直接使用,无需关注细节。

低阶API:使用灵活,用户自己维护连接Controller Broker,存储,更新offset。

 

 

实时计算:实时成产-实时传递-实时计算-实时存储-实时展现

.log文件的大小为1G

#索引文件

#日志内容

 

00000000000000000000.index

00000000000000000000.log

 

00000000000000012312.index

00000000000000012312.log

 

 

00000000000000034243.index

00000000000000034243log

 

后面的segment文件的名字如何命名?

来源于上一个segment最后一条数据的全局的偏移量!!

 

kafka数据如何快速查找到数据??

1、首先通过折半查找,确定数据在哪个segment

2、通过index文件确定数据的具体位置。

 

 

kafkalog CleanUp(删除)

 

kafka作用:临时缓存数据。不是永久存储数据。Hdfs是永久存储。

什么时候删除数据??

Kafka删除数据机制-合并机制(相同key的数据只保留最后一个版本)

  • 时间维度:默认168小时(7天)
  • 数据库量:默认 -1 (无限制)

 

 

Kafka如何保证数据不丢失?

生产者如何保证数据不丢失? Ack应答机制!!

Broker如何保证数据不丢失?副本机制-冗余数据实现数据不丢失。

消费者如何保证数据不丢失?维护offset,保证数据消费的位置,防止数据丢失。

 


已发布

分类

,

作者:

标签

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注