这一节主要讲kafka 集群环境部署,
创新互联服务项目包括满城网站建设、满城网站制作、满城网页制作以及满城网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,满城网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到满城省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
1)Producer:消息生产者,就是向kafkabroker发消息的客户端;
2)Consumer:消息消费者,向kafkabroker取消息的客户端;
3)Topic:可以理解为一个队列;
4)Consumer Group(CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)
和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制
(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一
个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现
单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需
要多次发送消息到不同的topic;
5)Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker
可以容纳多个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,
一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息
都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给
consumer,不保证一个topic的整体(多个partition间)的顺序;
分区数越多,在一定程度上会提升消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。
如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。
7)replication-factor
用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
在创建Topic的时候,有两个参数是需要填写的,那就是partions和replication-factor。
8)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查
找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然thefirstoffset就
是00000000000.kafka。
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
解压缩
[root@es7-1-80 conf]# cat zoo.cfg
initLimit=10
syncLimit=5
dataDir=/data/zk/data
clientPort=2181
maxClientCnxns=0
server.1=172.16.0.80:2888:3888
server.2=172.16.0.91:2888:3888
server.3=172.16.0.92:2888:3888
echo 1 >> =/data/zk/data/myid
启动,制作标准启动服务
cat /etc/systemd/system/zookeeper.service
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
其他节点按照上述方式配置,并依次启动
集群状态查看
/usr/local/zookeeper/bin/zkServer.sh status
curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
解压缩到所需位置在
broker.id=0 #
num.inetwork.threads=3 #broker 处理消息的最大线程数,一般情况下不需要去修改
num.io.threads=8 # # broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka_logs
num.partitions=1 # 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.0.91:2181,172.16.0.80:2181,172.16.0.92:2181
zookeeper.connection.timeout.ms=6000
[Unit]
Description=Apache Kafka server (broker)
After=network.target
After=syslog.target
After=zookeeper.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
KillMode=none
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
5、将kafka 命令加入到环境变量中
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
其他节点按照上述方式部署,修改broker.id
配置Kafka外网IP地址
kafka0.10以下
修改server.properties 配置文件
advertised.host.name=xxxx
advertised.port=9092
kafka0.10以上
listeners=PLAINTEXT://0.0.0.0:9093 //绑定所有ip
advertised.listeners=PLAINTEXT://42.159.7.75:9093
kafka 常用操作
创建主题
./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test1 --replication-factor 1 --partitions 3
在创建Topic的时候,有两个参数是需要填写的,那就是partions和replication-factor。
删除主题
./kafka-topics.sh --zookeeper 172.16.0.80:2181 --delete --topic test1
查看主题列表
./kafka-topics.sh --zookeeper 172.16.0.80:2181:2181 --list
查看主题状态
kafka-topics.sh --zookeeper 172.16.0.80:2181 --describe --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test2 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
修改主题分区
./kafka-topics.sh --zookeeper 172.16.0.80:2181 --create --topic test3 --replication-factor 1 --partitions 1
kafka-topics.sh --zookeeper 172.16.0.80:2181 --alter --topic test3 --partitions 3
kafka-topics.sh --zookeeper 172.16.0.80:2181 --describe --topic test3
Topic:test3 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
生产数据
kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4
hi go
[2019-11-26 15:02:32,608] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,713] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,820] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-11-26 15:02:32,927] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test4=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien
解决方法
port=9092
listeners=PLAINTEXT://172.16.0.92:9092 # 修改为对应的ip
重新生产数据
kafka-console-producer.sh --broker-list 172.16.0.80:9092 --topic test4
hello kafaka
kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning
消费数据
kafka-console-consumer.sh --zookeeper 172.16.0.91:2181 --topic test4 --from-beginning
列出消费者主组
消费组 即 Consumer Group,应该算是 Kafka 比较有亮点的设计了。那么何谓 Consumer Group 呢?用一句话概括就是:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --list
获取新版本消费者群组testgroup的详细信息
/kafka-consumer-groups.sh --bootstrap-server 172.16.0.80:9092 --group testgroup --describe