{
"metadata": {
"name": "",
"signature": "sha256:4117412dc3ddca1bb16e3800a74447569b7bd2b7484113922798e4326e254940"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Spark\n",
"\n",
"* Python Shell\n",
"* RDDs\n",
"* Pair RDDs\n",
"* Running Spark on a Cluster\n",
"* Viewing the Spark Application UI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Python Shell"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the pyspark shell:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"pyspark"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"View the SparkContext:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sc"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## RDDs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create an RDD from the contents of a directory:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"my_data = sc.textFile(\"file:/path/*\")"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Count the number of lines in the data:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"my_data.count()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Display the contents of the RDD (collect() returns every element to the driver, so use it only on small data sets):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"my_data.collect()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Return the first 10 lines in the data:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"my_data.take(10)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create an RDD with lines matching the given filter:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"my_data.filter(lambda line: \".txt\" in line)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Chain a series of commands:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sc.textFile(\"file:/path/file.txt\") \\\n",
" .filter(lambda line: \".txt\" in line) \\\n",
" .count()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a new RDD by splitting each line into words and keeping only the first word of each line:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"first_words = my_data.map(lambda line: line.split()[0])"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Print the first 10 words in first_words:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for word in first_words.take(10):\n",
"    print(word)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Save the first words to a text file:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"first_words.saveAsTextFile(\"file:/path/file\")"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pair RDDs\n",
"\n",
"Pair RDDs contain elements that are key-value pairs. Keys and values can be any type."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given a log file with the following space-delimited format: [date_time, user_id, ip_address, action], map each request to (user_id, 1) and sum the counts per user:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"DATE_TIME = 0\n",
"USER_ID = 1\n",
"IP_ADDRESS = 2\n",
"ACTION = 3\n",
"\n",
"log_data = sc.textFile(\"file:/path/*\")\n",
"\n",
"user_actions = log_data \\\n",
" .map(lambda line: line.split()) \\\n",
" .map(lambda words: (words[USER_ID], 1)) \\\n",
" .reduceByKey(lambda count1, count2: count1 + count2)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Show the top 5 users by count, sorted in descending order:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Swap each pair to (count, user_id) so that sortByKey orders by count\n",
"user_actions.map(lambda pair: (pair[1], pair[0])).sortByKey(False).take(5)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Group IP addresses by user id:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"user_ips = log_data \\\n",
" .map(lambda line: line.split()) \\\n",
" .map(lambda words: (words[USER_ID], words[IP_ADDRESS])) \\\n",
" .groupByKey()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given a user table with the following csv format: [user_id, user_info0, user_info1, ...], map each line to (user_id, [user_info...]):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"user_data = sc.textFile(\"file:/path/*\")\n",
"\n",
"user_profile = user_data \\\n",
" .map(lambda line: line.split(',')) \\\n",
" .map(lambda words: (words[0], words[1:]))"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Inner join the user_actions and user_profile RDDs:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"user_actions_with_profile = user_actions.join(user_profile)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Show the joined table:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for (user_id, (count, user_info)) in user_actions_with_profile.take(10):\n",
"    print(user_id, count, user_info)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running Spark on a Cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the standalone cluster's Master and Worker daemons:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sudo service spark-master start\n",
"sudo service spark-worker start"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Stop the standalone cluster's Master and Worker daemons:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sudo service spark-master stop\n",
"sudo service spark-worker stop"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Restart the standalone cluster's Master and Worker daemons:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sudo service spark-master stop\n",
"sudo service spark-worker stop\n",
"sudo service spark-master start\n",
"sudo service spark-worker start"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"View the Spark standalone cluster UI:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"http://localhost:18080/"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the Spark shell and connect to the cluster:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"MASTER=spark://localhost:7077 pyspark"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Confirm you are connected to the correct master:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sc.master"
],
"language": "python",
"metadata": {},
"outputs": []
},
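{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same connection can be sketched for a standalone script, where no sc is pre-created as it is in the pyspark shell. This is a minimal illustration under stated assumptions: the master URL is the one used in the shell example above, and the application name is a hypothetical placeholder:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from pyspark import SparkConf, SparkContext\n",
"\n",
"# Master URL taken from the shell example; the app name is a hypothetical placeholder.\n",
"conf = SparkConf() \\\n",
"    .setMaster(\"spark://localhost:7077\") \\\n",
"    .setAppName(\"cluster-example\")\n",
"\n",
"sc = SparkContext(conf=conf)\n",
"print(sc.master)\n",
"sc.stop()"
],
"language": "python",
"metadata": {},
"outputs": []
},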
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Viewing the Spark Application UI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:\n",
"\n",
"* A list of scheduler stages and tasks\n",
"* A summary of RDD sizes and memory usage\n",
"* Environmental information\n",
"* Information about the running executors\n",
"\n",
"You can access this interface by opening http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc.).\n",
"\n",
"Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage.\n",
"\n",
"[Reference](http://spark.apache.org/docs/1.2.0/monitoring.html)"
]
},
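{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch of the event-log setting described above, the property can be set on a SparkConf before the SparkContext is created. This assumes a standalone script rather than the pyspark shell (where sc already exists). The application name and the log directory are hypothetical placeholders, and the directory given in spark.eventLog.dir should already exist:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from pyspark import SparkConf, SparkContext\n",
"\n",
"# Hypothetical app name and log directory; adjust both for your setup.\n",
"conf = SparkConf() \\\n",
"    .setAppName(\"event-log-example\") \\\n",
"    .set(\"spark.eventLog.enabled\", \"true\") \\\n",
"    .set(\"spark.eventLog.dir\", \"file:/tmp/spark-events\")\n",
"\n",
"sc = SparkContext(conf=conf)"
],
"language": "python",
"metadata": {},
"outputs": []
},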
{
"cell_type": "code",
"collapsed": false,
"input": [
"http://localhost:4040/"
],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}