Presented by Myles Baker / @mydpy
Resilient Distributed Dataset: An abstraction that enables efficient data reuse in distributed processing environments
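A quick sketch of what "data reuse" means in practice: caching an RDD keeps it in memory, so later actions reuse the same data instead of re-reading it (the file path here is illustrative).

val lines = sc.textFile("spark-input/boa-constrictor")
lines.cache()                              // mark the RDD for in-memory storage
lines.count()                              // first action reads the file and caches it
lines.filter(_.contains("Boa")).count()    // second action reuses the cached partitions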
>spark-shell
>pyspark
>spark-submit --class <class_name> --master <master_url> \
><jar_file> [args]
15/04/16 11:50:48 INFO SecurityManager: Changing view acls to: cloudera
15/04/16 11:50:48 INFO SecurityManager: Changing modify acls to: cloudera
15/04/16 11:50:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
15/04/16 11:50:48 INFO HttpServer: Starting HTTP Server
15/04/16 11:50:48 INFO Utils: Successfully started service 'HTTP class server' on port 43049.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
15/04/16 11:50:54 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/04/16 11:50:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/04/16 11:50:54 INFO SecurityManager: Changing view acls to: cloudera
15/04/16 11:50:54 INFO SecurityManager: Changing modify acls to: cloudera
15/04/16 11:50:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
15/04/16 11:50:55 INFO Slf4jLogger: Slf4jLogger started
15/04/16 11:50:55 INFO Remoting: Starting remoting
15/04/16 11:50:55 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:51331]
15/04/16 11:50:55 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:51331]
15/04/16 11:50:55 INFO Utils: Successfully started service 'sparkDriver' on port 51331.
15/04/16 11:50:56 INFO SparkEnv: Registering MapOutputTracker
15/04/16 11:50:56 INFO SparkEnv: Registering BlockManagerMaster
15/04/16 11:50:56 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150416115056-a439
15/04/16 11:50:56 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/04/16 11:50:56 INFO HttpFileServer: HTTP File server directory is /tmp/spark-cc9f46f6-5288-48b0-b106-ee9281b10156
15/04/16 11:50:56 INFO HttpServer: Starting HTTP Server
15/04/16 11:50:56 INFO Utils: Successfully started service 'HTTP file server' on port 41679.
15/04/16 11:50:56 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/04/16 11:50:56 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040
15/04/16 11:50:57 INFO AppClient$ClientActor: Connecting to master spark://quickstart.cloudera:7077...
15/04/16 11:50:57 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150416115057-0001
15/04/16 11:50:57 INFO AppClient$ClientActor: Executor added: app-20150416115057-0001/0 on worker-20150408095110-10.0.2.15-7078 (10.0.2.15:7078) with 2 cores
15/04/16 11:50:57 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150416115057-0001/0 on hostPort 10.0.2.15:7078 with 2 cores, 512.0 MB RAM
15/04/16 11:50:57 INFO AppClient$ClientActor: Executor updated: app-20150416115057-0001/0 is now LOADING
15/04/16 11:50:57 INFO AppClient$ClientActor: Executor updated: app-20150416115057-0001/0 is now RUNNING
15/04/16 11:50:58 INFO NettyBlockTransferService: Server created on 40182
15/04/16 11:50:58 INFO BlockManagerMaster: Trying to register BlockManager
15/04/16 11:50:58 INFO BlockManagerMasterActor: Registering block manager 10.0.2.15:40182 with 265.4 MB RAM, BlockManagerId(<driver>, 10.0.2.15, 40182)
15/04/16 11:50:58 INFO BlockManagerMaster: Registered BlockManager
15/04/16 11:51:07 INFO EventLoggingListener: Logging events to hdfs://quickstart.cloudera:8020/user/spark/applicationHistory/app-20150416115057-0001
15/04/16 11:51:13 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/04/16 11:51:14 INFO SparkILoop: Created spark context..
Spark context available as sc.
scala> 15/04/16 11:51:18 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@10.0.2.15:47007/user/Executor#-876449197] with ID 0
15/04/16 11:51:20 INFO BlockManagerMasterActor: Registering block manager 10.0.2.15:52204 with 265.4 MB RAM, BlockManagerId(0, 10.0.2.15, 52204)
scala> sc
res38: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5f4d088e
scala> val rdd = sc.textFile("spark-input/boa-constrictor")
15/04/16 12:36:03 INFO MemoryStore: ensureFreeSpace(259918) called with curMem=2248344, maxMem=278302556
15/04/16 12:36:03 INFO MemoryStore: Block broadcast_32 stored as values in memory (estimated size 253.8 KB, free 263.0 MB)
15/04/16 12:36:04 INFO MemoryStore: ensureFreeSpace(21134) called with curMem=2508262, maxMem=278302556
15/04/16 12:36:04 INFO MemoryStore: Block broadcast_32_piece0 stored as bytes in memory (estimated size 20.6 KB, free 263.0 MB)
15/04/16 12:36:04 INFO BlockManagerInfo: Added broadcast_32_piece0 in memory on 10.0.2.15:40182 (size: 20.6 KB, free: 265.2 MB)
15/04/16 12:36:04 INFO BlockManagerMaster: Updated info of block broadcast_32_piece0
15/04/16 12:36:04 INFO SparkContext: Created broadcast 32 from textFile at <console>:12
rdd: org.apache.spark.rdd.RDD[String] = spark-input/boa-constrictor MappedRDD[40] at textFile at <console>:12
scala> val silverstein = rdd.collect
...
silverstein: Array[String] = Array(
I'm being swallered by a Boa Constrictor, a Boa Constrictor, a Boa Constrictor,
I'm being swallered by a Boa Constrictor,
and I don't - like snakes - one bit!,
Oh no, he swallered my toe.,
Oh gee, he swallered my knee.,
Oh fiddle, he swallered my middle.,
Oh what a pest, he swallered my chest.,
Oh heck, he swallered my neck.,
Oh, dread, he swallered my - (BURP)
)
val BoaConstrictor = rdd.filter(line => line.contains("Boa"))
BoaConstrictor.collect
res39: Array[String] = Array(
I'm being swallered by a Boa Constrictor, a Boa Constrictor, a Boa Constrictor,
I'm being swallered by a Boa Constrictor
)
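Transformations such as filter are lazy; nothing executes until an action (collect, count, ...) is called. A small check on the same RDD:

val boaLines = rdd.filter(line => line.contains("Boa"))   // lazy: only builds a new RDD
boaLines.count()                                           // action: runs the job, returns 2 here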
val words = rdd.flatMap(line => line.split(" "))
val counts = words.map(word => (word,1)).reduceByKey{case (x,y) => x+y}
counts.collect
res40: Array[(String, Int)] = Array(
(don't,1), (pest,,1), (fiddle,,1), (one,1), (bit!,1), ((BURP),1),
(toe.,1), (Boa,4), (Constrictor,,1), (my,6), (what,1), (dread,,1),
(Constrictor,3), (heck,,1), (neck.,1), (swallered,8), (Oh,,1),
(a,5), (snakes,1), (no,,1), (I,1), (he,6), (Oh,5), (middle.,1),
(by,2), (-,3), (like,1), (I'm,2), (and,1), (chest.,1), (gee,,1), (being,2), (knee.,1)
)
val Boa = counts.filter(pair => pair._1.equals("Boa"))
Boa.collect
res41: Array[(String, Int)] = Array((Boa,4))
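Building on the session above, the counts can also be sorted to surface the most frequent words (a sketch; the order of ties may vary):

// Swap to (count, word), sort descending by count, and take the top three
val topWords = counts.map { case (word, n) => (n, word) }
                     .sortByKey(ascending = false)
                     .take(3)
// e.g. Array((8,swallered), (6,my), (6,he))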
import java.util.Arrays;
import java.util.List;
import java.lang.Iterable;
import scala.Tuple2;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

public class WordCount {
  public static void main(String[] args) throws Exception {
    String master = "local";
    // Old-style constructor: (master, appName, sparkHome, jars)
    JavaSparkContext sc = new JavaSparkContext(
        master, "wordcount", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<String> rdd = sc.textFile("spark-input/boa-constrictor");
    JavaPairRDD<String, Integer> counts = rdd.flatMap(
        new FlatMapFunction<String, String>() {
          public Iterable<String> call(String x) {
            return Arrays.asList(x.split(" "));
          }
        }).mapToPair(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String x) {
            return new Tuple2<String, Integer>(x, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer x, Integer y) { return x + y; }
        });
    counts.saveAsTextFile("output/boa-constrictor");
  }
}
>spark-submit --class WordCount --master spark://quickstart.cloudera:7077 \
>spark-clt-demo.jar
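For comparison, the same job as a standalone Scala application is much shorter; a sketch (object name, paths, and packaging are assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits for Spark 1.x

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount")
    val sc = new SparkContext(conf)
    val counts = sc.textFile("spark-input/boa-constrictor")
                   .flatMap(_.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
    counts.saveAsTextFile("output/boa-constrictor")
    sc.stop()
  }
}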
Content courtesy of David Der / @davidder
public List<Product> getProducts(List<Order> orders) {
  List<Product> products = new ArrayList<Product>();
  for (Order order : orders) {
    products.addAll(order.getProducts());
  }
  return products;
}
def products = orders.flatMap(o => o.products)
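A self-contained illustration of the flattening (Product and Order here are stand-in case classes, not the types from the Java snippet):

case class Product(name: String)
case class Order(products: List[Product])

val orders = List(
  Order(List(Product("book"), Product("pen"))),
  Order(List(Product("mug"))))

// flatMap maps each order to its products and flattens the result into one list
val products = orders.flatMap(o => o.products)
// List(Product(book), Product(pen), Product(mug))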
JavaRDD<String> distFile = sc.textFile("README.md");
// Map each line to multiple words
JavaRDD<String> words = distFile.flatMap(
    new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });
JavaRDD<String> distFile = sc.textFile("README.md");
JavaRDD<String> words =
distFile.flatMap(line -> Arrays.asList(line.split(" ")));
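For reference, the Scala shell equivalent of the lambda version is a one-liner (same README.md assumed):

val distFile = sc.textFile("README.md")
val words = distFile.flatMap(line => line.split(" "))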