成都创新互联网站制作重庆分公司

kafka精通1-创新互联

学习目标: 学习内容: 为什么使用消息队列(MQ)
  • 使用同步方式来解决多个服务之间的通信
    在这里插入图片描述
    同步通信的方式方式会存在性能和稳定性的问题

    成都创新互联公司是专业的乌拉特后网站建设公司,乌拉特后接单;提供网站制作、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行乌拉特后网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
  • 使用异步的通信方式
    在这里插入图片描述
    异步的优势:可以让上游快速成功,明显提升系统吞吐量;即使有服务失败,也可以通过分布式事务解决方案来保证最终是成功的,也能保障业务执行之后的最终一致性。
    消息队列解决具体的是什么问题--------通信问题

消息队列的流派

目前消息队列的中间件选型有很多种:

  • rabbitMQ
  • rocketMQ
  • kafka(全球消息处理性能最快的一款MQ)
  • zeroMQ

这些消息队列中间件有什么区别?

  1. 有broker
    重topic:kafka、rocketMQ
    整个topic,依据topic来进行消息的中转,在重topic的消息队列里必然需要topic的存在
    轻topic:RabbitMQ
    topic只是一种中转模式
  2. 无broker
    在生产者和消费者之间没有使用broker,例如zeroMQ,直接使用socket来进行通信
kafka介绍

kafka是一个分布式、支持分区(partition)、多副本(replica),基于zookeeper协调的分布式消息系统,大特点是可以实时的处理大量数据

kafka使用场景
  • 日志收集:用kafka收集各种服务的日志,通过kafka以统一接口服务的方式开放给consumer
  • 消息系统:解耦和生产者和消费者、缓存消息等
  • 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,比如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘
  • 运营指标:kafka也经常用来记录运营监控指标。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
kafka的基本知识
  1. kafka的安装

    1.部署一台zookeeper服务器
    	2.安装jdk
    	3.下载kafka的安装包:https:kafka.apache.org/download
    	4.上传到kafka服务器上并解压:/usr/local/kafka
    	5.	进入conf目录内,修改server.properties
    
    	```powershell
    	#broker.id属性在kafka集群中必须要唯一
    	broker.id=0
    	#kafka部署的机器ip和提供服务的端口
    	listeners=PLAINTEXT://10.234.252.122:9092
    	#kafka消息存储文件
    	log.dirs=/usr/local/kafka
    	#kafka连接zookeeper的地址
    	zookeeper.connect=10.234.252.122:2181
    	```	
    
    	```powershell
    	#进入到bin目录内,执行以下命令来启动kafka服务器(带着配置文件)
    	./kafka-server-start.sh -daemon ../config/server.properties
    
    	#校验kafka是否启动成功,进入到zk内查看是否有kafka的节点
    	ls /brokers/ids				#查询出有broker的id则存在
    	```
  2. 创建topic

执行以下命令创建名为“test”的topic,这个topic只有一个partition,并且备份因子也设置为1:
#./kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看当前kafka内有那些topic
./kafka-topic.sh --zookeeper localhost:2182 --list
  1. 发送消息
    kafka自带一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中,在默认情况下,每一行会被当成一个独立的消息,使用kafka的发送消息的客户端,指定发送到kafka服务器地址和topic
./kafka-console-producer.sh --broker-list 10.234.252.122:9092 --topic test
>hello
>world
>1111
>22222222
  1. 消费消息
    对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息,使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
方式1:从最后一条消息的偏移量(offset)+1开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test

方式2:从开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --from-beginning
>hello
>world
>1111
>22222222
  1. 关于消费消息的细节
  • 生产者将消息发送给broker,broker会将消息保存在本地日志文件中
/usr/local/kafka/kafka-logs/主题-分区/000000.log
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

在这里插入图片描述

  1. 单播消息
    如果多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的topic中的消息;换言之,同一个消费组中只能有一个消费者收到一个topic中的消息
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --consumer-property group.id=testgroup
  1. 多播消息
    不同的消费组订阅同一个topic,那么不同的消费组中只有一个消费者能收到消息;实际上也是多个消费组中的多个消费者收到同一个topic的消息
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --consumer-property group.id=testgroup1

./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test  --consumer-property group.id=testgroup2
  1. 查看消费组及信息
查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --list

查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --describe --group testgroup

在这里插入图片描述

在这里插入图片描述
注意:

  • current-offset:当前消费组的已消费偏移量
  • log-end-offset:主题对应分区消费的结束偏移量
  • lag:当前消费组未消费的消息数
