diff --git a/spark/spark.ipynb b/spark/spark.ipynb index 9723b4f..4bfac0a 100644 --- a/spark/spark.ipynb +++ b/spark/spark.ipynb @@ -64,11 +64,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": { "collapsed": false }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/bin/sh: pyspark: command not found\r\n" + ] + } + ], "source": [ "!pyspark" ] @@ -82,11 +90,22 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": { "collapsed": false }, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "sc" ] @@ -113,7 +132,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": { "collapsed": false }, @@ -404,6 +423,213 @@ " print user_id, count, user_info" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## DataFrames" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Given the Spark Context, create a SQLContext:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.sql import SQLContext\n", + "sqlContext = SQLContext(sc)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a dataframe based on the content of a file:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df = sqlContext.jsonFile(\"file:/path/file.json\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Display the content of the DataFrame:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Print the schema:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Select a column:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.select(\"column_name\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a DataFrame with rows matching a given filter:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.filter(df.column_name > 10)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Aggregate the results and count:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'df' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mdf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgroupBy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"column_name\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", + "\u001b[0;31mNameError\u001b[0m: name 'df' is not defined" + ] + } + ], + "source": [ + "df.groupBy(\"column_name\").count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Convert a RDD to a DataFrame (by inferring the schema):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df = sqlContext.inferSchema(my_data)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Register the DataFrame as a table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "df.registerTempTable(\"dataframe_name\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Run a SQL Query on a DataFrame registered as a table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "rdd_from_df = sqlContext.sql(\"SELECT * FROM dataframe_name\") #the result is a RDD" + ] + }, { "cell_type": "markdown", "metadata": {},