Exercises in the Scala programming language with an emphasis on big data programming and applications in Apache Hadoop and Apache Spark.
Apache Maven
- [P] junit
Scala
Python
[H] Apache Attic
- [H] Apache MRUnit
More
Apache Hadoop
- Sammer, Eric. (2012). Hadoop Operations: A Guide for Developers and Administrators. O'Reilly. Home. GitHub.
- [H][G1][G2] White, Tom. (2015). Hadoop: The Definitive Guide: Storage and Analysis at Internet Scale. 4th Ed. O'Reilly.
Apache HBase
- [H] George, Lars. HBase: The Definitive Guide.
Apache Spark
- Chambers, Bill & Matei Zaharia. (2018). Spark: The Definitive Guide: Big Data Processing Made Simple. O'Reilly. GitHub.
- Damji et al. (2020). Learning Spark: Lightning-Fast Data Analytics. 2nd Ed. O'Reilly. GitHub.
- Karau, Holden & Rachel Warren. (2017). High Performance Spark: Best Practices for Scaling & Optimizing Apache Spark. O'Reilly. GitHub.
- Maas, Gerard & François Garillot. (2019). Stream Processing with Apache Spark: Mastering Structured Streaming and Spark Streaming. O'Reilly. GitHub.
- Parsian, Mahmoud. (2022). Data Algorithms with Spark: Recipes and Design Patterns for Scaling Up Using PySpark. O'Reilly. GitHub.
- Perrin, Jean-Georges. (2020). Spark in Action: With examples in Java, Python, and Scala. 2nd Ed. Manning.
- Polak, Adi. (2023). Scaling Machine Learning with Spark: Distributed ML with MLlib, TensorFlow, and PyTorch. O'Reilly.
- Ryza et al. (2017). Advanced Analytics with Spark: Patterns for Learning from Data at Scale. 2nd Ed. O'Reilly. GitHub.
- Tandon et al. (2022). Advanced Analytics with PySpark: Patterns for Learning from Data at Scale Using Python and Spark. O'Reilly.
Big Data
- Leskovec, Jure; Anand Rajaraman; & Jeff Ullman. Mining of Massive Datasets. Home.
- Lin, Jimmy & Chris Dyer. (2010). Data-Intensive Text Processing with MapReduce. Home.
MapReduce
- [P] Dean, Jeffrey & Sanjay Ghemawat. (2008). "MapReduce: Simplified Data Processing on Large Clusters". Communications of the ACM, Vol. 51, No. 1.
Scala
- Alexander, Alvin. (2021). Scala Cookbook: Recipes for Object-Oriented and Functional Programming. 2nd Ed. O'Reilly. GitHub.
- Chiusano, Paul & Runar Bjarnason. (2014). Functional Programming in Scala. Manning.
- Odersky, Martin; Lex Spoon; & Bill Venners. (2023). Advanced Programming in Scala. 5th Ed. Artima.
- Odersky, Martin; Lex Spoon; Bill Venners; & Frank Sommers. (2021). Programming in Scala. 5th Ed. Artima.
- Odersky, Martin; Lex Spoon; & Bill Venners. Programming in Scala. 3rd Ed. Artima. Home.
- Phillips, Andrew & Nermin Šerifovic. (2014). Scala Puzzlers. Artima.
- Wampler, Dean. (2021). Programming Scala: Scalability = Functional Programming + Objects. 3rd Ed. O'Reilly. Home. GitHub.
- [W] Anamorphism
- [W] Apache Hadoop
- [W] Apache Spark
- [W] Big Data
- [W] Catamorphism
- [W] Clustered File System
- [W] Communication Protocol
- [W] Computer Cluster
- [W] Consensus
- [W] Device File
- [W] Disk
- [W] Distributed Computing
- [W] Distributed File System
- [W] Fault Tolerance
- [W] Fold
- [W] Functional Programming
- [W] Hadoop Distributed File System (HDFS)
- [W] Heartbeat
- [W] High-Availability Cluster
- [W] Higher-Order Function
- [W] Lazy Replication
- [W] Line-Oriented ASCII Format
- [W] Map
- [W] MapReduce
- [W] Multi-Master Replication
- [W] Network Partition
- [W] Node
- [W] PageRank
- [W] Parallel Computing
- [W] Programming Model
- [W] Quiescence
- [W] Quorum
- [W] Reduce
- [W] Replication
- [W] Resource
- [W] Sequence File
- [W] Shard
- [W] Standard Stream
- [W] Yet Another Resource Negotiator (YARN)
[H] Kifer, Dan. CMPSC/DS 410: Programming Models for Big Data. The Pennsylvania State University.
[FILE SHARD]
A large file is split into pieces called blocks, chunks, or shards (e.g., 64-MB chunks). Each shard is replicated (3 replicas by default) and the replicas are distributed to different physical machines.
[DATA NODE]
A machine that hosts a file shard is called a data node.
[NAME NODE]
The name node is responsible for
- tracking file shards
  - which data nodes store which shards?
  - the name node stores metadata (file name, file shard name, number of replicas, storage locations of shards) in memory
  - if a shard is under-replicated, the name node searches for other machines to replicate it on
- detecting and managing machine failure
  - if a data node does not send a heartbeat message, the name node assumes that the data node has failed
- supporting file reads at the application level, by serving shard locations to clients
[CLIENT]
The application that needs the data is called the client.
The client contacts the name node in order to discover the location of file shards, and then contacts the appropriate data nodes to retrieve them.
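A minimal sketch of this read path in plain Python (a toy in-memory model; all names are hypothetical, and real HDFS clients talk to the name node and data nodes over RPC):

# name node: metadata only -- file -> shard -> replica locations
name_node = {
    "big.log": {
        "big.log-0": ["dn1", "dn2"],
        "big.log-1": ["dn2", "dn4"],
    }
}

# data nodes: the shard bytes live here, not on the name node
data_nodes = {
    "dn1": {"big.log-0": b"first 64 MB ... "},
    "dn2": {"big.log-0": b"first 64 MB ... ", "big.log-1": b"the rest"},
    "dn4": {"big.log-1": b"the rest"},
}

def read_file(filename):
    shards = name_node[filename]  # step 1: ask the name node for shard locations
    return b"".join(
        data_nodes[replicas[0]][shard]  # step 2: fetch each shard from a replica
        for shard, replicas in sorted(shards.items())
    )

print(read_file("big.log"))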
Limitations of HDFS
- slower than a native file system due to network communication
- better for a few large files than for many small files
  - one 6.4-GB file in 100 64-MB shards: 100 metadata entries are stored in the main memory of the name node
  - 6,400 1-MB files: 6,400 metadata entries
- better for jobs that read the entire file than for jobs that require random access
[MAPREDUCE]
- Mapper
  - the mapper generates key-value messages from each line
  - can be written in Java natively, or in Python (mrjob) via Hadoop streaming
- Partitioner
  - the partitioner decides which reducer receives each key
  - must be written in Java
- Reducer
  - the reducer performs an aggregation over the messages it receives
  - can be written in Java natively, or in Python (mrjob) via Hadoop streaming
- Sorter
  - the sorter determines the order in which messages are received
  - must be written in Java
- Combiner
  - the combiner suggests an optimization: a local pre-aggregation that Hadoop may or may not apply (see [IN MEMORY COMBINING])
parallel read: multiple mappers, single reducer
- problem: the data do not fit on one machine
- multiple mapper nodes read separate file shards in parallel
- each mapper node sends many messages to the reducer node: key "word", value "1"
- the reducer node interprets messages to mean "increment the count of the word by one"
- the reducer node stores the dictionary
mapper node j:
    read the file shards on mapper node j
    for each line:
        words = line.split()
        for word in words:
            send message (word, 1) to the reducer node
reducer node:
    receive messages
    aggregate: increment each word's count
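A runnable simulation of this pattern in plain Python, using the text from the [EXAMPLE] section below (a list stands in for the network):

from collections import Counter

# one list of lines per mapper node
shards = [
    ["That is that", "to be or", "not to be"],
    ["That is big"],
]

messages = []  # stands in for messages sent over the network
for shard in shards:  # each mapper node (in parallel on a real cluster)
    for line in shard:
        for word in line.split():
            messages.append((word, 1))

counts = Counter()  # the single reducer node
for word, one in messages:
    counts[word] += one
print(counts)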
parallel read, parallel aggregation: multiple mappers, multiple reducers
- problem: the word counts do not fit on one machine
- multiple reducers: each reducer is associated with a set of words
- for example, with 6 reducers:
  - reducer 0 gets words where hash(word) % 6 == 0
  - reducer 1 gets words where hash(word) % 6 == 1
  - and so on through reducer 5
- reducer i produces output shard i
mapper node j:
    read the file shards on mapper node j
    for each line:
        words = line.split()
        for word in words:
            determine the appropriate reducer node
            send message (word, 1) to that reducer node
reducer node i:
    receive messages
    aggregate: increment each word's count
    save the output to HDFS as file shard i
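The same simulation with hash partitioning across 6 reducers (plain Python; note that Python's hash() is salted per process, whereas Hadoop's default partitioner is deterministic):

from collections import Counter

NUM_REDUCERS = 6
shards = [["That is that", "to be or", "not to be"], ["That is big"]]

# mapper side: route each message to a reducer by hashing its key
inboxes = [[] for _ in range(NUM_REDUCERS)]
for shard in shards:
    for line in shard:
        for word in line.split():
            inboxes[hash(word) % NUM_REDUCERS].append((word, 1))

# reducer side: reducer i aggregates its inbox and produces output shard i
for i, inbox in enumerate(inboxes):
    counts = Counter()
    for word, one in inbox:
        counts[word] += one
    print(f"output shard {i}: {dict(counts)}")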
[MAPPER]
- input: a key-value pair, e.g.,
  - key = line number, value = line
  - key = null, value = line
- output: key-value pairs
Hadoop automatically
- finds mapper nodes that already have the file shards or can fetch them quickly
- has each mapper node read its file shards
- calls the mapper function on every line and saves the results to local disk (not HDFS)
  - messages for reducer 1 are saved in one file, sorted by the keys assigned to that reducer; messages for reducer 2 are saved in another file, sorted by the keys assigned to that reducer; etc.
  - key-value pairs generated by the mapper are first saved to a file on local disk and only then sent to the reducer nodes
[PARTITIONER]
- the partitioner is a function that takes a key as input and returns a number identifying the reducer the message is sent to
- if no partitioner is specified, Hadoop uses the default: a hash of the key modulo the number of reducers (sketched below)
Hadoop automatically
- reads the output from each mapper
- collects the key-value pairs for each reducer into a file sorted by key
- ensures each reducer receives its keys in sorted order
- performs this work in the shuffle-and-sort phase; the sort order may be customized
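A sketch of the default rule in Python (Hadoop's actual default is Java's HashPartitioner, which computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks):

def default_partition(key, num_reducers):
    # maps a key to a reducer number in [0, num_reducers)
    return hash(key) % num_reducers

print(default_partition("be", 6))  # e.g., which reducer receives "be"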
[REDUCER]
- input: a key and a list of values (i.e., the values from all the messages that share that key)
- output: key-value pair
Hadoop automatically
- collects the incoming key-value pair files and merges them, maintaining the sorted order
- groups by key to generate (key, value-list) pairs
- calls reducer(key, value_list) on each pair
- saves the output to HDFS
- note: reducer input is locally sorted (each reducer sees its own keys in order), not globally sorted
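The merge-then-group step can be sketched with the Python standard library (the file contents below are hypothetical; each mapper's file is already sorted by key):

import heapq
from itertools import groupby
from operator import itemgetter

# pairs delivered to this reducer, one sorted list per mapper
from_mapper_1 = [("That", 1), ("be", 1), ("be", 1), ("not", 1)]
from_mapper_2 = [("That", 1), ("big", 1)]

# merging preserves the sorted order; groupby builds the value lists
merged = heapq.merge(from_mapper_1, from_mapper_2, key=itemgetter(0))
for key, pairs in groupby(merged, key=itemgetter(0)):
    value_list = [value for _, value in pairs]
    print(key, sum(value_list))  # i.e., reducer(key, value_list)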
[EXAMPLE]
mapper node 1 input shards
That is that
to be or
not to be
mapper node 2 input shard
That is big
mapper node 1 function outputs
(That, 1)
(is, 1)
(that, 1)
(to, 1)
(be, 1)
(or, 1)
(not, 1)
(to, 1)
(be, 1)
mapper node 2 function output
(That, 1)
(is, 1)
(big, 1)
mapper node 1 function outputs sorted for reducer 1
(be, 1)
(be, 1)
(not, 1)
(that, 1)
(That, 1)
mapper node 1 function outputs sorted for reducer 2
(is, 1)
(or, 1)
(to, 1)
(to, 1)
mapper node 2 function outputs sorted for reducer 1
(big, 1)
(That, 1)
mapper node 2 function outputs sorted for reducer 2
(is, 1)
reducer node 1 input
be, [1, 1]
big, [1]
not, [1]
that, [1]
That, [1, 1]
reducer node 2 input
is, [1, 1]
or, [1]
to, [1, 1]
reducer node 1 output
be 2
big 1
not 1
that 1
That 2
reducer node 2 output
is 2
or 1
to 2
[COUNTER]
- Hadoop supports counters for maintaining job statistics
- counters work in a distributed setting because increments and decrements are commutative and associative: messages can arrive out of order without changing the final result
- counter values are meaningful only after the job has finished
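A sketch using mrjob's increment_counter method (the job name and counter names are made up):

from mrjob.job import MRJob

class MRWordCountWithCounters(MRJob):

    def mapper(self, _, line):
        if not line.strip():
            # increments may arrive in any order; the total is unaffected
            self.increment_counter("quality", "empty lines", 1)
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCountWithCounters.run()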
[HADOOP STREAMING]
Hadoop supports other languages via Hadoop streaming. The mapper and reducer functions are written in the desired language; key-value inputs arrive on stdin and key-value outputs are written to stdout. Tabs, spaces, non-ASCII characters, and language encodings can cause crashes. Hadoop streaming is slower than native Java because it runs code outside of the JVM and must communicate with it.
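A sketch of the streaming contract as two separate Python scripts (tab-separated key-value lines on stdin/stdout). Locally, the pipeline can be approximated with: cat input.txt | python mapper.py | sort | python reducer.py

# mapper.py: raw lines in, "key<TAB>value" lines out
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")

# reducer.py: "key<TAB>value" lines in (grouped by key), totals out
import sys
from itertools import groupby

pairs = (line.rstrip("\n").split("\t", 1) for line in sys.stdin)
for word, group in groupby(pairs, key=lambda kv: kv[0]):
    print(f"{word}\t{sum(int(value) for _, value in group)}")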
[YARN]
YARN (Yet Another Resource Negotiator) allows Hadoop and Spark to coexist on a cluster.
components
- app master - starts up in the job's first container and negotiates additional containers from the resource manager; tracks the job's progress
- container - a share of CPU, memory, etc. (analogous to a VM)
- node manager - monitors containers and reports usage to the resource manager
- resource manager - its scheduler maintains a queue of waiting jobs, and its applications manager accepts new jobs and determines each job's first container
[FAULT TOLERANCE]
worker nodes (mappers and reducers) send heartbeat messages
- indirectly, via key-value messages
- directly, via heartbeat messages (mrjob's .set_status(); see the sketch below)
worker node failure is detected by a lack of heartbeat messages; the failed node's work is reallocated to another worker node
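A sketch of a direct heartbeat from a slow reducer using mrjob's set_status (the job name and threshold are made up):

from mrjob.job import MRJob

class MRSlowAggregation(MRJob):

    def reducer(self, key, values):
        total = 0
        for i, value in enumerate(values):
            total += value
            if i % 100_000 == 0:
                # direct heartbeat: tells Hadoop this task is still alive
                self.set_status(f"still aggregating {key} ...")
        yield key, total

if __name__ == "__main__":
    MRSlowAggregation.run()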
[mrjob]
- Hadoop's native language is Java
- writing jobs in Java provides type safety, and more control over such things as the partitioner
- mrjob is a convenient Python interface to Hadoop's Java code
- using mrjob is slower than using Java: mrjob uses the Hadoop streaming interface to communicate with the Java code via stdin and stdout
- mrjob provides less control over some useful components like the partitioner
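For reference, the canonical mrjob word count (it mirrors mrjob's introductory example; run locally with python word_count.py input.txt, or on a cluster with -r hadoop):

from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # under the default protocol the key is None and the value is the line
        for word in line.split():
            yield word, 1

    def combiner(self, word, counts):
        # optional local pre-aggregation; Hadoop may run it zero or more times
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCount.run()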
[IN MEMORY COMBINING]
- combining messages inside the mapper is called in-memory combining
- Hadoop does not guarantee when, whether, or how many times a combiner will run, so in-memory combining is preferred
- don't let the data cached by the mapper grow too large, or it will not fit in memory
  - avoid small-data solutions
  - flush the cache: when the cache gets too big, output its key-value pairs and empty it (see the sketch below)
- benefits
  - reduces network traffic
  - increases the speed of the shuffle-and-sort phase because there is less data to shuffle and sort
  - helps avoid overloading a reducer with too many messages
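A sketch of in-mapper combining with a bounded cache in mrjob (mapper_init and mapper_final are real mrjob hooks; CACHE_LIMIT is a made-up bound):

from mrjob.job import MRJob

class MRInMapperCombine(MRJob):

    CACHE_LIMIT = 100_000  # hypothetical; tune to available memory

    def mapper_init(self):
        self.cache = {}

    def mapper(self, _, line):
        for word in line.split():
            self.cache[word] = self.cache.get(word, 0) + 1
        if len(self.cache) > self.CACHE_LIMIT:
            # flush: emit the partial counts and empty the cache
            for pair in self.cache.items():
                yield pair
            self.cache = {}

    def mapper_final(self):
        # emit whatever is still cached once the shard is exhausted
        for pair in self.cache.items():
            yield pair

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRInMapperCombine.run()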