Merge branch 'amarrella-master'

* amarrella-master:
  Add note on DataFrame recommendation over RDD
  Add more Spark DataFrame examples
  Move DataFrames before RDDs
  Added DataFrames section and cleared outputs
  Added DataFrames section
This commit is contained in:
Donne Martin 2016-02-21 06:23:41 -05:00
commit 4e8f427764
2 changed files with 368 additions and 5 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

@@ -15,6 +15,7 @@
"\n",
"* IPython Notebook Setup\n",
"* Python Shell\n",
"* DataFrames\n",
"* RDDs\n",
"* Pair RDDs\n",
"* Running Spark on a Cluster\n",
@@ -91,12 +92,374 @@
"sc"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## DataFrames"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From the following [reference](https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html):\n",
"\n",
"A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. "
]
},
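{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch, a DataFrame can also be built from in-memory rows; this assumes a `SQLContext` named `sqlContext` (created from `sc` later in this notebook):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Minimal sketch: build a small DataFrame from in-memory Row objects\n",
"# Assumes a SQLContext named sqlContext (created later in this notebook)\n",
"from pyspark.sql import Row\n",
"rows = [Row(name=\"Alice\", age=25), Row(name=\"Bob\", age=30)]\n",
"df = sqlContext.createDataFrame(rows)\n",
"df.show()"
]
},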
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a DataFrame from JSON files on S3:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"users = context.load(\"s3n://path/to/users.json\", \"json\")"
]
},
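{
"cell_type": "markdown",
"metadata": {},
"source": [
"In Spark 1.4+ the generic `load` call above is superseded by the `DataFrameReader` API; a sketch, assuming a `SQLContext` named `sqlContext`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Spark 1.4+ equivalent of the generic load() call above\n",
"users = sqlContext.read.json(\"s3n://path/to/users.json\")"
]
},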
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a new DataFrame that contains “young users” only:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"young = users.filter(users.age<21)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, using Pandas-like syntax:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"young = users[users.age<21]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Increment everybodys age by 1:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"young.select(young.name, young.age+1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Count the number of young users by gender:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"young.groupBy(\"gender\").count()"
]
},
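{
"cell_type": "markdown",
"metadata": {},
"source": [
"Beyond a plain count, `agg` computes other aggregates per group; a sketch, assuming the `age` column from the earlier examples:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Average age per gender; agg accepts a {column: aggregate function} dict\n",
"young.groupBy(\"gender\").agg({\"age\": \"avg\"})"
]
},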
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Join young users with another DataFrame called logs:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"young.join(logs, logs.userId == users.userId, \"left_outer\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Count the number of users in the young DataFrame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"young.registerTempTable(\"young\")\n",
"context.sql(\"SELECT count(*) FROM young\")"
]
},
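{
"cell_type": "markdown",
"metadata": {},
"source": [
"Like other DataFrame operations, the query above is lazy; call an action such as `collect` to materialize the count:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# collect() runs the query and returns the result as a list of Rows\n",
"context.sql(\"SELECT count(*) FROM young\").collect()"
]
},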
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Convert Spark DataFrame to Pandas:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"pandas_df = young.toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a Spark DataFrame from Pandas:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"spark_df = context.createDataFrame(pandas_df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given the Spark Context, create a SQLContext:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a DataFrame based on the content of a file:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = sqlContext.jsonFile(\"file:/path/file.json\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Display the content of the DataFrame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Print the schema:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Select a column:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.select(\"column_name\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a DataFrame with rows matching a given filter:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.filter(df.column_name>10)"
]
},
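{
"cell_type": "markdown",
"metadata": {},
"source": [
"Filter conditions can be combined with `&` (and) and `|` (or); the parentheses are required because of Python operator precedence. A sketch using the same hypothetical `column_name`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Rows where column_name falls between 10 and 20 (exclusive)\n",
"df.filter((df.column_name > 10) & (df.column_name < 20))"
]
},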
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Aggregate the results and count:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df.groupBy(\"column_name\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Convert a RDD to a DataFrame (by inferring the schema):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = sqlContext.inferSchema(my_data)"
]
},
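{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: `inferSchema` was deprecated in Spark 1.3 in favor of `createDataFrame`, which also infers the schema from an RDD of `Row` objects or tuples; a sketch assuming the same hypothetical `my_data` RDD:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Preferred API since Spark 1.3: the schema is inferred from the RDD's rows\n",
"df = sqlContext.createDataFrame(my_data)"
]
},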
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Register the DataFrame as a table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.registerTempTable(\"dataframe_name\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run a SQL Query on a DataFrame registered as a table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd_from_df = sqlContext.sql(\"SELECT * FROM dataframe_name\")"
]
},
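{
"cell_type": "markdown",
"metadata": {},
"source": [
"The result of `sql` is itself a DataFrame, so it can be transformed further or collected to the driver:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Bring the query results back to the driver as a list of Row objects\n",
"query_results.collect()"
]
},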
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## RDDs\n",
"\n",
"Note: RDDs are included for completeness. Spark 1.3 introduced DataFrames, which are recommended over RDDs. Check out the [DataFrames announcement](https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html) for more info.\n",
"\n",
"Resilient Distributed Datasets (RDDs) are the fundamental unit of data in Spark. RDDs can be created from a file, from data in memory, or from another RDD. RDDs are immutable.\n",
"\n",
"There are two types of RDD operations:\n",
@@ -1302,21 +1665,21 @@
],
"metadata": {
"kernelspec": {
-"display_name": "Python 2",
+"display_name": "Python 3",
"language": "python",
-"name": "python2"
+"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
-"version": 2
+"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
-"pygments_lexer": "ipython2",
-"version": "2.7.10"
+"pygments_lexer": "ipython3",
+"version": "3.4.3"
}
},
"nbformat": 4,