Added Spark streaming snippets.

This commit is contained in:
Donne Martin 2015-03-16 16:01:51 -04:00
parent 8364d476b3
commit 10d63efb4a


@ -1,7 +1,7 @@
{
"metadata": {
"name": "",
"signature": "sha256:41d7aa3100998f60caf63db5380eee43e78d4de66c65af923e4877df81248ae7"
"signature": "sha256:7abeb0526adb21448f2c1ad55b4e1cb97cb81bcc62e9c450b3c5645578c7b59b"
},
"nbformat": 3,
"nbformat_minor": 0,
@ -23,7 +23,8 @@
"* Caching RDDs\n",
"* Checkpointing RDDs\n",
"* Writing and Running a Spark Application\n",
"* Configuring Spark Applications"
"* Configuring Spark Applications\n",
"* Streaming"
]
},
{
@ -816,6 +817,177 @@
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Streaming"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the Spark Shell locally with at least two threads (need a minimum of two threads for streaming, one for receiving, one for processing):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"!spark-shell --master local[2]"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a StreamingContext (similar to SparkContext in core Spark) with a batch duration of 1 second:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"val ssc = new StreamingContext(new SparkConf(), Seconds(1)) \n",
"val my_stream = ssc.socketTextStream(hostname, port)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get a DStream from a streaming data source (text from a socket):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"val logs = ssc.socketTextStream(hostname, port)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"DStreams support regular transformations such as map, flatMap, and filter, and pair transformations such as reduceByKey, groupByKey, and joinByKey.\n",
"\n",
"Apply a DStream operation to each batch of RDDs (count up requests by user id, reduce by key to get the count):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"val requests = my_stream\n",
" .map(line => (line.split(\" \")(2), 1))\n",
" .reduceByKey((x, y) => x + y)"
],
"language": "python",
"metadata": {},
"outputs": []
},
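{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of the other regular transformations (assuming the my_stream DStream from above, and a hypothetical \"ERROR\" filter criterion), filter and flatMap work the same way as their RDD counterparts:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"// Keep only the lines containing \"ERROR\" (hypothetical criterion)\n",
"val errors = my_stream.filter(line => line.contains(\"ERROR\"))\n",
"\n",
"// Split each line into words, flattening the results into one DStream\n",
"val words = my_stream.flatMap(line => line.split(\" \"))"
],
"language": "python",
"metadata": {},
"outputs": []
},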
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The transform(function) method creates a new DStream by executing the input function on the RDDs."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"val sorted_requests = requests\n",
" .map(pair => pair.swap)\n",
" .transform(rdd => rdd.sortByKey(false))"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"foreachRDD(function) performs a function on each RDD in the DStream (map is like a shortcut not requiring you to get the RDD first before doing an operation):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sorted_requests.foreachRDD((rdd,time) => {\n",
" println(\"Top users @ \" + time)\n",
" rdd.take(5).foreach(\n",
" pair => printf(\"User: %s (%s)\\n\",pair._2, pair._1))\n",
"}"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Save the DStream result part files with the given folder prefix, the actual folder will be /dir/requests-timestamp0/:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"requests.saveAsTextFiles(\"/dir/requests\")"
],
"language": "python",
"metadata": {},
"outputs": []
},
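{
"cell_type": "markdown",
"metadata": {},
"source": [
"saveAsTextFiles also accepts an optional suffix, in which case each batch folder is named with the pattern prefix-timestamp.suffix (a sketch, using the requests DStream from above):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"// Writes each batch to a folder named /dir/requests-<timestamp>.txt/\n",
"requests.saveAsTextFiles(\"/dir/requests\", \"txt\")"
],
"language": "python",
"metadata": {},
"outputs": []
},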
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start the execution of all DStreams:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"ssc.start()"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Wait for all background threads to complete before ending the main thread:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"ssc.awaitTermination()"
],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}