Added snippets to checkpoint RDDs in Spark.

This commit is contained in:
Donne Martin 2015-03-08 05:55:45 -04:00
parent 0481497848
commit 1815c9a122


@@ -20,7 +20,8 @@
"* Running Spark on a Cluster\n",
"* Viewing the Spark Application UI\n",
"* Working with Partitions\n",
"* Caching RDDs\n",
"* Checkpointing RDDs"
]
},
{
@@ -613,6 +614,52 @@
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Checkpointing RDDs\n",
"\n",
"Caching maintains RDD lineage, providing resilience. If the lineage is very long, it is possible to get a stack overflow.\n",
"\n",
"Checkpointing saves the data to HDFS, which provide fault tolerant storage across nodes. HDFS is not as fast as local storage for both reading and writing. Checkpointing is good for long lineages and for very large data sets that might not fit on local storage. Checkpointing removes lineage."
]
},
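{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below is a minimal sketch (not part of the original notebook, assuming an existing SparkContext `sc`) contrasting the two: toDebugString() on a cached RDD still shows the full lineage, while on a checkpointed RDD the lineage is truncated at the checkpoint."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Illustrative sketch: cache() preserves lineage, checkpoint() truncates it\n",
"sc.setCheckpointDir(\"checkpoints\")\n",
"\n",
"cached = sc.parallelize([1, 2, 3]).map(lambda x: x * 2)\n",
"cached.cache()\n",
"cached.count()\n",
"# The cached RDD still lists its full lineage\n",
"print cached.toDebugString()\n",
"\n",
"checked = sc.parallelize([1, 2, 3]).map(lambda x: x * 2)\n",
"checked.checkpoint()\n",
"checked.count()  # an action materializes the checkpoint\n",
"# The checkpointed RDD's lineage is truncated at the checkpoint\n",
"print checked.toDebugString()"
],
"language": "python",
"metadata": {},
"outputs": []
},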
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a checkpoint and perform an action by calling count() to materialize the checkpoint and save it to the checkpoint file:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Enable checkpointing by setting the checkpoint directory, \n",
"# which will contain all checkpoints for the given data:\n",
"sc.setCheckpointDir(\"checkpoints\")\n",
"\n",
"my_data = sc.parallelize([1,2,3,4,5])\n",
"\n",
"# Long loop that may cause a stack overflow\n",
"for i in range(1000):\n",
" my_data = mydata.map(lambda myInt: myInt + 1)\n",
"\n",
" if i % 10 == 0: \n",
" my_data.checkpoint()\n",
" my_data.count()\n",
"\n",
"my_data.collect()\n",
" \n",
"# Display the lineage\n",
"for rddstring in my_data.toDebugString().split('\\n'): \n",
" print rddstring.strip()"
],
"language": "python",
"metadata": {},
"outputs": []
},
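{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a follow-up sketch (assuming the my_data RDD and checkpoint directory from the cell above), isCheckpointed() confirms that the RDD was checkpointed and getCheckpointFile() returns the location of the checkpoint data:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Both methods are part of the RDD API; my_data comes from the cell above\n",
"print my_data.isCheckpointed()\n",
"print my_data.getCheckpointFile()"
],
"language": "python",
"metadata": {},
"outputs": []
}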
],
"metadata": {}