kafka
Kafka 2.8.0 introduced KRaft mode (early access), so ZooKeeper is no longer strictly required; the setup below still uses a ZooKeeper cluster:
Download Kafka:
https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
Download ZooKeeper:
https://dlcdn.apache.org/zookeeper/zookeeper-3.5.10/apache-zookeeper-3.5.10-bin.tar.gz
Install ZooKeeper:
tar -xf /root/apache-zookeeper-3.5.10-bin.tar.gz -C /usr/local/
ln -sv /usr/local/apache-zookeeper-3.5.10-bin/ /usr/local/zookeeper
Edit the configuration (on all three nodes):
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg
dataDir=/data/zookeeper
server.1=192.168.199.35:2888:3888
server.2=192.168.199.36:2888:3888
server.3=192.168.199.38:2888:3888
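Each node also needs a myid file under dataDir that matches its server.N number in zoo.cfg; a minimal sketch for the first node (use 2 and 3 on the other two nodes):
mkdir -p /data/zookeeper
echo 1 > /data/zookeeper/myid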
Connect to ZooKeeper and list the root znodes:
]# bin/zkCli.sh -server 192.168.199.38:2181 ls /
zookeeper.service (this unit starts the standalone ZooKeeper installed above):
cat >> /usr/lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/usr/local/jdk
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
Restart=always
[Install]
WantedBy=multi-user.target
EOF
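Reload systemd and start ZooKeeper on every node (assuming the unit above):
systemctl daemon-reload
systemctl enable --now zookeeper
systemctl status zookeeper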
Install Kafka:
tar -xf /root/kafka_2.12-2.8.0.tgz -C /usr/local/
ln -sv /usr/local/kafka_2.12-2.8.0 /usr/local/kafka
Configuration:
vim /usr/local/kafka/config/server.properties
# broker.id must be unique on every node
broker.id=1
# listener address the broker binds to
listeners=PLAINTEXT://192.168.199.35:9092
# address advertised to clients
advertised.listeners=PLAINTEXT://192.168.199.35:9092
# data directory; the default is under /tmp and should be changed
log.dirs=/data/kafka
# the /kafka chroot keeps all of this cluster's znodes under one path, which makes them easier to manage
zookeeper.connect=zookeeper01:2181,zookeeper02:2181,zookeeper03:2181/kafka
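zookeeper.connect above uses hostnames, so every broker must be able to resolve zookeeper01/02/03; a sketch that maps them to the three nodes used in this guide (an assumption, adjust to your environment), plus creating the data directory:
cat >> /etc/hosts <<EOF
192.168.199.35 zookeeper01
192.168.199.36 zookeeper02
192.168.199.38 zookeeper03
EOF
mkdir -p /data/kafka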
kafka.service:
cat >> /usr/lib/systemd/system/kafka.service <<EOF
[Unit]
Description=kafka.service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/usr/local/jdk
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=always
[Install]
WantedBy=multi-user.target
EOF
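Start Kafka on every broker, then confirm the brokers have registered under the /kafka chroot (a sketch; the znode path follows from zookeeper.connect above):
systemctl daemon-reload
systemctl enable --now kafka
/usr/local/zookeeper/bin/zkCli.sh -server 192.168.199.38:2181 ls /kafka/brokers/ids
# expected once all three brokers are up: [1, 2, 3]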
Common commands:
kafka-topics.sh
--bootstrap-server   the Kafka brokers to connect to
--topic   the topic to operate on
--create   create a topic
--delete   delete a topic
--alter   alter a topic
--list   list all topics
--describe   show detailed information about a topic
--partitions   set the number of partitions
--replication-factor   set the number of replicas per partition
--config   override a topic's default configuration
Connect to Kafka (list all topics):
]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --list
Create a topic with one partition and three replicas:
]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092 --topic first --create --partitions 1 --replication-factor 3
Show detailed information about the topic:
]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092 --topic first --describe
Topic: first    TopicId: AFrTNBEQQiWg6Yy-_4Jruw    PartitionCount: 1    ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: first    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
Connecting through ZooKeeper also works (the --zookeeper option is deprecated and was removed in Kafka 3.0):
]# bin/kafka-topics.sh --zookeeper 192.168.199.38:2181 --topic first --describe
Topic: first    TopicId: AFrTNBEQQiWg6Yy-_4Jruw    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: first    Partition: 0    Leader: 3    Replicas: 1,3,2    Isr: 3,2
    Topic: first    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3
    Topic: first    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,2
Alter the partition count (partitions can only be increased, never decreased):
]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092 --topic first --alter --partitions 3
Start a console producer:
]# bin/kafka-console-producer.sh --bootstrap-server 192.168.199.35:9092 --topic first
>hello
Start a console consumer:
]# ./kafka-console-consumer.sh --bootstrap-server 192.168.199.36:9092 --topic first --from-beginning
hello
Producer parameters:
batch.size: the sender only sends a batch once it has reached this size; default 16 KB
linger.ms: if batch.size has not been reached, the batch is sent anyway after this delay; default 0 ms
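Both settings can be tried from the console producer with --producer-property; a minimal sketch with illustrative values (32 KB batches, 5 ms linger):
]# bin/kafka-console-producer.sh --bootstrap-server 192.168.199.35:9092 --topic first --producer-property batch.size=32768 --producer-property linger.ms=5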
Kafka partition reassignment
The scenario: there are three nodes and one of them has failed, so its data must be moved off it. The original broker.ids are 1, 2 and 3; a new broker with id 4 has been added, and the data on the failed broker 3 needs to be migrated away.
Create the list of topics to move (to move several topics, just list each of them, as in the sketch after the file below):
]# cat topics-to-move.json
{
  "topics": [
    {"topic": "first"}
  ],
  "version": 1
}
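For multiple topics the file simply lists more entries; a sketch with a hypothetical second topic:
{
  "topics": [
    {"topic": "first"},
    {"topic": "second"}
  ],
  "version": 1
}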
Generate the partition reassignment plan:
--broker-list "1,2,4": these are the brokers the data will be moved to; no matter which brokers currently hold the data, the generated plan places it on these brokers.
]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --topics-to-move-json-file /root/topics-to-move.json --broker-list "1,2,4" --generate
Output of the command above:
Current partition replica assignment   # save this block to rollback-cluster-reassignment.json; it is used for the rollback
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration   # save this block to expand-cluster-reassignment.json; it is used to execute the migration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,4],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[4,1,2],"log_dirs":["any","any","any"]}]}
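One way to capture the two blocks into the files used below (a sketch; the JSON is taken verbatim from the output above):
cat > /root/rollback-cluster-reassignment.json <<'EOF'
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}
EOF
cat > /root/expand-cluster-reassignment.json <<'EOF'
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,4],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[4,1,2],"log_dirs":["any","any","any"]}]}
EOF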
Execute the migration (depending on the amount of data this may take a while; a throttled variant is sketched after the output below):
]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
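To limit the bandwidth the move consumes, --throttle (bytes/second) can be added to the --execute call; a sketch with an illustrative 50 MB/s cap (running --verify afterwards clears the throttle, as the verify output below shows):
]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092 --reassignment-json-file /root/expand-cluster-reassignment.json --execute --throttle 50000000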
Verify the migration:
]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first
Check the result of the migration:
]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --topic first --describe
Rollback: if, after the migration has finished, you decide you do not want it after all, you can roll back:
]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/rollback-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,4],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[4,1,2],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
Verify the rollback:
]# bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --reassignment-json-file /root/rollback-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first
Check the result of the rollback:
]# bin/kafka-topics.sh --bootstrap-server 192.168.199.35:9092,192.168.199.36:9092,192.168.199.38:9092 --topic first --describe
Topic: first    TopicId: AFrTNBEQQiWg6Yy-_4Jruw    PartitionCount: 3    ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: first    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 2,1,3
    Topic: first    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,1,3
    Topic: first    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 2,1,3
Reference:
garywu520.github.io/2018/12/06/KAFKA%E6%89%A9%E5%AE%B9%E8%8A%82%E7%82%B9%E5%92%8C%E5%88%86%E5%8C%BA%E8%BF%81%E7%A7%BB-CDH/
Increasing or decreasing the replication factor
Modify the reassignment JSON as follows and execute it (the # annotations are explanatory and must be removed from the real file; an execute sketch follows the example):
{
  "version": 1,
  "partitions": [
    {
      "topic": "first",
      "partition": 0,
      "replicas": [1,3,2],             # broker ids; the first one is the preferred leader
      "log_dirs": ["any","any","any"]  # the number of "any" entries must match the number of broker ids in replicas
    },
    {
      "topic": "first",
      "partition": 1,
      "replicas": [2,3,1],             # to grow or shrink the replication factor, simply add or remove broker ids here
      "log_dirs": ["any","any","any"]  # add or remove the same number of entries here
    },
    {
      "topic": "first",
      "partition": 2,
      "replicas": [3,1,2],
      "log_dirs": ["any","any","any"]
    }
  ]
}
Cross-log-dir migration
log.dirs can list multiple directories:
log.dirs=/data/kafka-logs-5,/data/kafka-logs-6,/data/kafka-logs-7,/data/kafka-logs-8
Example (the # annotation is explanatory and must be removed from the real file):
{
  "version": 1,
  "partitions": [
    {
      "topic": "first",
      "partition": 0,
      "replicas": [1,3,2],
      "log_dirs": ["/data/kafka-logs-5","/data/kafka-logs-6","/data/kafka-logs-7"]  # simply replace "any" with the target directory for each replica
    },
    {
      "topic": "first",
      "partition": 1,
      "replicas": [2,3,1],
      "log_dirs": ["/data/kafka-logs-5","/data/kafka-logs-6","/data/kafka-logs-7"]
    },
    {
      "topic": "first",
      "partition": 2,
      "replicas": [3,1,2],
      "log_dirs": ["/data/kafka-logs-5","/data/kafka-logs-6","/data/kafka-logs-7"]
    }
  ]
}
Replica expansion and cross-log-dir migration:
51cto.com/article/679893.html