Added Spark streaming with states snippets.

This commit is contained in:
Donne Martin 2015-03-29 17:53:52 -04:00
parent f316416d88
commit b3fb4ae219

View File

@ -1,7 +1,7 @@
{
"metadata": {
"name": "",
"signature": "sha256:7abeb0526adb21448f2c1ad55b4e1cb97cb81bcc62e9c450b3c5645578c7b59b"
"signature": "sha256:0fb24b809c8caadd2f68f7898036387e846da3f839a97e63be9a65aa17f3dbb1"
},
"nbformat": 3,
"nbformat_minor": 0,
@ -24,7 +24,8 @@
"* Checkpointing RDDs\n",
"* Writing and Running a Spark Application\n",
"* Configuring Spark Applications\n",
"* Streaming"
"* Streaming\n",
"* Streaming with States"
]
},
{
@ -928,10 +929,10 @@
"cell_type": "code",
"collapsed": false,
"input": [
"sorted_requests.foreachRDD((rdd,time) => {\n",
"sorted_requests.foreachRDD((rdd, time) => {\n",
" println(\"Top users @ \" + time)\n",
" rdd.take(5).foreach(\n",
" pair => printf(\"User: %s (%s)\\n\",pair._2, pair._1))\n",
" pair => printf(\"User: %s (%s)\\n\", pair._2, pair._1))\n",
"}"
],
"language": "python",
@ -988,6 +989,112 @@
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Streaming with States"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Enable checkpointing to prevent infinite lineages:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"ssc.checkpoint(\"dir\")"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Compute a DStream based on the previous states plus the current state:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def updateCount = (newCounts: Seq[Int], state: Option[Int]) => {\n",
" val newCount = newCounts.foldLeft(0)(_ + _)\n",
" val previousCount = state.getOrElse(0)\n",
" Some(newCount + previousCount)\n",
"}\n",
"\n",
"val totalUserreqs = userreqs.updateStateByKey(updateCount)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Compute a DStream based Sliding window, every 30 seconds, count requests by user over the last 5 minutes:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"val reqcountsByWindow = logs.map(line => (line.split(' ')(2), 1))\n",
" .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Minutes(5), Seconds(30))"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Collect statistics with the StreamingListener API:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"// define listener\n",
"class MyListener extends StreamingListener {\n",
" override def onReceiverStopped(...) {\n",
" streamingContext.stop()\n",
" }\n",
"} \n",
"\n",
"// attach listener\n",
"streamingContext. addStreamingListener(new MyListener())"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}