diff --git a/spark/spark.ipynb b/spark/spark.ipynb
index 9723b4f..e84259e 100644
--- a/spark/spark.ipynb
+++ b/spark/spark.ipynb
@@ -15,6 +15,7 @@
     "\n",
     "* IPython Notebook Setup\n",
     "* Python Shell\n",
+    "* DataFrames\n",
     "* RDDs\n",
     "* Pair RDDs\n",
     "* Running Spark on a Cluster\n",
@@ -91,12 +92,374 @@
     "sc"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## DataFrames"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "From the following [reference](https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html):\n",
+    "\n",
+    "A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create a DataFrame from JSON files on S3 (`context` is an existing SQLContext):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "users = context.load(\"s3n://path/to/users.json\", \"json\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create a new DataFrame that contains “young users” only:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "young = users.filter(users.age < 21)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Alternatively, using Pandas-like syntax:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "young = users[users.age < 21]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Increment everybody’s age by 1:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "young.select(young.name, young.age + 1)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Count the number of young users by gender:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "young.groupBy(\"gender\").count()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Join young users with another DataFrame called logs:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "young.join(logs, logs.userId == users.userId, \"left_outer\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Count the number of users in the young DataFrame:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "young.registerTempTable(\"young\")\n",
+    "context.sql(\"SELECT count(*) FROM young\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Convert a Spark DataFrame to Pandas:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "pandas_df = young.toPandas()"
+   ]
+  },
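+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Note that `toPandas()` collects the entire distributed DataFrame into driver memory, so it is only practical when the result fits on a single machine. A minimal sketch of guarding the conversion, assuming a hypothetical `max_rows` threshold:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "max_rows = 100000  # hypothetical threshold; size it to the driver's memory\n",
+    "if young.count() <= max_rows:  # count() runs a Spark job, but avoids an OOM on the driver\n",
+    "    pandas_df = young.toPandas()"
+   ]
+  },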
"cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "spark_df = context.createDataFrame(pandas_df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Given the Spark Context, create a SQLContext:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.sql import SQLContext\n", + "sqlContext = SQLContext(sc)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a DataFrame based on the content of a file:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df = sqlContext.jsonFile(\"file:/path/file.json\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Display the content of the DataFrame:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Print the schema:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Select a column:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.select(\"column_name\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a DataFrame with rows matching a given filter:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.filter(df.column_name>10)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Aggregate the results and count:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "df.groupBy(\"column_name\").count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Convert a RDD to a DataFrame (by inferring the schema):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df = sqlContext.inferSchema(my_data)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Register the DataFrame as a table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.registerTempTable(\"dataframe_name\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Run a SQL Query on a DataFrame registered as a table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "rdd_from_df = sqlContext.sql(\"SELECT * FROM dataframe_name\")" + ] + }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDDs\n", "\n", + "Note: RDDs are included for completeness. In Spark 1.3, DataFrames were introduced which are recommended over RDDs. 
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## RDDs\n",
    "\n",
+    "Note: RDDs are included for completeness. Spark 1.3 introduced DataFrames, which are recommended over RDDs. Check out the [DataFrames announcement](https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html) for more info.\n",
+    "\n",
    "Resilient Distributed Datasets (RDDs) are the fundamental unit of data in Spark. RDDs can be created from a file, from data in memory, or from another RDD. RDDs are immutable.\n",
    "\n",
    "There are two types of RDD operations:\n",
@@ -1302,21 +1665,21 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "Python 2",
+   "display_name": "Python 3",
    "language": "python",
-   "name": "python2"
+   "name": "python3"
   },
   "language_info": {
    "codemirror_mode": {
     "name": "ipython",
-    "version": 2
+    "version": 3
    },
    "file_extension": ".py",
    "mimetype": "text/x-python",
    "name": "python",
    "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython2",
-   "version": "2.7.10"
+   "pygments_lexer": "ipython3",
+   "version": "3.4.3"
   }
  },
  "nbformat": 4,