From 404676a1f7cf204eb5dca323551213b683ef3bc6 Mon Sep 17 00:00:00 2001 From: Donne Martin Date: Sat, 7 Mar 2015 09:07:18 -0500 Subject: [PATCH] Added discussion and snippet for working with partitions in Spark. --- spark/spark.ipynb | 74 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/spark/spark.ipynb b/spark/spark.ipynb index 1035c73..76cf45e 100644 --- a/spark/spark.ipynb +++ b/spark/spark.ipynb @@ -1,7 +1,7 @@ { "metadata": { "name": "", - "signature": "sha256:4117412dc3ddca1bb16e3800a74447569b7bd2b7484113922798e4326e254940" + "signature": "sha256:ba2a57f8daa28f0934b9121adbb394f85c2a7b6b1c77196c1982e814079af90f" }, "nbformat": 3, "nbformat_minor": 0, @@ -18,7 +18,8 @@ "* RDDs\n", "* Pair RDDs\n", "* Running Spark on a Cluster\n", - "* Viewing the Spark Application UI" + "* Viewing the Spark Application UI\n", + "* Working with Partitions" ] }, { @@ -477,6 +478,8 @@ "cell_type": "markdown", "metadata": {}, "source": [ + "From the following [reference](http://spark.apache.org/docs/1.2.0/monitoring.html):\n", + "\n", "Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:\n", "\n", "A list of scheduler stages and tasks\n", @@ -486,9 +489,7 @@ "\n", "You can access this interface by simply opening http://:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).\n", "\n", - "Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage.\n", - "\n", - "[Reference](http://spark.apache.org/docs/1.2.0/monitoring.html)" + "Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage." ] }, { @@ -500,6 +501,69 @@ "language": "python", "metadata": {}, "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Working with Partitions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "From the following [reference](http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/):\n", + "\n", + "The Spark map() and flatMap() methods only operate on one input at a time, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "val dbConnection = ...\n", + "lines.map(... dbConnection.createStatement(...) ...)\n", + "dbConnection.close() // Wrong!" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "However, this fails for several reasons:\n", + "\n", + "* It puts the object dbConnection into the map function\u2019s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.\n", + "* map() is a transformation, rather than an operation, and is lazily evaluated. The connection can\u2019t be closed immediately here.\n", + "* Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark \u2014 it\u2019s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It\u2019s like a \u201cbulk map\u201d method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values." + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "def count_txt(partIter):\n", + " for line in partIter: \n", + " if \".txt\" in line: txt_count += 1\n", + " yield (txt_count)\n", + "\n", + "sc.textFile(\"file:/path/*\") \\\n", + " .mapPartitions(count_txt) \\\n", + " .collect()" + ], + "language": "python", + "metadata": {}, + "outputs": [] } ], "metadata": {}