欢迎来到飞鸟慕鱼博客,开始您的技术之旅!
当前位置: 首页知识笔记正文

kafka的三个功能,kafka简介及使用

终极管理员 知识笔记 80阅读

参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本

一、kafka集群实例角色规划

在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。

上图中黑色代表broker消息代理服务褐色/蓝色代表Controller(集群控制器服务)

左图kafka2.0一个集群所有节点都是broker角色kafka从三个broker中选举出来一个Controller控制器控制器将集群元数据信息比如主题分类、消费进度等保存到zookeeper用于集群各节点之间分布式交互。右图kafka3.0假设一个集群有四个broker指定三个作为Conreoller角色(蓝色)从三个Controller中选举出来一个Controller作为主控制器褐色其他的2个备用。zookeeper不再被需要相关的元数据信息以kafka日志的形式存在即以消息队列消息的形式存在。controller通信端口9093, 作用与zk的2181端口类似 。

在搭建kafka3.0集群之前, 我们需要先做好kafka实例角色规划。(四个broker, 需要通过主动配置指定三个作为Controller, Controller需要奇数个, 这一点和zk是一样的)

主机名称ip角色node.idkafka-vm1192.168.1.111broker,controller1kafka-vm2192.168.1.112broker,controller2kafka-vm3192.168.1.113broker,controller3kafka-vm4192.168.1.114broker4 二、准备工作 kafka3.x不再支持JDK8建议安装JDK11或JDK17。新建kafka持久化日志数据mkdir -p /data/kafka;并保证安装kafka的用户具有该目录的读写权限。

各个机器节点执行

                                                                          # 安装jdk(kafka3.x不再支持JDK8建议安装JDK11或JDK17, 这里安装jdk11)                                                                                          # 下载安装jdk11, 参考:                                                                                                                                                                        # 下载kafka                                                                                          adduser kafka                                                                                          cd /opt                                                                                          wget                                                                                           tar -xf kafka_2.12-3.3.1.tgz                                                                                                                                                                       chown -R kafka:kafka kafka_2.12-3.3.1*                                                                                                                                                                       mkdir -p /data/kafka                                                                                          chown -R kafka:kafka /data/kafka                            
vi /etc/hosts各个节点添加如下内容
                                                                                        192.168.1.111 data-vm1                                                                                                          192.168.1.112 data-vm2                                                                                                          192.168.1.113 data-vm3                                                                                                          192.168.1.114 data-vm4                                  
三、修改Kraft协议配置文件

在kafka3.x版本中使用Kraft协议代替zookeeper进行集群的Controller选举所以要针对它进行配置。

vi /opt/kafka_2.12-3.3.1/config/kraft/server.properties        

具体配置参数如下

                                                                                        # data-vm1节点                                                                                                          node.id1                                                                                                          process.rolesbroker,controller                                                                                                          listenersPLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093                                                                                                          advertised.listenersPLAINTEXT://:9092                                                                                                          controller.quorum.voters1data-vm1:9093,2data-vm2:9093,3data-vm3:9093                                                                                                          log.dirs/data/kafka/                                                                                                                                                                                                     # data-vm2节点                                                                                                          node.id2                                                                                                          process.rolesbroker,controller                                                                                                          listenersPLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093                                                                                                          advertised.listenersPLAINTEXT://:9092                                                                                                          controller.quorum.voters1data-vm1:9093,2data-vm2:9093,3data-vm3:9093                                                                                                          log.dirs/data/kafka/                                                                                                                                                                                                     # data-vm3节点                                                                                                          node.id3                                                                                                          process.rolesbroker,controller                                                                                                          listenersPLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093                                                                                                          advertised.listenersPLAINTEXT://:9092                                                                                                          controller.quorum.voters1data-vm1:9093,2data-vm2:9093,3data-vm3:9093                                                                                                          log.dirs/data/kafka/                                  
node.id这将作为集群中的节点 ID,唯一标识按照我们事先规划好的上文在不同的服务器上这个值不同。其实就是kafka2.0中的broker.id,只是在3.0版本中kafka实例不再只担任broker角色也有可能是controller角色所以改名叫做node节点。process.roles一个节点可以充当broker或controller或两者兼而有之。按照我们事先规划好的上文在不同的服务器上这个值不同。多个角色用逗号分开。listeners: broker将使用9092端口而kraft controller控制器将使用9093端口。advertised.listeners: 这里指定kafka通过代理暴漏的地址如果都是局域网使用就配置PLAINTEXT://:9092即可。controller.quorum.voters这个配置用于指定controller主控选举的投票节点所有process.roles包含controller角色的规划节点都要参与即zimug1、zimug2、zimug3。其配置格式为:node.id1host1:9093,node.id2host2:9093log.dirskafka 将存储数据的日志目录在准备工作中创建好的目录。

