您现在的位置是:首页 > 文章详情

spark-streaming集成Kafka工程实例【转】

日期:2018-11-29点击:610

场景模拟


我试图覆盖工程上最为常用的一个场景:


1)首先,向Kafka里实时的写入订单数据,JSON格式,包含订单ID-订单类型-订单收益


2)然后,spark-streaming每十秒实时去消费kafka中的订单数据,并以订单类型分组统计收益


3)最后,spark-streaming统计结果实时的存入本地MySQL

 

前提条件


安装


1)spark:我使用的yarn-client模式下的spark,环境中集群客户端已经搞定


2)zookeeper:我使用的是这个集群:10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181


3)kafka:我使用的是standalone模式:10.93.21.21:9093


4)mysql:10.93.84.53:3306


语言


python:pykafka,pip install pykafka


java:spark,spark-streaming


下面开始


1、数据写入kafka


  • kafka写入


我们使用pykafka模拟数据实时写入,代码如下:


kafka_producer.py

# -* coding:utf8 *-   import time import json import uuid import random import threading from pykafka import KafkaClient # 创建kafka实例 hosts = '10.93.21.21:9093' client = KafkaClient(hosts=hosts) # 打印一下有哪些topic print client.topics   # 创建kafka producer句柄 topic = client.topics['kafka_spark'] producer = topic.get_producer() # work def work():     while 1:         msg = json.dumps({             "id": str(uuid.uuid4()).replace('-', ''),             "type": random.randint(1, 5),             "profit": random.randint(13, 100)})         producer.produce(msg) # 多线程执行 thread_list = [threading.Thread(target=work) for i in range(10)] for thread in thread_list:     thread.setDaemon(True)     thread.start() time.sleep(60) # 关闭句柄, 退出 producer.stop()

可以看到,我们写入的形式是一个json,订单id是一个uuid,订单类型type从1-5随机,订单收益profit从13-100随机,形如

{"id": ${uid}, "type": 1, "profit": 30}

注意:1)python对kafka的读写不需要借助zookeeper,2)使用多线程的形式写入,让数据量具有一定的规模。


执行producer,会持续写入数据1分钟。

python kafka_producer.py
  • 验证一下

kafka_consumer.py

# -* coding:utf8 *- from pykafka import KafkaClient hosts = '10.93.21.21:9093' client = KafkaClient(hosts=hosts) # 消费者 topic = client.topics['kafka_spark'] consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,                                      consumer_id='test') for message in consumer:     if message is not None:         print message.offset, message.value

 执行,可以消费kafka刚才写入的数据

python kafka_consumer.py


2、spark-streaming


1)先解决依赖


其中比较核心的是spark-streaming和kafka集成包spark-streaming-kafka_2.10,还有spark引擎spark-core_2.10


json和mysql看大家爱好。


pom.xml

<dependencies>         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming-kafka_2.10</artifactId>             <version>1.6.0</version>         </dependency>         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-streaming_2.10</artifactId>             <version>1.6.0</version>         </dependency>         <dependency>             <groupId>org.apache.spark</groupId>             <artifactId>spark-core_2.10</artifactId>             <version>1.6.0</version>             <scope>provided</scope>         </dependency>         <dependency>             <groupId>com.alibaba</groupId>             <artifactId>fastjson</artifactId>             <version>1.2.19</version>         </dependency>         <dependency>             <groupId>mysql</groupId>             <artifactId>mysql-connector-java</artifactId>             <version>5.1.38</version>         </dependency>         <dependency>             <groupId>commons-dbcp</groupId>             <artifactId>commons-dbcp</artifactId>             <version>1.4</version>         </dependency>     </dependencies>

2)MySQL准备


建表

我们的结果去向是MySQL,先建立一个结果表。


id:主键,自增id


type:订单类型


profit:每个spark batch聚合出的订单收益结果


time:时间戳

CREATE TABLE `order` (   `id` int(11) NOT NULL AUTO_INCREMENT,   `type` int(11) DEFAULT NULL,   `profit` int(11) DEFAULT NULL,   `time` mediumtext,   PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8
  • Java客户端

采用了单例线程池的模式简单写了一下。


ConnectionPool.java

package com.xiaoju.dqa.realtime_streaming; import java.sql.Connection; import java.sql.DriverManager; import java.util.LinkedList; public class ConnectionPool {     private static LinkedList<Connection> connectionQueue;     static {         try {             Class.forName("com.mysql.jdbc.Driver");         } catch (ClassNotFoundException e) {             e.printStackTrace();         }     }     public synchronized static Connection getConnection() {         try {             if (connectionQueue == null) {                 connectionQueue = new LinkedList<Connection>();                 for (int i = 0; i < 5; i++) {                     Connection conn = DriverManager.getConnection(                             "jdbc:mysql://10.93.84.53:3306/big_data",                             "root",                             "1234");                     connectionQueue.push(conn);                 }             }         } catch (Exception e) {             e.printStackTrace();         }         return connectionQueue.poll();     }     public  static void returnConnection(Connection conn){connectionQueue.push(conn);} }

3)代码实现


我用java写的,不会用scala很尴尬。


即时用java整个的处理过程依然比较简单。跟常见的wordcount也没有多大的差别。


SparkStreaming特点


spark的特点就是RDD,通过对RDD的操作,来屏蔽分布式运算的复杂度。


而spark-streaming的操作对象是RDD的时间序列DStream,这个序列的生成是跟batch的选取有关。例如我这里Batch是10s一个,那么每隔10s会产出一个RDD,对RDD的切割和序列的生成,spark-streaming对我们透明了。唯一暴露给我们的DStream和原生RDD的使用方式基本一致。


