diff --git a/spark/spark.ipynb b/spark/spark.ipynb
index d9f6c5f..f4f0c87 100644
--- a/spark/spark.ipynb
+++ b/spark/spark.ipynb
@@ -1,7 +1,7 @@
 {
  "metadata": {
   "name": "",
-  "signature": "sha256:6de81fb20a6c3dee884019beb8604ace32cb3be3e7d63d3547dd433075d62e69"
+  "signature": "sha256:ecb4af31fb2838a9be26c4692a4c2619957209df829895e8486de7eb84b59fa3"
  },
  "nbformat": 3,
  "nbformat_minor": 0,
@@ -15,7 +15,8 @@
       "# Spark\n",
       "\n",
       "* Python Shell\n",
-      "* RDDs"
+      "* RDDs\n",
+      "* Pair RDDs"
      ]
     },
     {
@@ -221,6 +222,135 @@
      "language": "python",
      "metadata": {},
      "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "## Pair RDDs\n",
+      "\n",
+      "Pair RDDs contain elements that are key-value pairs. Keys and values can be any type."
+     ]
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Given a log file with the following space delimited format: [date_time, user_id, ip_address, action], map each request to (user_id, 1):"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "DATE_TIME = 0\n",
+      "USER_ID = 1\n",
+      "IP_ADDRESS = 2\n",
+      "ACTION = 3\n",
+      "\n",
+      "log_data = sc.textFile(\"file:/path/*\")\n",
+      "\n",
+      "user_actions = log_data \\\n",
+      " .map(lambda line: line.split()) \\\n",
+      " .map(lambda words: (words[USER_ID], 1)) \\\n",
+      " .reduceByKey(lambda count1, count2: count1 + count2)"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Show the top 5 users by count, sorted in descending order:"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "user_actions.map(lambda pair: (pair[1], pair[0])).sortByKey(False).take(5)"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Group IP addresses by user id:"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "user_ips = log_data \\\n",
+      " .map(lambda line: line.split()) \\\n",
+      " .map(lambda words: (words[USER_ID],words[IP_ADDRESS])) \\\n",
+      " .groupByKey()"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Given a user table with the following csv format: [user_id, user_info0, user_info1, ...], map each line to (user_id, [user_info...]):"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "user_data = sc.textFile(\"file:/path/*\")\n",
+      "\n",
+      "user_profile = user_data \\\n",
+      " .map(lambda line: line.split(',')) \\\n",
+      " .map(lambda words: (words[0], words[1:]))"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Inner join the user_actions and user_profile RDDs:"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "user_actions_with_profile = user_actions.join(user_profile)"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Show the joined table:"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "for (user_id, (count, user_info)) in user_actions_with_profile.take(10):\n",
+      " print user_id, count, user_info"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
     }
    ],
    "metadata": {}