所有kafka节点都要按照上文中的节点规划进行配置完成config/kraft/server.properties配置文件的修改。

四、格式化存储目录

生成一个唯一的集群ID在一台kafka服务器上执行一次即可这一个步骤是在安装kafka2.0版本的时候不存在的。

                                                                                        $ /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid                                                                                                          SzIhECn-QbCLzIuNxk1A2A                                  

使用生成的集群ID配置文件格式化存储目录log.dirs,

所以这一步确认配置及路径确实存在

并且kafka用户有访问权限检查准备工作是否做对。

每一台主机服务器都要执行命令:

                                                                                        /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh format \                                                                                                          -t SzIhECn-QbCLzIuNxk1A2A \                                                                                                          -c /opt/kafka_2.12-3.3.1/config/kraft/server.properties                                  

格式化操作完成之后log.dirs​目录下多出一个Meta.properties文件​存储了当前的kafka节点的id(node.id)当前节点属于哪个集群(cluster.id)

                                                                          [rootdata-vm2 ~]              # ll /data/kafka/                                                                                          总用量 8                                                                                          -rw-r--r--. 1 root root 249 10月 11 18:23 bootstrap.checkpoint                                                                                          -rw-r--r--. 1 root root  86 10月 11 18:23 meta.properties                                                                                                                                                                       $               cat /data/kafka/meta.properties                                                                                          #                                                                                          #Tue Apr 12 07:39:07 CST 2022                                                                                          node.id1                                                                                          version1                                                                                          cluster.idSzIhECn-QbCLzIuNxk1A2A                            
五、 启动集群完成基础测试

zimug1 zimug2 zimug3是三台应用服务器的主机名称(参考上文中的角色规划)实现方式已经在本专栏《linux主机与ip解析》中进行了说明。将下面的命令集合保存为一个shell脚本并赋予执行权限。执行该脚本即可启动kafka集群所有的节点前提是你已经按照本专栏的《集群各节点之间的ssh免密登录》安装方式做了集群各节点之间的ssh免密登录。

启动命令

                                                                          bin/kafka-server-start.sh \                                                                                          /opt/kafka_2.12-3.3.1/config/kraft/server.properties                                                                                                                                                                       # 后台运行                                                                                          nohup bin/kafka-server-start.sh \                                                                                          /opt/kafka_2.12-3.3.1/config/kraft/server.properties 2>&1 &                            

脚本 

                                                                          #!/bin/bash                                                                                          kafkaServers              data-vm1 data-vm2 data-vm3                                                                                          #启动所有的kafka                                                                                          for kafka               in               $kafkaServers                                                                                          do                                                                                              ssh -T               $kafka <<              EOF                                                                                              nohup /opt/kafka_2.12-3.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-3.3.1/config/kraft/server.properties 1>/dev/null 2>&1 &                                                                                          EOF                                                                                          echo 从节点               $kafka 启动kafka3.0...[               done ]                                                                                          sleep 5                                                                                          done                            
六、一键停止集群脚本

一键停止kafka集群各节点的脚本与启动脚本的使用方式及原理是一样的。

停止命令

/opt/kafka_2.12-3.3.1/bin/kafka-server-stop.sh      

执行脚本

                                                                          #!/bin/bash                                                                                          kafkaServers              data-vm1 data-vm2 data-vm3                                                                                          #停止所有的kafka                                                                                          for kafka               in               $kafkaServers                                                                                          do                                                                                              ssh -T               $kafka <<              EOF                                                                                              cd /opt/kafka_2.12-3.3.1                                                                                              bin/kafka-server-stop.sh                                                                                          EOF                                                                                          echo 从节点               $kafka 停止kafka...[               done ]                                                                                          sleep 5                                                                                          done                            
七、测试Kafka集群 7.1 创建topic
                                                                          [rootdata-vm1 kafka_2.12-3.3.1]              # bin/kafka-topics.sh \                                                                                          --create \                                                                                          --topic quickstart-events \                                                                                          --bootstrap-server data-vm4:9092                                                                                                                                                                       Created topic quickstart-events.                                                                                          [rootdata-vm1 kafka_2.12-3.3.1]              #                                                                                                                                                                       #                                                                                           [rootdata-vm1 kafka_2.12-3.3.1]              # bin/kafka-topics.sh \                                                                                          --create \                                                                                          --topic quickstart-events \                                                                                          --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092                            
