{ "metadata": { "name": "", "signature": "sha256:0426fb2480e184a6d65b40b69e4601e1abb23c84cc1090d1fe0e2e98803c6220" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark\n", "\n", "* Python Shell\n", "* RDDs\n", "* Pair RDDs\n", "* Running Spark on a Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python Shell" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start the pyspark shell:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "pyspark" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "View the spark context:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sc" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDDs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an RDD from the contents of a directory:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "my_data = sc.textFile(\"file:/path/*\")" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Count the number of lines in the data:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "my_data.count()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Display the data in the data:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "my_data.collect()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Return the first 10 lines in the data:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "my_data.take(10)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an RDD with lines matching the given filter:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "my_data.filter(lambda line: \".txt\" in line)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Chain a series of commands:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sc.textFile(\"file:/path/file.txt\") \\\n", " .filter(lambda line: \".txt\" in line) \\\n", " .count()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a new RDD mapping each line to an array of words, taking only the first word of each array:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "first_words = my_data.map(lambda line: line.split()[0])" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Output each word in first_words:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "for word in first_words.take(10):\n", " print word" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save the first words to a text file:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "first_words.saveAsTextFile(\"file:/path/file\")" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pair RDDs\n", "\n", "Pair RDDs contain elements that are key-value pairs. Keys and values can be any type." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given a log file with the following space deilmited format: [date_time, user_id, ip_address, action], map each request to (user_id, 1):" ] }, { "cell_type": "code", "collapsed": false, "input": [ "DATE_TIME = 0\n", "USER_ID = 1\n", "IP_ADDRESS = 2\n", "ACTION = 3\n", "\n", "log_data = sc.textFile(\"file:/path/*\")\n", "\n", "user_actions = log_data \\\n", " .map(lambda line: line.split()) \\\n", " .map(lambda words: (words[USER_ID], 1)) \\\n", " .reduceByKey(lambda count1, count2: count1 + count2)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show the top 5 users by count, sorted in descending order:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "user_actions.map(lambda pair: (pair[0], pair[1])).sortyByKey(False).take(5)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Group IP addresses by user id:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "user_ips = log_data \\\n", " .map(lambda line: line.split()) \\\n", " .map(lambda words: (words[IP_ADDRESS],words[USER_ID])) \\\n", " .groupByKey()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given a user table with the following csv format: [user_id, user_info0, user_info1, ...], map each line to (user_id, [user_info...]):" ] }, { "cell_type": "code", "collapsed": false, "input": [ "user_data = sc.textFile(\"file:/path/*\")\n", "\n", "user_profile = user_data \\\n", " .map(lambda line: line.split(',')) \\\n", " .map(lambda words: (words[0], words[1:]))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Inner join the user_actions and user_profile RDDs:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "user_actions_with_profile = user_actions.join(user_profile)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Show the joined table:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "for (user_id, (user_info, count)) in user_actions_with_profiles.take(10):\n", " print user_id, count, user_info" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running Spark on a Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start the standalone cluster's Master and Worker daemons:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sudo service spark-master start\n", "sudo service spark-worker start" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Stop the standalone cluster's Master and Worker daemons:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sudo service spark-master stop\n", "sudo service spark-worker stop" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Restart the standalone cluster's Master and Worker daemons:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sudo service spark-master stop\n", "sudo service spark-worker stop" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "View the Spark standalone cluster UI:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "http://localhost:18080//" ], 
"language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start the Spark shell and connect to the cluster:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "MASTER=spark://localhost:7077 pyspark" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Confirm you are connected to the correct master:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "sc.master" ], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }