Storm + Kafka + HDFS, in detail

This post walks through a Storm (0.9.4) topology that consumes terminal-info events from a Kafka topic, validates and parses each message, caches device metadata in Redis, flattens every record to CSV, and writes the result to HDFS via storm-hdfs. The topology class, the three bolts, and the Maven build follow in full.
package com.curiousby.baoyou.cn.storm;

import java.util.UUID;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * @see com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology
 * @Type TerminalInfosAnalysisTopology.java
 * @Desc wires the Kafka spout, the validation/Redis/report bolts and the HDFS writer together
 * @author cmcc-B100036
 * @date 2016-12-15 16:54:50
 * @version
 */
public class TerminalInfosAnalysisTopology {

    private static String topicName = "baoy-topic";
    private static String zkRoot = "/kafka";

    public static void main(String[] args) {
        // Brokers are discovered through ZooKeeper; the /kafka chroot must match the
        // brokers' zookeeper.connect setting.
        BrokerHosts hosts = new ZkHosts(
                "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        // false: resume from the offsets stored in ZooKeeper instead of replaying from the start.
        spoutConfig.forceFromStart = false;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // spoutConfig.socketTimeoutMs = 60;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // One record per line, terminated by \r\n. Note that DelimitedRecordFormat joins
        // *all* tuple fields with the delimiter; since the report bolt emits two fields
        // ("hdfs-terminalinfo", "record"), adding .withFields(new Fields("record")) would
        // keep the constant marker field out of the HDFS files.
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\r\n");
        // Sync to HDFS every 2 tuples: handy for a demo, far too eager for production.
        SyncPolicy syncPolicy = new CountSyncPolicy(2);
        // Roll over to a new file every hour.
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.HOURS);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/user/hadoop/storm/")
                .withPrefix("terminalInfo_")
                .withExtension(".log");
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://172.23.27.120:9000/")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // kafkaSpout -> isValid -> (redis, hdfsReport); hdfsReport -> hdfsBolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("terminalInfosAnalysisIsValidBolt", new TerminalInfosAnalysisIsValidBolt(), 1)
                .shuffleGrouping("kafkaSpout");
        builder.setBolt("terminalInfosAnalysisRedisBolt", new TerminalInfosAnalysisRedisBolt(), 1)
                .shuffleGrouping("terminalInfosAnalysisIsValidBolt");
        builder.setBolt("terminalInfosAnalysisHdfsReportBolt", new TerminalInfosAnalysisHdfsReportBolt(), 1)
                .shuffleGrouping("terminalInfosAnalysisIsValidBolt");
        // Grouping on the constant marker field routes every record to the same writer task.
        builder.setBolt("terminalInfo", hdfsBolt, 1)
                .fieldsGrouping("terminalInfosAnalysisHdfsReportBolt", new Fields("hdfs-terminalinfo"));
        // builder.setBolt("terminalInfosAnalysisHdfsBolt", new TerminalInfosAnalysisHdfsBolt(), 1).shuffleGrouping("terminalInfosAnalysisIsValidBolt");

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name.
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Local mode for development.
            conf.setMaxSpoutPending(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("terminalInfosAnalysisTopology", conf, builder.createTopology());
        }
    }
}
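For a quick end-to-end test, you can push a message onto baoy-topic with the Java producer that ships with the kafka_2.10 0.8.2.1 dependency in the pom below. A minimal sketch; the broker address (172.23.27.120:9092) and the payload fields are assumptions, since the post only lists the ZooKeeper ensemble and the real schema lives in TerminalInfos.formate():

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hypothetical helper for manual testing; not part of the original project. */
public class TestMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.23.27.120:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Hypothetical payload; use whatever JSON TerminalInfos.formate() actually expects.
        String json = "{\"appId\":\"app-001\",\"deviceToken\":\"token-001\"}";
        producer.send(new ProducerRecord<String, String>("baoy-topic", json));
        producer.close();
    }
}

The three custom bolts wired into the topology are implemented next.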
package com.curiousby.baoyou.cn.storm;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject; // assumption: fastjson, matching JSON.toJSONString in the Redis bolt

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/** Parses each raw Kafka message and forwards only valid TerminalInfos entities. */
public class TerminalInfosAnalysisIsValidBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisIsValidBolt.class);
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        logger.info("==== TerminalInfosAnalysisIsValidBolt execute ====");
        // StringScheme declares a single field ("str"), so this loop runs once per message.
        for (int i = 0; i < tuple.size(); i++) {
            JSONObject formate = TerminalInfos.formate(tuple.getString(i));
            TerminalInfos entity = new TerminalInfos();
            entity.formate(formate);
            if (entity.isValid()) {
                // Anchor on the input tuple so downstream failures trigger a replay.
                collector.emit(tuple, new Values(entity));
            }
        }
        // Ack once per input tuple, even when nothing was emitted; otherwise invalid
        // messages are never acked and get replayed on timeout.
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("after_isvalid"));
    }
}
package com.curiousby.baoyou.cn.storm;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.alibaba.fastjson.JSON;

/** Caches the first occurrence of each appId-deviceToken pair in Redis. */
public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class);
    private OutputCollector collector;
    private JedisPool pool;

    @Override
    public void execute(Tuple tuple) {
        logger.info("==== TerminalInfosAnalysisRedisBolt execute ====");
        Jedis jedis = pool.getResource();
        try {
            for (int i = 0; i < tuple.size(); i++) {
                TerminalInfos entity = (TerminalInfos) tuple.getValue(i);
                TerminalInfoHeader tih = entity.getTerminalInfoHeader();
                String key = tih.getAppId() + "-" + tih.getDeviceToken();
                String value = jedis.get(key);
                if (value == null || "".equals(value)) {
                    // First sighting of this device: cache it (and insert the full record into ES).
                    jedis.set(key, JSON.toJSONString(tih));
                } else {
                    // Device already known: only the last-update time would be refreshed in ES.
                }
            }
            // Terminal bolt: ack so the tuple tree rooted at the spout can complete.
            collector.ack(tuple);
        } finally {
            // Jedis 2.2.x API: always hand the connection back to the pool.
            pool.returnResource(jedis);
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        logger.info("==== redis prepare ====");
        this.collector = collector;
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(1000);   // commons-pool 1.x setter names, as used by Jedis 2.2.0
        config.setMaxIdle(50);
        config.setMaxWait(1000L);
        config.setTestOnBorrow(false);
        this.pool = new JedisPool(config, "172.23.27.120", 6379);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Emits nothing downstream.
    }
}
package com.curiousby.baoyou.cn.storm;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/** Flattens a TerminalInfos entity into one comma-separated line for the HdfsBolt. */
public class TerminalInfosAnalysisHdfsReportBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisHdfsReportBolt.class);
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        logger.info("==== TerminalInfosAnalysisHdfsReportBolt execute ====");
        for (int i = 0; i < tuple.size(); i++) {
            TerminalInfos entity = (TerminalInfos) tuple.getValue(i);
            TerminalInfoHeader tih = entity.getTerminalInfoHeader();
            StringBuilder sb = new StringBuilder();
            sb.append(tih.getAppId()).append(",");
            sb.append(tih.getDeviceMac()).append(",");
            sb.append(tih.getDeviceId()).append(",");
            sb.append(tih.getDeviceToken()).append(",");
            sb.append(tih.getDeviceImsi()).append(",");
            sb.append(tih.getDeviceModel()).append(",");
            sb.append(tih.getDeviceManufacture()).append(",");
            sb.append(tih.getChannel()).append(",");
            sb.append(tih.getAppKey()).append(",");
            sb.append(tih.getUserId()).append(",");
            sb.append(tih.getAppVersion()).append(",");
            sb.append(tih.getVersionCode()).append(",");
            sb.append(tih.getSdkType()).append(",");
            sb.append(tih.getOs()).append(",");
            sb.append(tih.getCountry()).append(",");
            sb.append(tih.getLanguage()).append(",");
            sb.append(tih.getTimezone()).append(",");
            sb.append(tih.getResolution()).append(",");
            sb.append(tih.getAccess()).append(",");
            sb.append(tih.getAccessSubtype()).append(",");
            sb.append(tih.getCarrier()).append(",");
            sb.append(tih.getCpu());
            // First value is the constant marker the topology groups on; the second is the record.
            collector.emit(tuple, new Values("hdfs-terminalinfo", sb.toString()));
        }
        // Ack once per input tuple (acking inside the loop would double-ack multi-field tuples).
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hdfs-terminalinfo", "record"));
    }
}
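The bolts above lean on two domain classes, TerminalInfos and TerminalInfoHeader, whose source the post does not include. The sketch below is a hypothetical reconstruction inferred from the calls the bolts make (fastjson is assumed for parsing; only two of the twenty-odd header fields are spelled out, the rest follow the same getter/setter pattern):

package com.curiousby.baoyou.cn.storm;

import java.io.Serializable;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Hypothetical sketch; the real implementation is not shown in the post.
public class TerminalInfos implements Serializable {

    private static final long serialVersionUID = 1L;
    private TerminalInfoHeader terminalInfoHeader;

    // Parse the raw Kafka message into a JSONObject.
    public static JSONObject formate(String raw) {
        return JSON.parseObject(raw);
    }

    // Populate this entity from the parsed message.
    public void formate(JSONObject json) {
        TerminalInfoHeader header = new TerminalInfoHeader();
        header.setAppId(json.getString("appId"));
        header.setDeviceToken(json.getString("deviceToken"));
        this.terminalInfoHeader = header;
    }

    // Minimal validity rule for the sketch: both key fields must be present.
    public boolean isValid() {
        return terminalInfoHeader != null
                && terminalInfoHeader.getAppId() != null
                && terminalInfoHeader.getDeviceToken() != null;
    }

    public TerminalInfoHeader getTerminalInfoHeader() {
        return terminalInfoHeader;
    }
}

class TerminalInfoHeader implements Serializable {

    private static final long serialVersionUID = 1L;
    private String appId;
    private String deviceToken;
    // The real class also carries deviceMac, deviceId, deviceImsi, deviceModel,
    // deviceManufacture, channel, appKey, userId, appVersion, versionCode, sdkType,
    // os, country, language, timezone, resolution, access, accessSubtype, carrier, cpu.

    public String getAppId() { return appId; }
    public void setAppId(String appId) { this.appId = appId; }
    public String getDeviceToken() { return deviceToken; }
    public void setDeviceToken(String deviceToken) { this.deviceToken = deviceToken; }
}

The Maven build for the whole project follows.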
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.curiousby.baoy.cn</groupId>
<artifactId>KafkaStormJavaDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SpringKafkaStormDemo</name>
<url>http://maven.apache.org</url>
<!-- property constants -->
<properties>
<spring.version>4.2.5.RELEASE</spring.version>
<java.version>1.7</java.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.4</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.9.4</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- json start -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<!-- JSON conversion (Jackson 1.x) -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!-- JSON library -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.23</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.2.3</version>
</dependency>
<!-- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.4</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.4</version>
<type>jar</type>
</dependency> -->
<!-- other dependencies -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.2.0</version>
</dependency>
<!-- hdfs start -->
<!--
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.2.0</version>
</dependency>
-->
<!-- <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
-->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.7</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
</dependencies>
<build>
<finalName>SpringKafkaStormDemo</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<dependencies>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-compiler-javac</artifactId>
<version>2.5</version>
</dependency>
</dependencies>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
<compilerArguments>
<verbose />
<bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
</compilerArguments>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<finalName>${project.artifactId}_main_start</finalName>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
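To build and deploy: mvn clean package runs the shade plugin above and should drop a self-contained jar into target/ (named per the plugin's finalName setting, which expands to KafkaStormJavaDemo_TerminalInfosAnalysisTopology_main_start), with TerminalInfosAnalysisTopology as its Main-Class. Submit it with storm jar target/KafkaStormJavaDemo_TerminalInfosAnalysisTopology_main_start.jar com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology <topologyName>; the first program argument becomes the topology name. Running the main class with no arguments exercises the LocalCluster branch instead.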