主题、分区概念
  1. 主题topic
    主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类,不同的topic会被订阅该topic的消费者消费。
    但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的,为了解决这个文件过大的问题,kafka提出来分区的概念

  2. partition分区
    通过partition将一个topic中的消息分区来存储,这样的好处:

  • 分区存储,可以解决统一存储文件过大的问题
  • 提高了读写的吞吐量,读和写可以同时在多个分区中进行

分区的作用:

分布式存储
		可以并行写

在这里插入图片描述

**为一个主题创建多个分区**
./kafka-topics.sh --create --zookeeper localhost:2181  --partitions 2 --replication-factor 1 --topic test1
  1. kafka中消息日志文件中保存的内容

    00000.log:这个文件中保存的就是消息
    
    __consumer_offsets-49:kafka内部自己创建了 __consumer_offsets主题,包含了50个分区,这个主题用来存放消费者消费某个主题的偏移量,因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主的上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区
    		提交到那个分区:通过hash函数:hash(consumergroupid)%_consumer_offsets主题的分区数
    		提交到该主题中的内容是:key是consumergroupid+topic+分区号,value就是当前offsets的值
    		
    文件中保存的消息,默认保存7天,7天后消息会被删除
副本的概念

在创建主题时,除了指明主题的分区数以外,还指明了副本数
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower
在这里插入图片描述

  • leader:
    kafka的读和写的操作,都发生在leader上,leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader
  • follower:
    接受leader的同步的数据
  • isr:
    可以同步和已同步的节点会被存入到isr集合中,这里有一个细节:如果isr中的节点性能较差,会被踢出isr集合
kafka集群消息的发送与消费
./kafka-console-producer.sh --broker-list 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --topic my-replicated-topic

./kafka-console-consumer.sh --bootstrap-server 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --from-beginning --topic my-replicated-topic

在这里插入图片描述

关于分区消费组消费者的细节

在这里插入图片描述

  • 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partition的多个消费者消费的总的顺序性是得不到保证的。
  • partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息
  • 如果消费者挂了,会触发rebalance机制,会让其他消费者来消费该分区
生产者的同步发送消息

在这里插入图片描述
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数为3次

生产者的异步发消息

在这里插入图片描述
异步发送,生产者发完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法

生产者ACK的配置

在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞,那么集群什么时候返回ACK呢?此时ACK有三个配置:

  • ACK=0:kafka-cluster不需要任何的broker收到消息,就立即返回ACK给生产者,最容易丢消息的,效率是最高的
  • ACK=1(默认):多副本之间的leader已经收到消息,并把消息写入到本地log中,才会返回ACK给生产者,性能和安全性是最均衡的
  • ACK=-1/all:里面有默认的配置min.insync.replica=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完成后,才会返回ACK给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差

其他一些细节:

  • 发送默认会重试3次,每次间隔100ms
  • 发送的消息会先进入到本地缓冲区(32MB),用来存放要发送的消息,kafka会跑一个线程,该线程去缓冲区中取16k(也是可以配置的,默认是16k)的数据,发送到kafka,如果到10毫秒数据没取满16k,也会发送一次

在这里插入图片描述

消费者offset的自动提交和手动提交

在这里插入图片描述
消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量

  • 提交的内容
    消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的主题+消费的某个分区及消费的偏移量这样的信息提交到集群的_consumer_offsets主题里面

  • 自动提交
    消费者poll消息下来以后就会自动提交offset
    自动提交会丢消息:因为如果消费者还没消费完poll下来的消息就会自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息,之前未被消费的消息就会丢失掉了

  • 手动提交
    需要把自动提交的配置改成false
    手动提交又分成两种

    - 手动同步提交
     	在消费完消息后调用同步提交的方法,当集群返回ACK前一直阻塞,返回ACK后表示提交成功,执行之后的逻辑
    
      - 手动异步提交
     	在消息消费完后提交,不需要等到集群ACK,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
消费者poll消息的过程
  • 默认情况下,消费者一次会poll 500条消息
  • 长轮询的时间默认是1000毫秒
    意味着:
    1、如果一次poll到500条,就执行for循环
    2、如果这一次没有poll到500

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


本文名称:kafka精通1-创新互联
网页链接:http://cxhlcq.com/article/doocps.html

其他资讯

在线咨询

微信咨询

电话咨询

028-86922220(工作日)

18980820575(7×24)

提交需求

返回顶部