scala> val rdd_a = sc.textFile("hdfs://master:9000/tmp/wordcount.txt")
15/03/24 13:20:31 INFO storage.MemoryStore: ensureFreeSpace(141503) called with curMem=0, maxMem=311387750
15/03/24 13:20:31 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 138.2 KB, free 296.8 MB)
rdd_a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
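Note that textFile is lazy: it only records the input path, and no data is read until an action runs. One way to sanity-check the handle without scanning the file is to look at its partitioning; a sketch against the same sc as this session (the split count comes from HDFS metadata and should match the "2 output partitions" in the collect job logs below):

// textFile returns an RDD[String] immediately; the file is not read yet
val rdd_check = sc.textFile("hdfs://master:9000/tmp/wordcount.txt")
// partitions are derived from input splits (metadata only, no data scan)
println(rdd_check.partitions.size)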
scala> rdd_a.first()
15/03/24 13:25:31 INFO mapred.FileInputFormat: Total input paths to process : 1
15/03/24 13:25:31 INFO spark.SparkContext: Starting job: first at <console>:15
15/03/24 13:25:31 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal=true)
15/03/24 13:25:31 INFO scheduler.DAGScheduler: Final stage: Stage 0(first at <console>:15)
15/03/24 13:25:31 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/03/24 13:25:31 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/24 13:25:31 INFO scheduler.DAGScheduler: Computing the requested partition locally
15/03/24 13:25:31 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/tmp/wordcount.txt:0+26
15/03/24 13:25:31 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/24 13:25:31 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/24 13:25:31 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/24 13:25:31 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/24 13:25:31 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/24 13:25:32 INFO spark.SparkContext: Job finished: first at <console>:15, took 0.397477806 s
res1: String = hello world
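first() is an action, which is why a job runs here; it computes only as many partitions as it needs (one, per the "1 output partitions" line above). To preview several lines instead of one, take is the equally cheap option; a sketch against the same session:

// take(n) returns at most n elements to the driver, scanning partitions lazily
val preview = rdd_a.take(3)
preview.foreach(println)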
scala> rdd_a.collect()
15/03/24 14:00:32 INFO mapred.FileInputFormat: Total input paths to process : 1
15/03/24 14:00:32 INFO spark.SparkContext: Starting job: collect at <console>:15
15/03/24 14:00:32 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:15) with 2 output partitions (allowLocal=false)
15/03/24 14:00:32 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:15)
15/03/24 14:00:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/03/24 14:00:32 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/24 14:00:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
15/03/24 14:00:32 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
15/03/24 14:00:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/24 14:00:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 1: slave2 (NODE_LOCAL)
15/03/24 14:00:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1725 bytes in 5 ms
15/03/24 14:00:32 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: slave2 (NODE_LOCAL)
15/03/24 14:00:32 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 1725 bytes in 0 ms
15/03/24 14:00:38 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/03/24 14:00:38 INFO scheduler.TaskSetManager: Finished TID 1 in 5942 ms on slave2 (progress: 1/2)
15/03/24 14:00:38 INFO scheduler.TaskSetManager: Finished TID 0 in 5974 ms on slave2 (progress: 2/2)
15/03/24 14:00:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/24 14:00:38 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/03/24 14:00:38 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:15) finished in 6.015 s
15/03/24 14:00:38 INFO spark.SparkContext: Job finished: collect at <console>:15, took 6.133297026 s
res0: Array[String] = Array(hello world, hello world1, hello world1, hello world1, "")
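collect() pulls the entire RDD back to the driver, which is fine for this toy file but can exhaust driver memory on real data. A safer habit for large inputs is to count or sample first; a sketch in the same session:

// count() runs distributed and returns only a Long to the driver
val n = rdd_a.count()
// pull a bounded random sample instead of everything (false = without replacement)
val sample = rdd_a.takeSample(false, 2, 42L)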
scala> val rdd_b = rdd_a.flatMap(line => line.split(" ")).map(word => (word, 1))
rdd_b: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[3] at map at <console>:14
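The tokenize-and-pair step is often written with Scala's underscore shorthand; both forms build the same MappedRDD. Note in the output below that the blank last line of the file becomes an empty token, which is why ("",1) appears; filtering it out is a one-line fix. A sketch:

// equivalent to rdd_b above, using placeholder syntax
val rdd_b2 = rdd_a.flatMap(_.split(" ")).map((_, 1))
// optional: drop the empty token produced by the trailing blank line
val rdd_b_clean = rdd_a.flatMap(_.split(" ")).filter(_.nonEmpty).map((_, 1))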
scala> rdd_b.collect()
15/03/24 14:11:41 INFO spark.SparkContext: Starting job: collect at <console>:17
15/03/24 14:11:41 INFO scheduler.DAGScheduler: Got job 1 (collect at <console>:17) with 2 output partitions (allowLocal=false)
15/03/24 14:11:41 INFO scheduler.DAGScheduler: Final stage: Stage 1(collect at <console>:17)
15/03/24 14:11:41 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/03/24 14:11:41 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/24 14:11:41 INFO scheduler.DAGScheduler: Submitting Stage 1 (MappedRDD[3] at map at <console>:14), which has no missing parents
15/03/24 14:11:41 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[3] at map at <console>:14)
15/03/24 14:11:41 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/24 14:11:42 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 1: slave2 (NODE_LOCAL)
15/03/24 14:11:42 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 1816 bytes in 0 ms
15/03/24 14:11:42 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 1: slave2 (NODE_LOCAL)
15/03/24 14:11:42 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 1816 bytes in 0 ms
15/03/24 14:11:42 INFO scheduler.TaskSetManager: Finished TID 2 in 177 ms on slave2 (progress: 1/2)
15/03/24 14:11:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
15/03/24 14:11:42 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
15/03/24 14:11:42 INFO scheduler.TaskSetManager: Finished TID 3 in 207 ms on slave2 (progress: 2/2)
15/03/24 14:11:42 INFO scheduler.DAGScheduler: Stage 1 (collect at <console>:17) finished in 0.209 s
15/03/24 14:11:42 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/24 14:11:42 INFO spark.SparkContext: Job finished: collect at <console>:17, took 0.279714483 s
res1: Array[(String, Int)] = Array((hello,1), (world,1), (hello,1), (world1,1), (hello,1), (world1,1), (hello,1), (world1,1), ("",1))
scala> val rdd_c = rdd_b.reduceByKey((a, b) => a + b)
rdd_c: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:16
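reduceByKey is the step that introduces a shuffle, which is why the next collect runs as two stages: Stage 3 (ShuffleMapTasks) writes map-side output, and Stage 2 reads it. Because the reduce function must be associative, Spark also applies it map-side as a combiner before any data moves. The underscore form is equivalent; a sketch:

// same aggregation as rdd_c; (_ + _) runs both map-side and reduce-side
val rdd_c2 = rdd_b.reduceByKey(_ + _)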
scala> rdd_c.collect()
15/03/24 14:14:42 INFO spark.SparkContext: Starting job: collect at <console>:19
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:16)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Got job 2 (collect at <console>:19) with 2 output partitions (allowLocal=false)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at <console>:19)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 3)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Missing parents: List(Stage 3)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[4] at reduceByKey at <console>:16), which has no missing parents
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 3 (MapPartitionsRDD[4] at reduceByKey at <console>:16)
15/03/24 14:14:43 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 2 tasks
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Starting task 3.0:0 as TID 4 on executor 1: slave2 (NODE_LOCAL)
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Serialized task 3.0:0 as 2074 bytes in 36 ms
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Starting task 3.0:1 as TID 5 on executor 1: slave2 (NODE_LOCAL)
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Serialized task 3.0:1 as 2074 bytes in 0 ms
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Finished TID 4 in 282 ms on slave2 (progress: 1/2)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 0)
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Finished TID 5 in 241 ms on slave2 (progress: 2/2)
15/03/24 14:14:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 1)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at <console>:16) finished in 0.286 s
15/03/24 14:14:43 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/03/24 14:14:43 INFO scheduler.DAGScheduler: running: Set()
15/03/24 14:14:43 INFO scheduler.DAGScheduler: waiting: Set(Stage 2)
15/03/24 14:14:43 INFO scheduler.DAGScheduler: failed: Set()
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Missing parents for Stage 2: List()
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[6] at reduceByKey at <console>:16), which is now runnable
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 2 (MapPartitionsRDD[6] at reduceByKey at <console>:16)
15/03/24 14:14:43 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 6 on executor 1: slave2 (PROCESS_LOCAL)
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 1953 bytes in 1 ms
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID 7 on executor 0: slave1 (PROCESS_LOCAL)
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as 1953 bytes in 0 ms
15/03/24 14:14:43 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@slave2:37404
15/03/24 14:14:43 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 136 bytes
15/03/24 14:14:43 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
15/03/24 14:14:43 INFO scheduler.TaskSetManager: Finished TID 6 in 211 ms on slave2 (progress: 1/2)
15/03/24 14:14:45 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@slave1:57339
15/03/24 14:14:46 INFO scheduler.DAGScheduler: Completed ResultTask(2, 1)
15/03/24 14:14:46 INFO scheduler.TaskSetManager: Finished TID 7 in 3192 ms on slave1 (progress: 2/2)
15/03/24 14:14:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/24 14:14:46 INFO scheduler.DAGScheduler: Stage 2 (collect at <console>:19) finished in 3.193 s
15/03/24 14:14:46 INFO spark.SparkContext: Job finished: collect at <console>:19, took 3.634568622 s
res2: Array[(String, Int)] = Array(("",1), (hello,4), (world,1), (world1,3))
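The result comes back in hash-partition order, not sorted. To rank words by count, sortBy can be appended before collecting; a sketch (sortBy adds one more shuffle for the range partitioning):

// sort descending by the count field of each (word, count) pair
val ranked = rdd_c.sortBy(_._2, ascending = false).collect()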
scala> rdd_c.cache()
res3: rdd_c.type = MapPartitionsRDD[6] at reduceByKey at <console>:16
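cache() is itself lazy: it only marks rdd_c for in-memory storage, and nothing is persisted until the next action computes it. That is why the "Added rdd_6_*" BlockManagerInfo lines appear during the saveAsTextFile job below rather than here. To materialize immediately, run any action; a sketch:

// count() forces evaluation, so the cached partitions land in executor memory now
rdd_c.count()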
scala> rdd_c.saveAsTextFile("hdfs://master:9000/tmp/spark_result")
15/03/24 14:17:57 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/24 14:17:57 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/24 14:17:57 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/24 14:17:57 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/24 14:17:57 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/24 14:17:58 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:19
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Got job 3 (saveAsTextFile at <console>:19) with 2 output partitions (allowLocal=false)
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Final stage: Stage 4(saveAsTextFile at <console>:19)
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 5)
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Submitting Stage 4 (MappedRDD[8] at saveAsTextFile at <console>:19), which has no missing parents
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 4 (MappedRDD[8] at saveAsTextFile at <console>:19)
15/03/24 14:17:58 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 2 tasks
15/03/24 14:17:58 INFO scheduler.TaskSetManager: Starting task 4.0:0 as TID 8 on executor 0: slave1 (PROCESS_LOCAL)
15/03/24 14:17:58 INFO scheduler.TaskSetManager: Serialized task 4.0:0 as 11506 bytes in 1 ms
15/03/24 14:17:58 INFO scheduler.TaskSetManager: Starting task 4.0:1 as TID 9 on executor 1: slave2 (PROCESS_LOCAL)
15/03/24 14:17:58 INFO scheduler.TaskSetManager: Serialized task 4.0:1 as 11506 bytes in 0 ms
15/03/24 14:17:58 INFO storage.BlockManagerInfo: Added rdd_6_1 in memory on slave2:37855 (size: 216.0 B, free: 297.0 MB)
15/03/24 14:17:58 INFO storage.BlockManagerInfo: Added rdd_6_0 in memory on slave1:48694 (size: 408.0 B, free: 297.0 MB)
15/03/24 14:17:58 INFO scheduler.TaskSetManager: Finished TID 9 in 653 ms on slave2 (progress: 1/2)
15/03/24 14:17:58 INFO scheduler.DAGScheduler: Completed ResultTask(4, 1)
15/03/24 14:18:00 INFO scheduler.DAGScheduler: Completed ResultTask(4, 0)
15/03/24 14:18:00 INFO scheduler.TaskSetManager: Finished TID 8 in 2104 ms on slave1 (progress: 2/2)
15/03/24 14:18:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/03/24 14:18:00 INFO scheduler.DAGScheduler: Stage 4 (saveAsTextFile at <console>:19) finished in 2.105 s
15/03/24 14:18:00 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:19, took 2.197440038 s
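saveAsTextFile writes one part-NNNNN file per partition (two here), which is why the cat below uses a glob. For small results a single output file can be forced by narrowing to one partition before saving; a sketch, with a hypothetical output path (the target directory must not already exist):

// coalesce(1) merges to a single partition without a full shuffle,
// so the whole write runs through one task
rdd_c.coalesce(1).saveAsTextFile("hdfs://master:9000/tmp/spark_result_single")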
[hadoop@master ~]$ hadoop dfs -cat /tmp/spark_result/*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
15/03/24 14:19:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(,1)
(hello,4)
(world,1)
(world1,3)
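The whole session condenses into a standalone application. A minimal sketch, assuming Spark 1.x, the same HDFS paths, and that the output directory does not already exist:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val counts = sc.textFile("hdfs://master:9000/tmp/wordcount.txt")
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)        // drop the empty token from the trailing blank line
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://master:9000/tmp/spark_result")
    sc.stop()
  }
}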