Kafka cluster setup with ZooKeeper

0. Server planning

IP address      Hostname   Installed software
192.168.10.40   kafka01    zookeeper, kafka, kafka-manager
192.168.10.41   kafka02    zookeeper, kafka
192.168.10.42   kafka03    zookeeper, kafka

1. Install JDK 1.8

 rpm -ivh jdk-8u131-linux-x64.rpm
 [root@kafka01 ~]# java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
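
The JDK is required on all three nodes. A minimal sketch, assuming the RPM has already been installed on each host and passwordless SSH as root from kafka01, to verify the JDK everywhere:

# Verify the JDK on every node (hosts taken from the server plan above)
for host in 192.168.10.40 192.168.10.41 192.168.10.42; do
  echo "== $host =="
  ssh root@$host 'java -version'
done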

2. Install ZooKeeper on all three nodes

wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local
mv /usr/local/apache-zookeeper-3.6.1-bin /usr/local/zookeeper
mkdir -p /usr/local/zookeeper/data/logs
cd /usr/local/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data/
dataLogDir=/usr/local/zookeeper/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=192.168.10.40:2888:3888
server.2=192.168.10.41:2888:3888
server.3=192.168.10.42:2888:3888

echo "1" > /usr/local/zookeeper/data/myid
echo "2" > /usr/local/zookeeper/data/myid
echo "3" > /usr/local/zookeeper/data/myid

Start ZooKeeper on each node
 ./bin/zkServer.sh start

 [root@kafka03 zookeeper]# ./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
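
To double-check the whole ensemble from one machine, you can ask each node for its role. A small sketch, assuming nc is installed; the srvr four-letter command is whitelisted by default in ZooKeeper 3.6:

# One node should report Mode: leader, the other two Mode: follower
for host in 192.168.10.40 192.168.10.41 192.168.10.42; do
  echo "== $host =="
  echo srvr | nc $host 2181 | grep Mode
done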

3. Install the Kafka cluster

wget http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xf kafka_2.11-1.0.0.tgz -C /usr/local
mv /usr/local/kafka_2.11-1.0.0 /usr/local/kafka
cp /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties_bak
egrep -v "^$|^#" /usr/local/kafka/config/server.properties_bak > /usr/local/kafka/config/server.properties

broker.id=0
listeners=PLAINTEXT://192.168.10.40:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.40:2181,192.168.10.41:2181,192.168.10.42:2181
zookeeper.connection.timeout.ms=2000
group.initial.rebalance.delay.ms=0

# If true, the controller automatically rebalances partition leadership across brokers.
auto.leader.rebalance.enable=true
# Defaults to true; when enabled, producing to a topic that does not exist creates it automatically.
auto.create.topics.enable=false
# Defaults to false; unless set to true, topic deletion is only logical (marked for deletion), not physical.
delete.topic.enable=true
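
The server.properties shown above is the kafka01 copy; on kafka02 and kafka03 only broker.id (1 and 2) and listeners (each node's own IP) need to change. A minimal sketch, assuming Kafka is already unpacked at /usr/local/kafka on every node and passwordless SSH as root, to push the file and patch the per-node values:

declare -A ids=( [192.168.10.41]=1 [192.168.10.42]=2 )
for host in "${!ids[@]}"; do
  scp /usr/local/kafka/config/server.properties root@$host:/usr/local/kafka/config/server.properties
  # Give each broker a unique id and bind the listener to its own IP
  ssh root@$host "sed -i 's/^broker.id=.*/broker.id=${ids[$host]}/' /usr/local/kafka/config/server.properties"
  ssh root@$host "sed -i 's#^listeners=.*#listeners=PLAINTEXT://$host:9092#' /usr/local/kafka/config/server.properties"
done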

Start Kafka on each node
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >/dev/null 2>&1 &

[root@kafka01 local]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    

tcp6       0      0 :::39759                :::*                    LISTEN      20111/java          
tcp6       0      0 192.168.10.40:3888      :::*                    LISTEN      20111/java          
tcp6       0      0 :::8080                 :::*                    LISTEN      20111/java          
tcp6       0      0 :::22                   :::*                    LISTEN      811/sshd            
tcp6       0      0 ::1:25                  :::*                    LISTEN      912/master          
tcp6       0      0 :::37983                :::*                    LISTEN      35030/java          
tcp6       0      0 192.168.10.40:9092      :::*                    LISTEN      35030/java          
tcp6       0      0 :::2181                 :::*                    LISTEN      20111/java          
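
Besides checking the listening ports, you can confirm that all three brokers registered themselves in ZooKeeper, using the bundled zkCli.sh:

/usr/local/zookeeper/bin/zkCli.sh -server 192.168.10.40:2181 ls /brokers/ids
# Once all brokers are up, the znode list should show the three broker ids: [0, 1, 2]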

Create a test topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.40:2181,192.168.10.41:2181,192.168.10.42:2181 --replication-factor 1 --partitions 1 --topic test

## Explanation
## --replication-factor 1   # one replica
## --partitions 1           # one partition
## --topic test             # the topic name is test

List the created topic (from another server)
[root@kafka02 local]# /usr/local/kafka/bin/kafka-topics.sh  --list  --zookeeper 192.168.10.40:2181,192.168.10.41:2181,192.168.10.42:2181 
test
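
To see where the partition and its replica landed, kafka-topics.sh can also describe the topic; the output lists the leader, replicas, and ISR for each partition:

/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.40:2181,192.168.10.41:2181,192.168.10.42:2181 --topic test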

# Create a producer (publisher)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.40:9092 --topic test


Create a consumer (subscriber) on one of the servers

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
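
A quick non-interactive round trip can also be run from the shell; a sketch, where --max-messages simply makes the consumer exit after reading one message:

# Publish one message to the test topic
echo "hello from kafka01" | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.40:9092 --topic test
# Read it back from the beginning and exit after one message
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning --max-messages 1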

4. Install the Kafka cluster management tool kafka-manager

To simplify the work of developers and operations engineers who maintain Kafka clusters, Yahoo built a web-based tool called Kafka Manager (project page: https://github.com/yahoo/kafka-manager). With it you can easily spot topics that are unevenly distributed across the cluster, or partitions that are unevenly spread over the brokers. It supports managing multiple clusters, preferred replica election, replica reassignment, and topic creation, and it is also a convenient way to get a quick overview of a cluster. kafka-manager provides the following features:
- Manage multiple Kafka clusters
- Convenient inspection of cluster state (topics, brokers, replica distribution, partition distribution)
- Run preferred replica election
- Generate partition assignments based on the current state of the cluster
- Select topic configuration and create topics (the configuration differs between 0.8.1.1 and 0.8.2)
- Delete topics (only supported on 0.8.2+ and requires delete.topic.enable=true in the broker config)
- The topic list indicates which topics are marked for deletion (0.8.2+ only)
- Add partitions to existing topics
- Update the configuration of existing topics
- Batch reassign partitions across multiple topics
- Batch reassign partitions across multiple topics, with the option to choose broker placement

4.1 Install the management tool

wget https://github.com/yahoo/CMAK/archive/3.0.0.5.zip
unzip 3.0.0.5.zip -d /usr/local
mv /usr/local/CMAK-3.0.0.5 /usr/local/kafka-manager
cd /usr/local/kafka-manager/conf
cp application.conf application.conf.bak
vim application.conf
#kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
#kafka-manager.zkhosts=${?ZK_HOSTS}
kafka-manager.zkhosts="192.168.10.40:2181,192.168.10.41:2181,192.168.10.42:2181"

Start kafka-manager
nohup /usr/local/kafka-manager/bin/kafka-manager  >/dev/null 2>&1 &

Note:
kafka-manager listens on port 9000 by default. You can change the port with -Dhttp.port and point it at a config file with -Dconfig.file=conf/application.conf:
/usr/local/kafka-manager/bin/kafka-manager -Dconfig.file=/usr/local/kafka-manager/conf/application.conf -Dhttp.port=8080 &

[root@kafka01 kafka-manager]# lsof -i:9000
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    92115 root  111u  IPv6  75245      0t0  TCP *:cslistener (LISTEN)
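
A simple reachability check from the command line (assuming curl is installed) before opening the UI in a browser; any HTTP status line in the response means the service is answering:

curl -sI http://192.168.10.40:9000 | head -n 1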

Access the web UI (http://192.168.10.40:9000)


Create a topic from the web UI


5. Common Kafka administration tasks

5.1 Kafka data migration (partition reassignment)

cat /tmp/topics-to-move.json
{"topics":[{"topic":"topic1"}],"version":1}

Generate a reassignment plan
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "3,4,5" --generate

Copy the proposed assignment from the command output to /tmp/reassign-plan.json
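
The --generate step prints both the current assignment and a proposed one; the file you save has roughly this shape (the topic, partition, and replica values below are only illustrative):

cat /tmp/reassign-plan.json
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[3,4,5]},{"topic":"topic1","partition":1,"replicas":[4,5,3]},{"topic":"topic1","partition":2,"replicas":[5,3,4]}]}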
Execute the reassignment plan
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/reassign-plan.json --execute

Check the progress of the reassignment
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/reassign-plan.json --verify
