Java code:

package com.xunjie.dmsp.olduser;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * test.txt:
 * 1	a
 * 2	b
 * 3	c
 *
 * Run with:
 * /data/hadoop/hadoop/bin/hadoop jar
 *     dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar
 *     hdfs:/user/hadoop/test/lky/test.txt
 *     file:///data/hadoop/test/lky/output
 */
public class Test2 {
    public static void main(String[] args) {
        // input file (first argument)
        String sourcePath = args[0];
        // output directory (second argument)
        String sinkPath = args[1];

        // declare the fields to read
        Fields inputfields = new Fields("num", "value");

        // splitting regex; the delimiter defaults to \t
        RegexSplitter spliter = new RegexSplitter(inputfields);
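        // Note: RegexSplitter also accepts an explicit delimiter pattern.
        // For example, for comma-separated input (a hypothetical variant,
        // not part of the original demo):
        //   RegexSplitter csvSpliter = new RegexSplitter(inputfields, ",");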
        // define the pipe
        Pipe p1 = new Pipe("test");
        // pipe assembly:
        // split each line of the source file and emit the declared fields
        p1 = new Each(p1, new Fields("line"), spliter);
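        // Further stages could be chained onto p1 at this point. As a
        // hypothetical example (not in the original demo), tuples could be
        // filtered with cascading.operation.regex.RegexFilter, keeping only
        // rows whose "value" field matches the pattern:
        //   p1 = new Each(p1, new Fields("value"), new RegexFilter("a|b"));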
        // define the source and sink taps, using the generic Hfs tap
        Tap source = new Hfs(new TextLine(), sourcePath);
        Tap sink = new Hfs(new TextLine(), sinkPath);
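        // Hfs resolves the filesystem from a fully qualified URI, which is
        // why the run command above can pair an hdfs: source with a
        // file:/// sink. (For a purely local run, cascading.tap.Lfs should
        // also work in this Cascading generation; that substitution is an
        // assumption, not something the original demo shows.)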
        // configure the job
        Properties properties = new Properties();
        properties.setProperty("hadoop.job.ugi", "hadoop,hadoop");
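        // hadoop.job.ugi is the legacy (pre-security Hadoop) way to set the
        // submitting identity, as "user,group".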
        // ship the jar containing this class with the job
        FlowConnector.setApplicationJarClass(properties, Test2.class);
        FlowConnector flowConnector = new FlowConnector(properties);

        Flow importFlow = flowConnector.connect("import flow", source, sink, p1);

        // start the flow and block until it finishes
        importFlow.start();
        importFlow.complete();
    }
}
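Given the three-line test.txt above, the sink should end up with the same tab-separated pairs (1 a, 2 b, 3 c), since a TextLine sink writes each outgoing tuple as one tab-delimited line. When a flow misbehaves, the planned job graph can also be dumped for inspection; a minimal sketch, assuming Flow#writeDOT from this Cascading generation:

        // after connect(), dump the planned flow graph (viewable with Graphviz)
        importFlow.writeDOT("import-flow.dot");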
This article is reposted from 刘凯毅's blog on 博客园 (original post: hadoop cascading demo). Please contact the original author before republishing.