From c240bde8c0972e80ae06f8383398794bb6e411ed Mon Sep 17 00:00:00 2001 From: Donne Martin Date: Sun, 24 May 2015 10:49:36 -0400 Subject: [PATCH] Updated notebook to v3. --- spark/spark.ipynb | 2514 +++++++++++++++++++++++---------------------- 1 file changed, 1289 insertions(+), 1225 deletions(-) diff --git a/spark/spark.ipynb b/spark/spark.ipynb index 82caf9a..f619b7b 100644 --- a/spark/spark.ipynb +++ b/spark/spark.ipynb @@ -1,1228 +1,1292 @@ { - "metadata": { - "name": "", - "signature": "sha256:f13a3255902ddadfe3c0b99e722529cfe44804d81633f80f0f7681afcf3acb8a" - }, - "nbformat": 3, - "nbformat_minor": 0, - "worksheets": [ + "cells": [ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Spark\n", - "\n", - "* Python Shell\n", - "* RDDs\n", - "* Pair RDDs\n", - "* Running Spark on a Cluster\n", - "* Viewing the Spark Application UI\n", - "* Working with Partitions\n", - "* Caching RDDs\n", - "* Checkpointing RDDs\n", - "* Writing and Running a Spark Application\n", - "* Configuring Spark Applications\n", - "* Streaming\n", - "* Streaming with States\n", - "* Broadcast Variables\n", - "* Accumulators" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Python Shell" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Start the pyspark shell:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "pyspark" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "View the spark context:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "sc" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## RDDs" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create an RDD from the contents of a directory:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "my_data = sc.textFile(\"file:/path/*\")" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Count the number of lines in the data:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "my_data.count()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Display the data in the data:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "my_data.collect()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Return the first 10 lines in the data:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "my_data.take(10)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create an RDD with lines matching the given filter:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "my_data.filter(lambda line: \".txt\" in line)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Chain a series of commands:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "sc.textFile(\"file:/path/file.txt\") \\\n", - " .filter(lambda line: \".txt\" in line) \\\n", - " .count()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create a new RDD mapping each line to an array of words, taking only the first word of each array:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "first_words = my_data.map(lambda line: line.split()[0])" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Output each word in first_words:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "for word in first_words.take(10):\n", - " print word" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Save the first words to a text file:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "first_words.saveAsTextFile(\"file:/path/file\")" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Pair RDDs\n", - "\n", - "Pair RDDs contain elements that are key-value pairs. Keys and values can be any type." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Given a log file with the following space deilmited format: [date_time, user_id, ip_address, action], map each request to (user_id, 1):" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "DATE_TIME = 0\n", - "USER_ID = 1\n", - "IP_ADDRESS = 2\n", - "ACTION = 3\n", - "\n", - "log_data = sc.textFile(\"file:/path/*\")\n", - "\n", - "user_actions = log_data \\\n", - " .map(lambda line: line.split()) \\\n", - " .map(lambda words: (words[USER_ID], 1)) \\\n", - " .reduceByKey(lambda count1, count2: count1 + count2)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Show the top 5 users by count, sorted in descending order:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "user_actions.map(lambda pair: (pair[0], pair[1])).sortyByKey(False).take(5)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Group IP addresses by user id:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "user_ips = log_data \\\n", - " .map(lambda line: line.split()) \\\n", - " .map(lambda words: (words[IP_ADDRESS],words[USER_ID])) \\\n", - " .groupByKey()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Given a user table with the following csv format: [user_id, user_info0, user_info1, ...], map each line to (user_id, [user_info...]):" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "user_data = sc.textFile(\"file:/path/*\")\n", - "\n", - "user_profile = user_data \\\n", - " .map(lambda line: line.split(',')) \\\n", - " .map(lambda words: (words[0], words[1:]))" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Inner join the user_actions and user_profile RDDs:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "user_actions_with_profile = user_actions.join(user_profile)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Show the joined table:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "for (user_id, (user_info, count)) in user_actions_with_profiles.take(10):\n", - " print user_id, count, user_info" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Running Spark on a Cluster" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Start the standalone cluster's Master and Worker daemons:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!sudo service spark-master start\n", - "!sudo service spark-worker start" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Stop the standalone cluster's Master and Worker daemons:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!sudo service spark-master stop\n", - "!sudo service spark-worker stop" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Restart the standalone cluster's Master and Worker daemons:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!sudo service spark-master stop\n", - "!sudo service spark-worker stop" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "View the Spark standalone cluster UI:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "http://localhost:18080//" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Start the Spark shell and connect to the cluster:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!MASTER=spark://localhost:7077 pyspark" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Confirm you are connected to the correct master:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "sc.master" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Viewing the Spark Application UI" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "From the following [reference](http://spark.apache.org/docs/1.2.0/monitoring.html):\n", - "\n", - "Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:\n", - "\n", - "A list of scheduler stages and tasks\n", - "A summary of RDD sizes and memory usage\n", - "Environmental information.\n", - "Information about the running executors\n", - "\n", - "You can access this interface by simply opening http://:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).\n", - "\n", - "Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage." - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "http://localhost:4040/" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Working with Partitions" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "From the following [reference](http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/):\n", - "\n", - "The Spark map() and flatMap() methods only operate on one input at a time, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "val dbConnection = ...\n", - "lines.map(... dbConnection.createStatement(...) ...)\n", - "dbConnection.close() // Wrong!" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "However, this fails for several reasons:\n", - "\n", - "* It puts the object dbConnection into the map function\u2019s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.\n", - "* map() is a transformation, rather than an operation, and is lazily evaluated. The connection can\u2019t be closed immediately here.\n", - "* Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark \u2014 it\u2019s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It\u2019s like a \u201cbulk map\u201d method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values." - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "def count_txt(partIter):\n", - " for line in partIter: \n", - " if \".txt\" in line: txt_count += 1\n", - " yield (txt_count)\n", - "\n", - "my_data = sc.textFile(\"file:/path/*\") \\\n", - " .mapPartitions(count_txt) \\\n", - " .collect()\n", - " \n", - "# Show the partitioning \n", - "print \"Data partitions: \", my_data.toDebugString()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Caching RDDs" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Caching an RDD saves the data in memory. Caching is a suggestion to Spark as it is memory dependent.\n", - "\n", - "By default, every RDD operation executes the entire lineage. Caching can boost performance for datasets that are likely to be used by saving this expensive recomputation and is ideal for iterative algorithms or machine learning.\n", - "\n", - "* cache() stores data in memory\n", - "* persist() stores data in MEMORY_ONLY, MEMORY_AND_DISK (spill to disk), and DISK_ONLY\n", - "\n", - "Disk memory is stored on the node, not on HDFS.\n", - "\n", - "Replication is possible by using MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. If a cached partition becomes unavailable, Spark recomputes the partition through the lineage.\n", - "\n", - "Serialization is possible with MEMORY_ONLY_SER and MEMORY_AND_DISK_SER. This is more space efficient but less time efficient, as it uses Java serialization by default." - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "# Cache RDD to memory\n", - "my_data.cache()\n", - "\n", - "# Persist RDD to both memory and disk (if memory is not enough), with replication of 2\n", - "my_data.persist(MEMORY_AND_DISK_2)\n", - "\n", - "# Unpersist RDD, removing it from memory and disk\n", - "my_data.unpersist()\n", - "\n", - "# Change the persistence level after unpersist\n", - "my_data.persist(MEMORY_AND_DISK)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Checkpointing RDDs\n", - "\n", - "Caching maintains RDD lineage, providing resilience. If the lineage is very long, it is possible to get a stack overflow.\n", - "\n", - "Checkpointing saves the data to HDFS, which provide fault tolerant storage across nodes. HDFS is not as fast as local storage for both reading and writing. Checkpointing is good for long lineages and for very large data sets that might not fit on local storage. Checkpointing removes lineage." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create a checkpoint and perform an action by calling count() to materialize the checkpoint and save it to the checkpoint file:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "# Enable checkpointing by setting the checkpoint directory, \n", - "# which will contain all checkpoints for the given data:\n", - "sc.setCheckpointDir(\"checkpoints\")\n", - "\n", - "my_data = sc.parallelize([1,2,3,4,5])\n", - "\n", - "# Long loop that may cause a stack overflow\n", - "for i in range(1000):\n", - " my_data = mydata.map(lambda myInt: myInt + 1)\n", - "\n", - " if i % 10 == 0: \n", - " my_data.checkpoint()\n", - " my_data.count()\n", - "\n", - "my_data.collect()\n", - " \n", - "# Display the lineage\n", - "for rddstring in my_data.toDebugString().split('\\n'): \n", - " print rddstring.strip()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Writing and Running a Spark Application" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create a Spark application to count the number of text files:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "import sys\n", - "from pyspark import SparkContext\n", - "\n", - "if __name__ == \"__main__\":\n", - " if len(sys.argv) < 2:\n", - " print >> sys.stderr, \"Usage: App Name \"\n", - " exit(-1)\n", - " \n", - " count_text_files()\n", - " \n", - "def count_text_files():\n", - " sc = SparkContext()\n", - " logfile = sys.argv[1]\n", - " text_files_count = sc.textFile(logfile)\n", - " .filter(lambda line: '.txt' in line)\n", - " text_files_count.cache()\n", - " print(\"Number of text files: \", text_files_count.count())" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Submit the script to Spark for processing:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!spark-submit --properties-file dir/myspark.conf script.py data/*" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Configuring Spark Applications" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Run a Spark app and set the configuration options in the command line:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!spark-submit --master spark//localhost:7077 --name 'App Name' script.py data/*" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Configure spark.conf:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "spark.app.name App Name\n", - "spark.ui.port 4141\n", - "spark.master spark://localhost:7077" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Run a Spark app and set the configuration options through spark.conf:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!spark-submit --properties-file spark.conf script.py data/*" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Set the config options programmatically:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "sconf = SparkConf() \\\n", - " .setAppName(\"Word Count\") \\\n", - " .set(\"spark.ui.port\",\"4141\")\n", - "sc = SparkContext(conf=sconf)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Set logging levels located in the following file, or place a copy in your pwd:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "$SPARK_HOME/conf/log4j.properties.template" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Streaming" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Start the Spark Shell locally with at least two threads (need a minimum of two threads for streaming, one for receiving, one for processing):" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "!spark-shell --master local[2]" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create a StreamingContext (similar to SparkContext in core Spark) with a batch duration of 1 second:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "val ssc = new StreamingContext(new SparkConf(), Seconds(1)) \n", - "val my_stream = ssc.socketTextStream(hostname, port)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Get a DStream from a streaming data source (text from a socket):" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "val logs = ssc.socketTextStream(hostname, port)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "DStreams support regular transformations such as map, flatMap, and filter, and pair transformations such as reduceByKey, groupByKey, and joinByKey.\n", - "\n", - "Apply a DStream operation to each batch of RDDs (count up requests by user id, reduce by key to get the count):" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "val requests = my_stream\n", - " .map(line => (line.split(\" \")(2), 1))\n", - " .reduceByKey((x, y) => x + y)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The transform(function) method creates a new DStream by executing the input function on the RDDs." - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "val sorted_requests = requests\n", - " .map(pair => pair.swap)\n", - " .transform(rdd => rdd.sortByKey(false))" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "foreachRDD(function) performs a function on each RDD in the DStream (map is like a shortcut not requiring you to get the RDD first before doing an operation):" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "sorted_requests.foreachRDD((rdd, time) => {\n", - " println(\"Top users @ \" + time)\n", - " rdd.take(5).foreach(\n", - " pair => printf(\"User: %s (%s)\\n\", pair._2, pair._1))\n", - "}" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Save the DStream result part files with the given folder prefix, the actual folder will be /dir/requests-timestamp0/:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "requests.saveAsTextFiles(\"/dir/requests\")" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Start the execution of all DStreams:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "ssc.start()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Wait for all background threads to complete before ending the main thread:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "ssc.awaitTermination()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Streaming with States" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Enable checkpointing to prevent infinite lineages:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "ssc.checkpoint(\"dir\")" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Compute a DStream based on the previous states plus the current state:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "def updateCount = (newCounts: Seq[Int], state: Option[Int]) => {\n", - " val newCount = newCounts.foldLeft(0)(_ + _)\n", - " val previousCount = state.getOrElse(0)\n", - " Some(newCount + previousCount)\n", - "}\n", - "\n", - "val totalUserreqs = userreqs.updateStateByKey(updateCount)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Compute a DStream based Sliding window, every 30 seconds, count requests by user over the last 5 minutes:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "val reqcountsByWindow = logs.map(line => (line.split(' ')(2), 1))\n", - " .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Minutes(5), Seconds(30))" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Collect statistics with the StreamingListener API:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "// define listener\n", - "class MyListener extends StreamingListener {\n", - " override def onReceiverStopped(...) {\n", - " streamingContext.stop()\n", - " }\n", - "} \n", - "\n", - "// attach listener\n", - "streamingContext. addStreamingListener(new MyListener())" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Broadcast Variables" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Read in list of items to broadcast from a local file:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "broadcast_file = \"broadcast.txt\"\n", - "broadcast_list = list(map(lambda l: l.strip(), open(broadcast_file)))" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Broadcast the target list to all workers:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "broadcast_list_sc = sc.broadcast(broadcast_list)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Filter based on the broadcast list:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "log_file = \"hdfs://localhost/user/logs/*\"\n", - "filtered_data = sc.textFile(log_file)\\\n", - " .filter(lambda line: any(item in line for item in broadcast_list_sc.value))\n", - "\n", - "filtered_data.take(10)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Accumulators" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Create an accumulator:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "txt_count = sc.accumulator(0)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Count the number of txt files in the RDD:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "my_data = sc.textFile(filePath)\n", - "my_data.foreach(lambda line: if '.txt' in line: txt_count.add(1))" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Count the number of file types encountered:" - ] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "jpg_count = sc.accumulator(0)\n", - "html_count = sc.accumulator(0)\n", - "css_count = sc.accumulator(0)\n", - "\n", - "def countFileType(s):\n", - " if '.jpg' in s: jpg_count.add(1)\n", - " elif '.html' in s: html_count.add(1)\n", - " elif '.css' in s: css_count.add(1)\n", - "\n", - "filename=\"hdfs://logs/*\"\n", - "\n", - "logs = sc.textFile(filename)\n", - "logs.foreach(lambda line: countFileType(line))\n", - "\n", - "print 'File Type Totals:'\n", - "print '.css files: ', css_count.value\n", - "print '.html files: ', html_count.value\n", - "print '.jpg files: ', jpg_count.value" - ], - "language": "python", - "metadata": {}, - "outputs": [] - } - ], - "metadata": {} + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Spark\n", + "\n", + "* Python Shell\n", + "* RDDs\n", + "* Pair RDDs\n", + "* Running Spark on a Cluster\n", + "* Viewing the Spark Application UI\n", + "* Working with Partitions\n", + "* Caching RDDs\n", + "* Checkpointing RDDs\n", + "* Writing and Running a Spark Application\n", + "* Configuring Spark Applications\n", + "* Streaming\n", + "* Streaming with States\n", + "* Broadcast Variables\n", + "* Accumulators" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Python Shell" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the pyspark shell:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "pyspark" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View the spark context:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "sc" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## RDDs" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create an RDD from the contents of a directory:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "my_data = sc.textFile(\"file:/path/*\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Count the number of lines in the data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "my_data.count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Display the data in the data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "my_data.collect()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Return the first 10 lines in the data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "my_data.take(10)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create an RDD with lines matching the given filter:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "my_data.filter(lambda line: \".txt\" in line)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Chain a series of commands:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "sc.textFile(\"file:/path/file.txt\") \\\n", + " .filter(lambda line: \".txt\" in line) \\\n", + " .count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a new RDD mapping each line to an array of words, taking only the first word of each array:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "first_words = my_data.map(lambda line: line.split()[0])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Output each word in first_words:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "for word in first_words.take(10):\n", + " print word" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Save the first words to a text file:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "first_words.saveAsTextFile(\"file:/path/file\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Pair RDDs\n", + "\n", + "Pair RDDs contain elements that are key-value pairs. Keys and values can be any type." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Given a log file with the following space deilmited format: [date_time, user_id, ip_address, action], map each request to (user_id, 1):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "DATE_TIME = 0\n", + "USER_ID = 1\n", + "IP_ADDRESS = 2\n", + "ACTION = 3\n", + "\n", + "log_data = sc.textFile(\"file:/path/*\")\n", + "\n", + "user_actions = log_data \\\n", + " .map(lambda line: line.split()) \\\n", + " .map(lambda words: (words[USER_ID], 1)) \\\n", + " .reduceByKey(lambda count1, count2: count1 + count2)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Show the top 5 users by count, sorted in descending order:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "user_actions.map(lambda pair: (pair[0], pair[1])).sortyByKey(False).take(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Group IP addresses by user id:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "user_ips = log_data \\\n", + " .map(lambda line: line.split()) \\\n", + " .map(lambda words: (words[IP_ADDRESS],words[USER_ID])) \\\n", + " .groupByKey()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Given a user table with the following csv format: [user_id, user_info0, user_info1, ...], map each line to (user_id, [user_info...]):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "user_data = sc.textFile(\"file:/path/*\")\n", + "\n", + "user_profile = user_data \\\n", + " .map(lambda line: line.split(',')) \\\n", + " .map(lambda words: (words[0], words[1:]))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Inner join the user_actions and user_profile RDDs:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "user_actions_with_profile = user_actions.join(user_profile)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Show the joined table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "for (user_id, (user_info, count)) in user_actions_with_profiles.take(10):\n", + " print user_id, count, user_info" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Running Spark on a Cluster" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the standalone cluster's Master and Worker daemons:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!sudo service spark-master start\n", + "!sudo service spark-worker start" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Stop the standalone cluster's Master and Worker daemons:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!sudo service spark-master stop\n", + "!sudo service spark-worker stop" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Restart the standalone cluster's Master and Worker daemons:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!sudo service spark-master stop\n", + "!sudo service spark-worker stop" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "View the Spark standalone cluster UI:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "http://localhost:18080//" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the Spark shell and connect to the cluster:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!MASTER=spark://localhost:7077 pyspark" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Confirm you are connected to the correct master:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "sc.master" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Viewing the Spark Application UI" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "From the following [reference](http://spark.apache.org/docs/1.2.0/monitoring.html):\n", + "\n", + "Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:\n", + "\n", + "A list of scheduler stages and tasks\n", + "A summary of RDD sizes and memory usage\n", + "Environmental information.\n", + "Information about the running executors\n", + "\n", + "You can access this interface by simply opening http://:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).\n", + "\n", + "Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "http://localhost:4040/" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Working with Partitions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "From the following [reference](http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/):\n", + "\n", + "The Spark map() and flatMap() methods only operate on one input at a time, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "val dbConnection = ...\n", + "lines.map(... dbConnection.createStatement(...) ...)\n", + "dbConnection.close() // Wrong!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "However, this fails for several reasons:\n", + "\n", + "* It puts the object dbConnection into the map function’s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.\n", + "* map() is a transformation, rather than an operation, and is lazily evaluated. The connection can’t be closed immediately here.\n", + "* Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark — it’s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It’s like a “bulk map” method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "def count_txt(partIter):\n", + " for line in partIter: \n", + " if \".txt\" in line: txt_count += 1\n", + " yield (txt_count)\n", + "\n", + "my_data = sc.textFile(\"file:/path/*\") \\\n", + " .mapPartitions(count_txt) \\\n", + " .collect()\n", + " \n", + "# Show the partitioning \n", + "print \"Data partitions: \", my_data.toDebugString()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Caching RDDs" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Caching an RDD saves the data in memory. Caching is a suggestion to Spark as it is memory dependent.\n", + "\n", + "By default, every RDD operation executes the entire lineage. Caching can boost performance for datasets that are likely to be used by saving this expensive recomputation and is ideal for iterative algorithms or machine learning.\n", + "\n", + "* cache() stores data in memory\n", + "* persist() stores data in MEMORY_ONLY, MEMORY_AND_DISK (spill to disk), and DISK_ONLY\n", + "\n", + "Disk memory is stored on the node, not on HDFS.\n", + "\n", + "Replication is possible by using MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. If a cached partition becomes unavailable, Spark recomputes the partition through the lineage.\n", + "\n", + "Serialization is possible with MEMORY_ONLY_SER and MEMORY_AND_DISK_SER. This is more space efficient but less time efficient, as it uses Java serialization by default." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# Cache RDD to memory\n", + "my_data.cache()\n", + "\n", + "# Persist RDD to both memory and disk (if memory is not enough), with replication of 2\n", + "my_data.persist(MEMORY_AND_DISK_2)\n", + "\n", + "# Unpersist RDD, removing it from memory and disk\n", + "my_data.unpersist()\n", + "\n", + "# Change the persistence level after unpersist\n", + "my_data.persist(MEMORY_AND_DISK)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Checkpointing RDDs\n", + "\n", + "Caching maintains RDD lineage, providing resilience. If the lineage is very long, it is possible to get a stack overflow.\n", + "\n", + "Checkpointing saves the data to HDFS, which provide fault tolerant storage across nodes. HDFS is not as fast as local storage for both reading and writing. Checkpointing is good for long lineages and for very large data sets that might not fit on local storage. Checkpointing removes lineage." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a checkpoint and perform an action by calling count() to materialize the checkpoint and save it to the checkpoint file:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# Enable checkpointing by setting the checkpoint directory, \n", + "# which will contain all checkpoints for the given data:\n", + "sc.setCheckpointDir(\"checkpoints\")\n", + "\n", + "my_data = sc.parallelize([1,2,3,4,5])\n", + "\n", + "# Long loop that may cause a stack overflow\n", + "for i in range(1000):\n", + " my_data = mydata.map(lambda myInt: myInt + 1)\n", + "\n", + " if i % 10 == 0: \n", + " my_data.checkpoint()\n", + " my_data.count()\n", + "\n", + "my_data.collect()\n", + " \n", + "# Display the lineage\n", + "for rddstring in my_data.toDebugString().split('\\n'): \n", + " print rddstring.strip()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Writing and Running a Spark Application" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a Spark application to count the number of text files:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import sys\n", + "from pyspark import SparkContext\n", + "\n", + "if __name__ == \"__main__\":\n", + " if len(sys.argv) < 2:\n", + " print >> sys.stderr, \"Usage: App Name \"\n", + " exit(-1)\n", + " \n", + " count_text_files()\n", + " \n", + "def count_text_files():\n", + " sc = SparkContext()\n", + " logfile = sys.argv[1]\n", + " text_files_count = sc.textFile(logfile)\n", + " .filter(lambda line: '.txt' in line)\n", + " text_files_count.cache()\n", + " print(\"Number of text files: \", text_files_count.count())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Submit the script to Spark for processing:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!spark-submit --properties-file dir/myspark.conf script.py data/*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Configuring Spark Applications" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Run a Spark app and set the configuration options in the command line:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!spark-submit --master spark//localhost:7077 --name 'App Name' script.py data/*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Configure spark.conf:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "spark.app.name App Name\n", + "spark.ui.port 4141\n", + "spark.master spark://localhost:7077" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Run a Spark app and set the configuration options through spark.conf:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!spark-submit --properties-file spark.conf script.py data/*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set the config options programmatically:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "sconf = SparkConf() \\\n", + " .setAppName(\"Word Count\") \\\n", + " .set(\"spark.ui.port\",\"4141\")\n", + "sc = SparkContext(conf=sconf)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set logging levels located in the following file, or place a copy in your pwd:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "$SPARK_HOME/conf/log4j.properties.template" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Streaming" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the Spark Shell locally with at least two threads (need a minimum of two threads for streaming, one for receiving, one for processing):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!spark-shell --master local[2]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a StreamingContext (similar to SparkContext in core Spark) with a batch duration of 1 second:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "val ssc = new StreamingContext(new SparkConf(), Seconds(1)) \n", + "val my_stream = ssc.socketTextStream(hostname, port)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Get a DStream from a streaming data source (text from a socket):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "val logs = ssc.socketTextStream(hostname, port)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "DStreams support regular transformations such as map, flatMap, and filter, and pair transformations such as reduceByKey, groupByKey, and joinByKey.\n", + "\n", + "Apply a DStream operation to each batch of RDDs (count up requests by user id, reduce by key to get the count):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "val requests = my_stream\n", + " .map(line => (line.split(\" \")(2), 1))\n", + " .reduceByKey((x, y) => x + y)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The transform(function) method creates a new DStream by executing the input function on the RDDs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "val sorted_requests = requests\n", + " .map(pair => pair.swap)\n", + " .transform(rdd => rdd.sortByKey(false))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "foreachRDD(function) performs a function on each RDD in the DStream (map is like a shortcut not requiring you to get the RDD first before doing an operation):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "sorted_requests.foreachRDD((rdd, time) => {\n", + " println(\"Top users @ \" + time)\n", + " rdd.take(5).foreach(\n", + " pair => printf(\"User: %s (%s)\\n\", pair._2, pair._1))\n", + "}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Save the DStream result part files with the given folder prefix, the actual folder will be /dir/requests-timestamp0/:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "requests.saveAsTextFiles(\"/dir/requests\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the execution of all DStreams:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ssc.start()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Wait for all background threads to complete before ending the main thread:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ssc.awaitTermination()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Streaming with States" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Enable checkpointing to prevent infinite lineages:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ssc.checkpoint(\"dir\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Compute a DStream based on the previous states plus the current state:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "def updateCount = (newCounts: Seq[Int], state: Option[Int]) => {\n", + " val newCount = newCounts.foldLeft(0)(_ + _)\n", + " val previousCount = state.getOrElse(0)\n", + " Some(newCount + previousCount)\n", + "}\n", + "\n", + "val totalUserreqs = userreqs.updateStateByKey(updateCount)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Compute a DStream based Sliding window, every 30 seconds, count requests by user over the last 5 minutes:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "val reqcountsByWindow = logs.map(line => (line.split(' ')(2), 1))\n", + " .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Minutes(5), Seconds(30))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Collect statistics with the StreamingListener API:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "// define listener\n", + "class MyListener extends StreamingListener {\n", + " override def onReceiverStopped(...) {\n", + " streamingContext.stop()\n", + " }\n", + "} \n", + "\n", + "// attach listener\n", + "streamingContext. addStreamingListener(new MyListener())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Broadcast Variables" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Read in list of items to broadcast from a local file:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "broadcast_file = \"broadcast.txt\"\n", + "broadcast_list = list(map(lambda l: l.strip(), open(broadcast_file)))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Broadcast the target list to all workers:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "broadcast_list_sc = sc.broadcast(broadcast_list)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Filter based on the broadcast list:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "log_file = \"hdfs://localhost/user/logs/*\"\n", + "filtered_data = sc.textFile(log_file)\\\n", + " .filter(lambda line: any(item in line for item in broadcast_list_sc.value))\n", + "\n", + "filtered_data.take(10)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Accumulators" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create an accumulator:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "txt_count = sc.accumulator(0)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Count the number of txt files in the RDD:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "my_data = sc.textFile(filePath)\n", + "my_data.foreach(lambda line: if '.txt' in line: txt_count.add(1))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Count the number of file types encountered:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "jpg_count = sc.accumulator(0)\n", + "html_count = sc.accumulator(0)\n", + "css_count = sc.accumulator(0)\n", + "\n", + "def countFileType(s):\n", + " if '.jpg' in s: jpg_count.add(1)\n", + " elif '.html' in s: html_count.add(1)\n", + " elif '.css' in s: css_count.add(1)\n", + "\n", + "filename=\"hdfs://logs/*\"\n", + "\n", + "logs = sc.textFile(filename)\n", + "logs.foreach(lambda line: countFileType(line))\n", + "\n", + "print 'File Type Totals:'\n", + "print '.css files: ', css_count.value\n", + "print '.html files: ', html_count.value\n", + "print '.jpg files: ', jpg_count.value" + ] } - ] -} \ No newline at end of file + ], + "metadata": { + "kernelspec": { + "display_name": "Python 2", + "language": "python", + "name": "python2" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.9" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}