通过python操作kafka
通过python操作kafka
kafka特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量[2] :即使是非常普通的硬件Kafka也可以支持每秒数百万[2] 的消息
支持通过Kafka服务器和消费机集群来分区消息
支持Hadoop并行数据加载
术语:
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
一、安装
在pypi.python.org有很多关于操作kafka的组件,我们选择weight最高的kafka 1.3.5
1、有internet网的情况下执行如下命令安装:
pip install kafka easy_install kafka
2、无internet网的情况下把源码下载下来,上传到需要安装的主机
压缩包:kafka-x.x.x.tar.gz
解压: tar xvf kafka-x.x.x.tar.gz
执行安装命令: cd kafka-x.x.x
python setup.py install
如安装报依赖错误,需要把依赖的组件也下载下来,然后进行安装,同样的方法,不赘述!
二、按照官网的样例,先跑一个应用
1、生产者:
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ] for i in range(3): msg = "msg%d" % i producer.send('test', msg) producer.close()
2、消费者(简单demo):
from kafka import KafkaConsumer consumer = KafkaConsumer('test', bootstrap_servers=['xxx.xx.xx.xx:9092']) for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
启动后生产者、消费者可以正常消费。
3、消费者(消费群组)
from kafka import KafkaConsumer consumer = KafkaConsumer('test', group_id='my-group', bootstrap_servers=['xxx.xx.xx.xx:9092']) for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力
4、消费者(读取目前最早可读的消息)
from kafka import KafkaConsumer consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['xxx.xx.xx.xxx:9092']) for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest 源码定义:{'smallest': 'earliest', 'largest': 'latest'}
5、消费者(手动设置偏移量)
from kafka import KafkaConsumer from kafka.structs import TopicPartition consumer = KafkaConsumer('test', bootstrap_servers=['xxx.xx.xx.xxx:9092']) print consumer.partitions_for_topic("test") #获取test主题的分区信息 print consumer.topics() #获取主题列表 print consumer.subscription() #获取当前消费者订阅的主题 print consumer.assignment() #获取当前消费者topic、分区信息 print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量 consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
6、消费者(订阅多个主题)
from kafka import KafkaConsumer from kafka.structs import TopicPartition consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) consumer.subscribe(topics=('test','test0')) #订阅要消费的主题 print consumer.topics() print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
7、消费者(手动拉取消息)
from kafka import KafkaConsumer import time consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) consumer.subscribe(topics=('test','test0')) while True: msg = consumer.poll(timeout_ms=5) #从kafka获取消息 print msg time.sleep(1)
8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer from kafka.structs import TopicPartition import time consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) consumer.subscribe(topics=('test')) consumer.topics() consumer.pause(TopicPartition(topic=u'test', partition=0)) num = 0 while True: print num print consumer.paused() #获取当前挂起的消费者 msg = consumer.poll(timeout_ms=5) print msg time.sleep(2) num = num + 1 if num == 10: print "resume..." consumer.resume(TopicPartition(topic=u'test', partition=0)) print "resume......" pause执行后,consumer不能读取,直到调用resume后恢复。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spring Boot入门(9)网页版计算器
介绍 在写了前八篇Spring Boot项目的介绍文章后,我们已经初步熟悉了利用Spring Boot来做Web应用和数据库的使用方法了,但是这些仅仅是官方介绍的一个例子而已。 本次分享将介绍笔者自己的一个项目:网页版计算器,以这两篇博客为基础: Java之调用Python代码 和 Spring Boot入门(6)前端接受后台传参。因为在Java中并没有类似于Python的eval()函数的功能,所以,为了避免自己写一个计算数学表达式的java代码,我们的解决方法是:用Java调用Python代码来实现。 话不多说,直接上项目! 项目 网页版计算器的整个项目结构如下图: Expression.java为实体类,用于页面中表单提交的数学表达式的处理,其代码如下: package com.hello.operation.Controller; public class Expression { private String expr; public String getExpr() { return expr; } public void setExpr(String ...
- 下一篇
异步社区本周(4.23-4.29)半价电子书
点击关注异步图书,置顶公众号 每天与你分享 IT好书 技术干货 职场知识 《R语言编程指南》 任坤著 点击封面购买纸书 R 语言是从事数据科学和统计学需要的工具之一。强大且复杂的 R 对于初学者和不熟悉其独特特性的人来说可能具有一定的挑战性。本书以一种简单且实践性强的方式来教授读者学习R语言,并逐步建立对R语言广泛、一致的理解。通过实际操作实例,亲身体验强大的 R 工具,并总结 R 的使用方法,你能够更深入地了解如何使用数据。 本书面向数据领域的从业人员,尤其适合想要通过学习R编程及相关工具提升数据处理效率的读者阅读,也适合计算机或统计相关专业的学生参考使用。通过阅读本书,读者将全面掌握R的相关特性及其在数据处理和分析方面的应用,极大地提升自己的专业技能。 《HTML5移动开发》 【美】Estelle Weyl(埃斯特尔 韦尔)著 点击封面购买纸书 树莓派是一个低成本、信用卡大小的计算系统,可以针对渗透测试在内的所有用途进行定制。树莓派凭借其强大的功能(而非低廉的价格)成为知名的平台。Kali是用于进行渗透测试、安全审计的Linux版本。Kali Linux内置了很多渗...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7,8上快速安装Gitea,搭建Git服务器