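I. Overview:
Kafka is a distributed stream-processing platform, commonly used to build real-time data pipelines and streaming applications.
II. Installation and deployment:
1. Dependencies:
a). Java: Kafka requires Java 8 or later.
b). ZooKeeper: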
#tar fxvz zookeeper-3.7.0.tar.gz
#mv zookeeper-3.7.0 zookeeper && cd zookeeper
#mkdir data log
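Edit conf/zoo.cfg and set dataDir and the other required parameters. For a multi-node cluster, also create a myid file in dataDir on every node, containing that node's unique ID.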
#cd conf/ && cp zoo_sample.cfg zoo.cfg
#vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/xiaoxiang/elk/zookeeper/data
dataLogDir=/xiaoxiang/elk/zookeeper/log
clientPort=12181
server.1=10.0.4.80:12888:13888
server.2=10.0.4.81:12888:13888
server.3=10.0.4.82:12888:13888
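#echo 1 > /xiaoxiang/elk/zookeeper/data/myid (use a different ID on each node, matching that node's server.N entry)
#bin/zkServer.sh start (start the server; run from the zookeeper directory)
#Verify:
# netstat -nltp (the leader additionally listens on port 12888)
#jps (a QuorumPeerMain process should be listed)
Connection test: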
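To avoid typing the wrong ID on a node, the myid value can be looked up from the server.N lines already in zoo.cfg. A minimal sketch; myid_for_ip is a hypothetical helper (not part of ZooKeeper), and it assumes each server.N entry uses the node's IP exactly as in the listing above:

```shell
# myid_for_ip CFG IP
# Print the N of the "server.N=IP:port:port" line in CFG whose host is IP.
# Returns non-zero if no entry matches.
myid_for_ip() {
  awk -v ip="$2" '
    /^server\.[0-9]+=/ {
      split($0, kv, "=")         # kv[1] = "server.2", kv[2] = "10.0.4.81:12888:13888"
      split(kv[2], addr, ":")    # addr[1] = "10.0.4.81"
      if (addr[1] == ip) {
        sub(/^server\./, "", kv[1])
        print kv[1]
        found = 1
        exit
      }
    }
    END { exit !found }
  ' "$1"
}
```

Possible use on node 10.0.4.81: myid_for_ip conf/zoo.cfg 10.0.4.81 > /xiaoxiang/elk/zookeeper/data/myid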
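Besides netstat and jps, each node's role can be read from the reply to ZooKeeper's srvr four-letter command, which contains a "Mode:" line. A rough sketch; zk_mode is a hypothetical helper, and the usage line assumes that nc is installed and that srvr is permitted by 4lw.commands.whitelist on the servers:

```shell
# zk_mode
# Read an "srvr" reply on stdin and print only the role
# (for example "leader", "follower" or "standalone").
zk_mode() {
  awk -F': ' '/^Mode:/ { print $2 }'
}

# Example use against one node of the cluster (not executed here):
#   echo srvr | nc 10.0.4.80 12181 | zk_mode
```

In a healthy three-node ensemble one node should report leader and the other two follower.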
# bin/zkCli.sh -server 10.0.4.80:12181,10.0.4.81:12181,10.0.4.82:12181
Connecting to 10.0.4.80:12181,10.0.4.81:12181,10.0.4.82:12181
............
12181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@506c589e
Welcome to ZooKeeper!
2016-09-28 16:55:29,711 [myid:] - INFO [main-SendThread(10.0.4.81:12181):ClientCnxn$SendThread@1032] - Opening socket connection to server 10.0.4.81/10.0.4.81:12181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2016-09-28 16:55:29,906 [myid:] - INFO [main-SendThread(10.0.4.81:12181):ClientCnxn$SendThread@876] - Socket connection established to 10.0.4.81/10.0.4.81:12181, initiating session
[zk: 10.0.4.80:12181,10.0.4.81:12181,10.0.4.82:12181(CONNECTING) 0] 2016-09-28 16:55:30,093 [myid:] - INFO [main-SendThread(10.0.4.81:12181):ClientCnxn$SendThread@1299] - Session establishment complete on server 10.0.4.81/10.0.4.81:12181, sessionid = 0x2576ff98af20000, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
# The log shows that the client connected to 10.0.4.81:12181 (which ZooKeeper server in the list the client connects to is chosen at random)
2. Kafka installation:
#tar fxvz kafka_2.11-0.10.0.0.tgz
#mv kafka_2.11-0.10.0.0 kafka
#cd kafka/config
#vim server.properties
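# Node ID; must differ on each broker (keep the comment on its own line, a trailing comment would become part of the value)
broker.id=1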
port=19092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.4.80:12181,10.0.4.81:12181,10.0.4.82:12181
zookeeper.connection.timeout.ms=6000
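Only broker.id differs between brokers, and zookeeper.connect is the ZooKeeper host list joined with the client port. A small sketch of deriving both; join_hosts and set_broker_id are hypothetical helpers, and set_broker_id assumes the file already has a broker.id= line:

```shell
# join_hosts PORT HOST...
# Print "HOST:PORT,HOST:PORT,...", the form used by zookeeper.connect.
join_hosts() {
  jh_port="$1"; shift
  jh_out=""
  for jh_host in "$@"; do
    jh_out="${jh_out:+$jh_out,}$jh_host:$jh_port"
  done
  printf '%s\n' "$jh_out"
}

# set_broker_id FILE ID
# Rewrite the broker.id line of FILE to the given ID (via a temporary file).
set_broker_id() {
  sed "s/^broker\.id=.*/broker.id=$2/" "$1" > "$1.tmp" && mv "$1.tmp" "$1"
}
```

For example, join_hosts 12181 10.0.4.80 10.0.4.81 10.0.4.82 yields the zookeeper.connect value used above, and set_broker_id config/server.properties 2 prepares the file for the second broker.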
Start (run from the kafka directory):
#nohup bin/kafka-server-start.sh config/server.properties & (start)
#bin/kafka-server-stop.sh (stop)
Test producing/consuming:
# bin/kafka-topics.sh --create --zookeeper 10.0.4.80:12181 --replication-factor 1 --partitions 1 --topic test (create a topic)
# bin/kafka-topics.sh --list --zookeeper 10.0.4.80:12181 (list topics)
#bin/kafka-console-producer.sh --broker-list 10.0.4.80:19092 --topic test (produce)
#bin/kafka-console-consumer.sh --zookeeper 10.0.4.80:12181 --topic test --from-beginning (consume)
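The produce and consume steps can be combined into a non-interactive round-trip check. This is only a sketch: kafka_roundtrip is a hypothetical helper, and it assumes the topic is freshly created and empty, so that the first message read with --from-beginning is the one just sent. It relies on the console consumer's --max-messages option to exit after one message.

```shell
# kafka_roundtrip KAFKA_HOME BROKER ZK TOPIC MESSAGE
# Send MESSAGE to TOPIC, read one message back from the beginning,
# and succeed only if the two are identical.
kafka_roundtrip() {
  kr_home="$1"; kr_broker="$2"; kr_zk="$3"; kr_topic="$4"; kr_msg="$5"

  # Produce: the console producer reads messages from stdin, one per line.
  printf '%s\n' "$kr_msg" |
    "$kr_home/bin/kafka-console-producer.sh" \
      --broker-list "$kr_broker" --topic "$kr_topic" || return 1

  # Consume: read a single message and compare it to what was sent.
  "$kr_home/bin/kafka-console-consumer.sh" \
    --zookeeper "$kr_zk" --topic "$kr_topic" \
    --from-beginning --max-messages 1 2>/dev/null |
    grep -qxF "$kr_msg"
}
```

Possible use from the kafka directory: kafka_roundtrip . 10.0.4.80:19092 10.0.4.80:12181 test "hello kafka" && echo OK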