这里需要讲解一下MySQL写入注意的事项。


MySQL写入


在处理mysql写入时使用了foreachPartition方法,即,在foreachPartition中使用borrow mysql句柄。


这样做的原因是:


1)你无法再Driver端创建mysql句柄,并通过序列化的形式发送到worker端


2)如果你在处理rdd中创建mysql句柄,很容易对每一条数据创建一个句柄,在处理过程中很快内存就会溢出。


OrderProfitAgg.java

package com.xiaoju.dqa.realtime_streaming; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.sql.Connection; import java.sql.Statement; import java.util.*; /* *   生产者可以选用kafka自带的producer脚本 *   bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test * */ public class OrderProfitAgg {     public static void main(String[] args) throws InterruptedException {         /*         *   kafka所注册的zk集群         * */         String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181";         /*         *   spark-streaming消费kafka的topic名称, 多个以逗号分隔         * */         String topics = "kafka_spark,kafka_spark2";         /*         *   消费组 group         * */         String group = "bigdata_qa";         /*         *   topic的分区数         * */         int numThreads = 2;         /*         *   选用yarn队列模式, spark-streaming程序的app名称是"order profit"         * */         SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit");         /*         *   创建sc, 全局唯一, 设置logLevel可以打印一些东西到控制台         * */         JavaSparkContext sc = new JavaSparkContext(sparkConf);         sc.setLogLevel("WARN");         /*         *   创建jssc, spark-streaming的batch是每10s划分一个         * */         JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));         /*         *   准备topicMap         * */         Map<String ,Integer> topicMap = new HashMap<String, Integer>();         for (String topic : topics.split(",")) {             topicMap.put(topic, numThreads);         }         /*         *   kafka数据流         * */         List<JavaPairReceiverInputDStream<String, String>> streams = new ArrayList<JavaPairReceiverInputDStream<String, String>>();         for (int i = 0; i < numThreads; i++) {             streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap));         }         /*         *   从kafka消费数据的RDD         * */         JavaPairDStream<String, String> streamsRDD = streams.get(0);         for (int i = 1; i < streams.size(); i++) {             streamsRDD = streamsRDD.union(streams.get(i));         }         /*         *   kafka消息形如: {"id": ${uuid}, "type": 1, "profit": 35}         *   统计结果, 以type分组的总收益         *   mapToPair, 将kafka消费的数据, 转化为type-profit key-value对         *   reduceByKey, 以type分组, 聚合profit         * */         JavaPairDStream<Integer, Integer> profits = streamsRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {             @Override             public Tuple2<Integer, Integer> call(Tuple2<String, String> s_tuple2) throws Exception {                 JSONObject jsonObject = JSON.parseObject(s_tuple2._2);                 return new Tuple2<Integer, Integer>(jsonObject.getInteger("type"), jsonObject.getInteger("profit"));             }         }).reduceByKey(new Function2<Integer, Integer, Integer>() {             @Override             public Integer call(Integer i1, Integer i2) throws Exception {                 return i1 + i2;             }         });         /*         *   输出结果到MySQL         *   需要为每一个partition创建一个MySQL句柄, 使用foreachPartition         * */         profits.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {             @Override             public Void call(JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD) throws Exception {                 integerIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() {                     @Override                     public void call(Iterator<Tuple2<Integer, Integer>> tuple2Iterator) throws Exception {                         Connection connection = ConnectionPool.getConnection();                         Statement stmt = connection.createStatement();                         long timestamp = System.currentTimeMillis();                         while(tuple2Iterator.hasNext()) {                             Tuple2<Integer, Integer> tuple = tuple2Iterator.next();                             Integer type = tuple._1;                             Integer profit = tuple._2;                             String sql = String.format("insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)", type, profit, timestamp);                             stmt.executeUpdate(sql);                         }                         ConnectionPool.returnConnection(connection);                     }                 });                 return null;             }         });         /*         *   开始计算, 等待计算结束         * */         jssc.start();         try {             jssc.awaitTermination();         } catch (Exception ex) {             ex.printStackTrace();         } finally {             jssc.close();         }     } }

4)打包方法


编写pom.xml build tag。


mvn clean package打包即可。


pom.xml

<build>         <plugins>             <plugin>                 <artifactId>maven-assembly-plugin</artifactId>                 <configuration>                     <archive>                         <manifest>                             <!--这里要替换成jar包main方法所在类 -->                             <!--<mainClass>com.bigdata.qa.hotdog.driver.WordCount</mainClass>-->                             <mainClass>com.xiaoju.dqa.realtime_streaming.OrderProfitAgg</mainClass>                         </manifest>                     </archive>                     <descriptorRefs>                         <descriptorRef>jar-with-dependencies</descriptorRef>                     </descriptorRefs>                 </configuration>                 <executions>                     <execution>                         <id>make-assembly</id> <!-- this is used for inheritance merges -->                         <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->                         <goals>                             <goal>single</goal>                         </goals>                     </execution>                 </executions>             </plugin>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <configuration>                     <source>1.6</source>                     <target>1.6</target>                 </configuration>             </plugin>         </plugins>     </build>


3、执行与结果


1)执行kafka_producer.py

python kafka_producer.py

2) 执行spark-streaming

这里使用的是默认参数提交yarn队列。

spark-submit --queue=root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar

3)查看结果


到MySQL中查看结果,每隔10秒会聚合出type=1-5的5条数据。


例如第一条数据,就是type=4这种类型的业务,在10s内收益是555473元。业务量惊人啊。哈哈。

images/e55NpnGdrZX8hz5B55xwd3c7GDGBK7zH.png


原文链接:https://blog.roncoo.com/article/130711
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章