From 7195c5bc8235e380348d0a644c5c3d0993ca4d53 Mon Sep 17 00:00:00 2001 From: Donne Martin Date: Mon, 30 Mar 2015 19:01:07 -0400 Subject: [PATCH] Added Spark broadcast variables snippets. --- spark/spark.ipynb | 56 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/spark/spark.ipynb b/spark/spark.ipynb index c33ac24..241650e 100644 --- a/spark/spark.ipynb +++ b/spark/spark.ipynb @@ -1,7 +1,7 @@ { "metadata": { "name": "", - "signature": "sha256:0fb24b809c8caadd2f68f7898036387e846da3f839a97e63be9a65aa17f3dbb1" + "signature": "sha256:cf2651f340403267ecf82871c01a4f9deb80df25b8345be4f56dc02db970a5a7" }, "nbformat": 3, "nbformat_minor": 0, @@ -25,7 +25,8 @@ "* Writing and Running a Spark Application\n", "* Configuring Spark Applications\n", "* Streaming\n", - "* Streaming with States" + "* Streaming with States\n", + "* Broadcast Variables" ] }, { @@ -1080,18 +1081,65 @@ "metadata": {}, "outputs": [] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Broadcast Variables" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Read in list of items to broadcast from a local file:" + ] + }, { "cell_type": "code", "collapsed": false, - "input": [], + "input": [ + "broadcast_file = \"broadcast.txt\"\n", + "broadcast_list = list(map(lambda l: l.strip(), open(broadcast_file)))" + ], "language": "python", "metadata": {}, "outputs": [] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Broadcast the target list to all workers:" + ] + }, { "cell_type": "code", "collapsed": false, - "input": [], + "input": [ + "broadcast_list_sc = sc.broadcast(broadcast_list)" + ], + "language": "python", + "metadata": {}, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Filter based on the broadcast list:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "log_file = \"hdfs://localhost/user/logs/*\"\n", + "filtered_data = sc.textFile(log_file)\\\n", + " .filter(lambda line: any(item in line for item in broadcast_list_sc.value))\n", + "\n", + "filtered_data.take(10)" + ], "language": "python", "metadata": {}, "outputs": []