7.2 查看topic列表
                                                                          bin/kafka-topics.sh \                                                                                          --list \                                                                                          --bootstrap-server data-vm4:9092                                                                                                                                                                       #                                                                                           bin/kafka-topics.sh \                                                                                          --list \                                                                                          --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092,data-vm4:9092                            
7.3 查看消息详情
                                                                          [rootdata-vm1 kafka_2.12-3.3.1]              # bin/kafka-topics.sh \                                                                                          --describe \                                                                                          --topic quickstart-events \                                                                                          --bootstrap-server data-vm3:9092                                                                                                                                                                       Topic: quickstart-events        TopicId: zSOJC6wNRRGQ4MudfHLGvQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes1073741824                                                                                                  Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1     Isr: 1                                                                                                                                                                       [rootdata-vm1 kafka_2.12-3.3.1]              #                            
7.4 生产消息
                                                                          [rootdata-vm1 kafka_2.12-3.3.1]              # bin/kafka-console-producer.sh \                                                                                          --topic quickstart-events \                                                                                          --bootstrap-server data-vm1:9092                                                                                                                                                                       # 参考: 创建并配置topic                                                                                          bin/kafka-topics.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --create \                                                                                          --topic my-topic \                                                                                          --partitions 1 \                                                                                          --replication-factor 1 \                                                                                          --config max.message.bytes64000 \                                                                                          --config flush.messages1                                                                                                                                                                       # ------------------------- 参考 ------------------------ #                                                                                          # 1: 修改已创建topic配置                                                                                          # (Overrides can also be changed or set later using the alter configs command.)                                                                                          bin/kafka-configs.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --entity-type topics \                                                                                          --entity-name my-topic \                                                                                          --alter \                                                                                          --add-config max.message.bytes128000                                                                                                                                                                       # 2: 检查已修改的topic配置是否生效                                                                                          # (To check overrides set on the topic you can do)                                                                                          bin/kafka-configs.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --entity-type topics \                                                                                          --entity-name my-topic \                                                                                          --describe                                                                                                                                                                       # 3. 恢复到原来的配置                                                                                          # (To remove an override you can do)                                                                                          bin/kafka-configs.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --entity-type topics \                                                                                          --entity-name my-topic \                                                                                          --alter \                                                                                          --delete-config max.message.bytes                                                                                                                                                                       # 4. 增加分区数                                                                                          # (To add partitions you can do)                                                                                          bin/kafka-topics.sh \                                                                                          --bootstrap-server broker_host:port \                                                                                          --alter \                                                                                          --topic my_topic_name \                                                                                          --partitions 40                                                                                                                                                                       # 5. 添加配置                                                                                          # (To add configs:)                                                                                          bin/kafka-configs.sh \                                                                                          --bootstrap-server broker_host:port \                                                                                          --entity-type topics \                                                                                          --entity-name my_topic_name \                                                                                          --alter \                                                                                          --add-config xy                                                                                                                                                                       # 6. 移除配置                                                                                          # (To remove a config:)                                                                                          bin/kafka-configs.sh \                                                                                          --bootstrap-server broker_host:port \                                                                                          --entity-type topics \                                                                                          --entity-name my_topic_name \                                                                                          --alter \                                                                                          --delete-config x                                                                                                                                                                       # 7. 删除topic                                                                                          # (And finally deleting a topic:)                                                                                          bin/kafka-topics.sh \                                                                                          --bootstrap-server broker_host:port \                                                                                          --delete \                                                                                          --topic my_topic_name                                                
7.5 消费消息
                                                                          bin/kafka-console-consumer.sh \                                                                                          --topic quickstart-events \                                                                                          --from-beginning \                                                                                          --bootstrap-server data-vm4:9092                            
7.6 查看消费者组
                                                                          # 检查消费者postition                                                                                          # Checking consumer position                                                                                          bin/kafka-consumer-groups.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --describe \                                                                                          --group my-group                                                                                                                                                                         TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID                                                                                            my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1                                                                                            my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1                                                                                            my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2                            
7.7 查看消费者组列表
                                                                          # list all consumer groups across all topics                                                                                          bin/kafka-consumer-groups.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --list                                                                                                                                                                         test-consumer-group                                                                                                                                                                                                                                                    # To view offsets, as mentioned earlier,                                                                                           # we describe the consumer group like this:                                                                                          bin/kafka-consumer-groups.sh \                                                                                          --bootstrap-server localhost:9092 \                                                                                          --describe \                                                                                          --group my-group                                                                                                                                                                         TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID                                                                                            topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2                                                                                            topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2                                                                                            topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2                                                                                            topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1                                                                                            topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1                                                                                            topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4                                                                                                                                                                       # 更多配置参考:                                                                                          #                                                 

标签:
声明:无特别说明,转载请标明本文来源!