kafka与zookeeper集群环境搭建

kafka与zookeeper集群环境搭建

kafka集群是把状态保存在zookeeper集群上,所以要创建kafka集群需要先创建zookeeper集群

zookeeper 集群的工作是超过半数正常才能对位提供服务,3台中超过两台则为超过半数,允许1台挂掉,集群可使用2*n + 1台搭建。

在搭建集群过程中需要将三台host主机的防火墙关闭,不然会导致集群主机无法访问。

service iptables stop #关闭防火墙
service iptables start #启动防火墙
service iptables restart #重启防火墙
service iptables status #查看防火墙状态

chkconfig iptables off #永久关闭防火墙
ckconfig iptables on #永久关闭后启用防火墙

版本信息

  • kafka kafka_2.11-0.9.0.1
  • zookeeper zookeeper-3.4.6
  • os centos 6

主机IP地址信息

  • host1 192.168.88.233
  • host2 192.168.88.234
  • host3 192.168.88.235

为了方便可以在三台host主机/etc/hosts 文件添加IP信息

192.168.88.233 host1
192.168.88.234 host2
192.168.88.235 host3

zookeeper集群搭建

  • 将zookeeper安装到/opt/zookeeper下

  • 下载zookeeper安装包到host1主机上

  • 具体安装操作如下

    #解压安装包
    tar -zxvf zookeeper-3.4.6.tar.gz
    mv zookeeper-3.4.6 /opt/zookeeper
    
    #创建zookeeper目录
    mkdir /opt/zookeeper/zkdata /opt/zookeeper/zkdatalog
    
    #修改zookeeper配置信息
    cd /opt/zookeeper/conf
    cp zoo.sample.cfg zoo.cfg
    
    vim zoo.cfg
    
    #需要修改项
    tickTime = 2000	#服务器之间维持心跳时间间隔
    initLimit = 5 #初始化连接时最长能忍受多少个心跳时间间隔数
    synclimit = 5 #leader与Follower之间发送消息,请求和应答时间长度最长不能超过多少个tickTime
    dataDir = /opt/zookeeper/zkdata # 快照日志存储目录
    dataLogDir = /opt/zookeeper/zkdatalog #事物日志存储路径
    clientPort = 2181 #客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接受访问
    
    server1=192.168.88.233:2888:3888 # server1=host1:2888:3888
    server2=192.168.88.234:2888:3888 # server2=host2:2888:3888
    server3=192.168.88.235:2888:3888 # server3=host3:2888:3888
    
    #server.1 1是服务器的标识也可以是其他数字,标识这个是第几号服务器。这个标识也需要写到快照目录下面的myid文件里
    #192.168.88.234:2888:3888 192.168.88.234为集群主机的ip地址,第一个端口是master与slave通信端口,认为2888,第二个端口为leader选举端口,认端口为3888
    
    
    #创建myid文件 三台主机创建不同的id文件
    #server1 
    echo "1" > /opt/zookeeper/zkdata/myid
    
    #server2
    echo "2" > /opt/zookeeper/zkdata/myid
    
    #server3 
    echo "3" > /opt/zookeeper/zkdata/myid
    
  • 启动并查看服务器状态

    cd /opt/zookeeper/bin
    ./zkServer.sh start   
    
    #查看服务状态 将三台集群全部启动后查看
    ./zkServer.sh status
    
    #关闭服务
    ./zkServer.sh stop
    

kafka集群搭建

  • kafka安装目录在/opt/kafka

  • 下载kafka安装包

  • 具体操作步骤如下

    #解压kafka
    tar -zxvf kafka_2.11-0.9.0.1.tgz 
    mv kafka_2.11-0.9.0.1 /opt/kafka
    
    #创建目录
    mkdir /opt/kafka/kafkalogs
    
    #修改kafka配置信息
    cd /opt/kafka/config
    cp server.properties server.properties.bak
    vim server.properties 
    
    broker.id = 0 #每台服务器的broker.id都不能相同
    listeners = PLAINTEXT://192.168.88.233:9092 #分别写上三台主机的ip地址
    log.dirs = /opt/kafka/kafkalogs/ #日志存放目录
    
    #可删除topic
    delete.topic.enable = true
    
    log.retentions.hours=168 #在该项配置后新增后续三项
    
    #新增
    message.max.byte = 5242880 
    default.replication.factor = 2
    replica.fetch.max.bytes = 5242880 
    #设置zookeeper的连接端口
    zookeeper.connect = 192.168.88.233:2181,192.168.88.234:2181,192.168.88.235:2181
    
  • 启动kafka集群并测试

    #启动服务
    cd /opt/kafka/bin
    ./kafka-server-start.sh -daemon ../config/server.properties
    
    #创建topic --replication-factor 2 复制两份 --partitions 1 创建1个分区 --topic 主题名
    ./kafka-topics.sh --create --zookeeper 192.168.88.233:2181,192.168.88.234:2181,192.168.88.235:2181 --replication-factor 2 --partitions --topic test
    #显示topic zookeeper的连接信息可以使用host主机名来缩写
    ./kafka-topic.sh --describe --zookeeper hos1:2181,host2:2181:host3:2181 --topic test
    #列出topic
    ./kafka-topic.sh --list --zookeeper host1:2181:host2:2181:host3:2181
    #删除topic
    ./kafka-topic.sh --delete --zookeeper host1:2181:host2:2181:host3:2181 --topic test
    
    #在其中host1主机上创建发布者
    ./kafka-console-producer.sh --broker-list 192.168.88.233:9092 --topic test
    #在其他主机上创建消费者
    ./kafka-console-consumer.sh --zookeeper hos1:2181,host2:2181:host3:2181 -topic test --from-beginning 
    
    #关闭kafka服务
    ./kafka-server-stop.sh 
    

注意点

  • 启动与关闭顺序
    • 启动时,先启动zookeeper集群,再启动kafka集群
    • 关闭时,先关闭kafka集群,再关闭zookeeper集群
  • 在集群的机器上可以配置集群集群的名称与IP地址的对应关系,这些在配置kafka集群信息的时候会方便很多,需要修改/etc/hosts

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...