diff --git a/spark/spark.ipynb b/spark/spark.ipynb index 1999b94..ea05858 100644 --- a/spark/spark.ipynb +++ b/spark/spark.ipynb @@ -1,7 +1,7 @@ { "metadata": { "name": "", - "signature": "sha256:41d7aa3100998f60caf63db5380eee43e78d4de66c65af923e4877df81248ae7" + "signature": "sha256:7abeb0526adb21448f2c1ad55b4e1cb97cb81bcc62e9c450b3c5645578c7b59b" }, "nbformat": 3, "nbformat_minor": 0, @@ -23,7 +23,8 @@ "* Caching RDDs\n", "* Checkpointing RDDs\n", "* Writing and Running a Spark Application\n", - "* Configuring Spark Applications" + "* Configuring Spark Applications\n", + "* Streaming" ] }, { @@ -816,6 +817,177 @@ "language": "python", "metadata": {}, "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Streaming" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the Spark Shell locally with at least two threads (streaming needs a minimum of two: one to receive data and one to process it):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "!spark-shell --master local[2]" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a StreamingContext (the streaming counterpart of SparkContext in core Spark) with a batch duration of 1 second:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "val ssc = new StreamingContext(new SparkConf(), Seconds(1))" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Get a DStream from a streaming data source (here, text received on a socket):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "val my_stream = ssc.socketTextStream(hostname, port)" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "DStreams support regular transformations such as map, flatMap, and filter, and pair transformations such as reduceByKey, groupByKey, and join.\n", + "\n", + "Apply a DStream operation to each batch of RDDs (map each line to a (user id, 1) pair, then reduce by key to count the requests per user):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "val requests = my_stream\n", + "    .map(line => (line.split(\" \")(2), 1))\n", + "    .reduceByKey((x, y) => x + y)" + ], + "language": "python", + "metadata": {}, + "outputs": [] + },
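+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As a quick sketch of the regular transformations mentioned above (it assumes the same my_stream of space-delimited lines; the ERROR marker is a hypothetical example):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "// split every line of each batch into words\n", + "val words = my_stream.flatMap(line => line.split(\" \"))\n", + "// keep only the lines containing a (hypothetical) ERROR marker\n", + "val errors = my_stream.filter(line => line.contains(\"ERROR\"))" + ], + "language": "python", + "metadata": {}, + "outputs": [] + },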
+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The transform(function) method creates a new DStream by applying an arbitrary RDD-to-RDD function to each RDD in the DStream (here, swap each pair to (count, user id) and sort by count, descending):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "val sorted_requests = requests\n", + "    .map(pair => pair.swap)\n", + "    .transform(rdd => rdd.sortByKey(false))" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "foreachRDD(function) performs an arbitrary action on each RDD in the DStream (whereas map works element by element, foreachRDD hands you the whole RDD, so any RDD operation can be applied):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "sorted_requests.foreachRDD((rdd, time) => {\n", + "    println(\"Top users @ \" + time)\n", + "    rdd.take(5).foreach(\n", + "        pair => printf(\"User: %s (%s)\\n\", pair._2, pair._1))\n", + "})" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Save each batch of the DStream as text part files using the given path prefix; each batch is written to its own directory named /dir/requests-<timestamp>/:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "requests.saveAsTextFiles(\"/dir/requests\")" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Start the execution of all DStreams:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "ssc.start()" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Wait for all background threads to complete before ending the main thread:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "ssc.awaitTermination()" + ], + "language": "python", + "metadata": {}, + "outputs": [] + },
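+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Putting the steps together, a minimal self-contained sketch of the whole streaming application (the app name, the localhost:9999 source, and the space-delimited log format with the user id in field 2 are assumptions; feed it with e.g. nc -lk 9999):" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "import org.apache.spark.SparkConf\n", + "import org.apache.spark.streaming.{Seconds, StreamingContext}\n", + "\n", + "object TopUsers {\n", + "  def main(args: Array[String]) {\n", + "    // 1-second batches, as above; the app name is an assumption\n", + "    val ssc = new StreamingContext(new SparkConf().setAppName(\"TopUsers\"), Seconds(1))\n", + "    // assumed source: space-delimited log lines on localhost:9999\n", + "    val my_stream = ssc.socketTextStream(\"localhost\", 9999)\n", + "    // count requests per user id (assumed to be field 2)\n", + "    val requests = my_stream\n", + "      .map(line => (line.split(\" \")(2), 1))\n", + "      .reduceByKey((x, y) => x + y)\n", + "    // sort each batch by count, descending\n", + "    val sorted_requests = requests\n", + "      .map(pair => pair.swap)\n", + "      .transform(rdd => rdd.sortByKey(false))\n", + "    // print the top five users of each batch\n", + "    sorted_requests.foreachRDD((rdd, time) => {\n", + "      println(\"Top users @ \" + time)\n", + "      rdd.take(5).foreach(\n", + "        pair => printf(\"User: %s (%s)\\n\", pair._2, pair._1))\n", + "    })\n", + "    ssc.start()\n", + "    ssc.awaitTermination()\n", + "  }\n", + "}" + ], + "language": "python", + "metadata": {}, + "outputs": [] + } ], "metadata": {}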