kafka详细介绍,安装,配置
1、简介
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
Kafka是一种分布式的,基于发布/订阅的消息系统 , 它的架构包括以下组件:
i. 消息的发布(publish)称作producer,消息的订阅(subscribe)称作consumer,中间的存储阵列称作broker。
ii. 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发。
iii. producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。
iv. broker端不维护数据的消费状态,提升了性能。 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
v. 直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
vi. Kafka使用scala编写,可以运行在JVM上。
如上图所示,一个典型的Kafka集群中包含:
若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高)。
若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Topic & Partition
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹
开始安装kafka集群:
1,创建用户
在所有主机上添加用户:
groupadd kafka
useradd kafka -g kafka
2,主机分配 分别为Hadoop1 ,Hadoop2,Hadoop3
3,绑定hosts
172.16.1.250 hadoop1
172.16.1.252 hadoop2
172.16.1.253 hadoop3
4,下载,解压
https://kafka.apache.org/
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1
ln -s /usr/local/hadoop/kafka_2.10-0.8.1.1 /usr/local/hadoop/kafka
chown -R kafka:kafka /usr/local/hadoop
在Hadoop3机器先安装
5,修改配置文件
cd /usr/local/hadoop/kafka/config
vim /kafka/server.properties
broker.id=3 三台机的id不能一样
port=9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka (zookpeer集群)
zookeeper.connection.timeout.ms=1000000
启动
bin/kafka-server-start.sh /usr/local/hadoop/kafka/config/server.properties &
6,配置Java环境
#java
export JAVA_HOME=/soft/jdk1.7.0_79
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:/$JAVA_HOME/bin:$HADOOP_HOME/bin
7,部署kafka集群
由于kafka集群需要依赖zookeeper,所以安装下zookeeper
见:
https://taoistwar.gitbooks.io/spark-operationand-maintenance-management/content/spark_relate_software/kafka_install.html
7,同步三台机器的配置文件,并修改相应的broker.id=1,broker.id=2,broker.id=3
cd /usr/local/hadoop/
在Hadoop3机器上先安装好一台了
scp -r kafka/ hadoop1:/usr/local/hadoop/
scp -r kafka/ hadoop2:/usr/local/hadoop/
在Hadoop1机器上,修改配置文件,并启动
vim conf/server.properties
broker.id=1 三台机的id不能一样
port=9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka (zookpeer集群)
zookeeper.connection.timeout.ms=1000000
启动
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
在Hadoop2机器上,修改配置文件,并启动
vim conf/server.properties
broker.id=2 三台机的id不能一样
port=9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka (zookpeer集群)
zookeeper.connection.timeout.ms=1000000
启动
bin/kafka-server-start.sh /usr/local/hadoop/kafka/config/server.properties &
8验证
使用Kafka自带的脚本,启动基于Console的producer和consumer。
9,错误总结:
ttp://wenda.chinahadoop.cn/question/4079?notification_id=290954&rf=false&item_id=10382#!answer_10382
http://blog.csdn.net/wenxuechaozhe/article/details/52664774
http://472053211.blog.51cto.com/3692116/1655844
10,实际操作见:
https://taoistwar.gitbooks.io/spark-operationand-maintenance-management/content/spark_relate_software/kafka_install.html
本文转自crazy_charles 51CTO博客,原文链接:http://blog.51cto.com/douya/1860895,如需转载请自行联系原作者

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
大数据和Hadoop时代的维度建模和Kimball数据集市
维度建模已死? 在回答这个问题之前,让我们回头来看看什么是所谓的维度数据建模。 为什么需要为数据建模? 有一个常见的误区,数据建模的目的是用 ER 图来设计物理数据库,实际上远不仅如此。数据建模代表了企业业务流程的复杂度,记录了重要的业务规则和概念,并有助于规范企业的关键术语。它清晰地阐述、协助企业揭示商业过程中模糊的想法和歧义。此外,可以使用数据模型与其他利益相关者进行有效沟通。没有蓝图,不可能建造一个房子或桥梁。所以,没有数据模型这样一个蓝图,为什么要建立一个数据应用,比如数据仓库呢? 为什么需要维度建模? 维度建模是数据建模的一种特殊方法。维度建模有两个同义词,数据集市和星型结构。星型结构是为了更好地进行数据分析,参考下面图示的维度模型,可以有一个很直观的理解。通过它可以立即知道如何通过客户、产品、时间对订单进行分割,如何通过度量的聚集和比较对订单业务过程进行绩效评估。 维度建模最关键的一点,是要定义事务性业务过程中的最低粒度是什么。如果切割或钻入数据,到叶级就不能再往下钻取。从另一个角度看,星型结构中的最低粒度,即事实和维度之间没有进行任何聚集的关联。 数据建模和维度建模 标准...
- 下一篇
centos 6.6 hadoop 2.7.1 完全分布式安装部署
1.安装前,准备三台CENTOS 6.6系统的主机或虚机,并且关闭防火墙及selinux. 2.按如下表格配置IP地址,修改hosts文件及本机名 192.168.199.21hadoop21 Master 102.168.199.22hadoop22 Slave1 192.168.199.23hadoop23 Slave2 6.6hadoop2.7.1完全分布式安装部署" title="centos6.6hadoop2.7.1完全分布式安装部署" style="margin:0px;padding:0px;border:0px;list-style:none;" data-original="http://s14.sinaimg.cn/mw690/002nJwOegy6UlUhMq6xcd&690">同理修改Slave1,Slave2的IP地址,hosts文件及本机名。 3.安装ORACLE JDK 先卸载本机openJDK,使用rpm -qa|grep java查看,然后用rpm -e 卸载 从oracle网站找到最新JDK,我这选择了JDK8 http://www.ora...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Hadoop3单机部署,实现最简伪集群
- Red5直播服务器,属于Java语言的直播服务器
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7,8上快速安装Gitea,搭建Git服务器
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2更换Tomcat为Jetty,小型站点的福音