介绍
由Scala和Java编写的一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
安装kafka包
-
将kafka_2.9.2-0.8.1.tgz拷贝到sparkproject1的/usr/local目录下。
-
对kafka_2.9.2-0.8.1.tgz进行解压缩
tar -zxvf kafka_2.9.2-0.8.1.tgz
- 对kafka目录进行改名
mv kafka_2.9.2-0.8.1 kafka
配置kafka
vi $KAFKA_HOME/config/server.properties
broker.id:依次增长的整数,0.1.2,集群中Broker的唯一id
zookeeper.connect=172.16.131.105:2181,172.16.131.106:2181,172.16.131.107:2181
# 把slf4j中的slf4j-nop-1.7.6.jar复制到kafka的libs目录下面
scp -r $KAFKA_HOME root@sparkproject2:/usr/local
scp -r $KAFKA_HOME root@sparkproject3:/usr/local
启动kafka集群
- 解决kafka Unrecognized VM option ‘UseCompressedOops’问题
vi $KAFKA_HOME/bin/kafka-run-class.sh
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
# 去掉-XX:+UseCompressedOops即可
- 在三台机器上的kafka目录下,分别执行以下命令
nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
- 使用jps检查启动是否成功
测试kafka集群
使用基本命令检查kafka是否搭建成功
bin/kafka-topics.sh --zookeeper 172.16.131.110:2181,172.16.131.111:2181,172.16.131.112:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create
bin/kafka-console-producer.sh --broker-list 172.16.131.110:9092,172.16.131.111:9092,172.16.131.112:9092 --topic TestTopic
bin/kafka-console-consumer.sh --zookeeper 172.16.131.110:2181,172.16.131.111:2181,172.16.131.112:2181 --topic TestTopic --from-beginning