Added DataFrames section

This commit is contained in:
Alessandro 2016-02-20 19:35:22 +01:00
parent 09f6c35138
commit d4450573c3

View File

@ -64,11 +64,19 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 1,
"metadata": { "metadata": {
"collapsed": false "collapsed": false
}, },
"outputs": [], "outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"/bin/sh: pyspark: command not found\r\n"
]
}
],
"source": [ "source": [
"!pyspark" "!pyspark"
] ]
@ -82,11 +90,22 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 2,
"metadata": { "metadata": {
"collapsed": false "collapsed": false
}, },
"outputs": [], "outputs": [
{
"data": {
"text/plain": [
"<pyspark.context.SparkContext at 0x103923610>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [ "source": [
"sc" "sc"
] ]
@ -113,7 +132,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 3,
"metadata": { "metadata": {
"collapsed": false "collapsed": false
}, },
@ -404,6 +423,213 @@
" print user_id, count, user_info" " print user_id, count, user_info"
] ]
}, },
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## DataFrames"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given the Spark Context, create a SQLContext:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a dataframe based on the content of a file:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = sqlContext.jsonFile(\"file:/path/file.json\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Display the content of the DataFrame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Print the schema:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Select a column:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.select(\"column_name\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a DataFrame with rows matching a given filter:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.filter(df.column_name > 10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Aggregate the results and count:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "NameError",
"evalue": "name 'df' is not defined",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-5-af17cfa6d2c8>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mdf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgroupBy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"column_name\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;31mNameError\u001b[0m: name 'df' is not defined"
]
}
],
"source": [
"df.groupBy(\"column_name\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Convert a RDD to a DataFrame (by inferring the schema):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = sqlContext.inferSchema(my_data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Register the DataFrame as a table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df.registerTempTable(\"dataframe_name\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run a SQL Query on a DataFrame registered as a table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"rdd_from_df = sqlContext.sql(\"SELECT * FROM dataframe_name\") #the result is a RDD"
]
},
{ {
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},