diff --git a/mapreduce/mapreduce-python.ipynb b/mapreduce/mapreduce-python.ipynb
new file mode 100644
index 0000000..7513229
--- /dev/null
+++ b/mapreduce/mapreduce-python.ipynb
@@ -0,0 +1,452 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Hadoop MapReduce: Python Streaming with mrjob\n",
+    "\n",
+    "* Introduction\n",
+    "* Setup\n",
+    "* Processing S3 Logs\n",
+    "* Running Amazon Elastic MapReduce (EMR) Jobs\n",
+    "* Unit Testing S3 Logs\n",
+    "* Running S3 Logs Unit Test\n",
+    "* Sample .mrjob.conf"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Introduction\n",
+    "\n",
+    "[mrjob](https://pythonhosted.org/mrjob/) lets you write MapReduce jobs in Python 2.5+ and run them on several platforms. You can:\n",
+    "\n",
+    "* Write multi-step MapReduce jobs in pure Python\n",
+    "* Test on your local machine\n",
+    "* Run on a Hadoop cluster\n",
+    "* Run in the cloud using Amazon Elastic MapReduce (EMR)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Setup\n",
+    "\n",
+    "From PyPI:\n",
+    "\n",
+    "``pip install mrjob``\n",
+    "\n",
+    "From source:\n",
+    "\n",
+    "``python setup.py install``\n",
+    "\n",
+    "See the \"Sample .mrjob.conf\" section for additional config details."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Processing S3 Logs\n",
+    "\n",
+    "Sample mrjob code that processes log files on Amazon S3, based on the [S3 logging format](http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "%%file mr_s3_log_parser.py\n",
+    "\n",
+    "import time\n",
+    "from mrjob.job import MRJob\n",
+    "from mrjob.protocol import RawValueProtocol, ReprProtocol\n",
+    "import re\n",
+    "\n",
+    "\n",
+    "class MrS3LogParser(MRJob):\n",
+    "    \"\"\"Parses the logs from S3 based on the S3 logging format:\n",
+    "    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html\n",
+    "\n",
+    "    Aggregates a user's daily requests by user agent and operation\n",
+    "\n",
+    "    Outputs date_time, requester, user_agent, operation, count\n",
+    "    \"\"\"\n",
+    "\n",
+    "    LOGPATS = r'(\\S+) (\\S+) \\[(.*?)\\] (\\S+) (\\S+) ' \\\n",
+    "              r'(\\S+) (\\S+) (\\S+) (\"(?:[^\"]+)\"|-) ' \\\n",
+    "              r'(\\S+) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) ' \\\n",
+    "              r'(\"(?:[^\"]+)\"|-) (\"(?:[^\"]+)\"|-)'\n",
+    "    NUM_ENTRIES_PER_LINE = 17  # one group per field; inner quoted groups are non-capturing\n",
+    "    logpat = re.compile(LOGPATS)\n",
+    "\n",
+    "    (S3_LOG_BUCKET_OWNER,\n",
+    "     S3_LOG_BUCKET,\n",
+    "     S3_LOG_DATE_TIME,\n",
+    "     S3_LOG_IP,\n",
+    "     S3_LOG_REQUESTER_ID,\n",
+    "     S3_LOG_REQUEST_ID,\n",
+    "     S3_LOG_OPERATION,\n",
+    "     S3_LOG_KEY,\n",
+    "     S3_LOG_HTTP_METHOD,\n",
+    "     S3_LOG_HTTP_STATUS,\n",
+    "     S3_LOG_S3_ERROR,\n",
+    "     S3_LOG_BYTES_SENT,\n",
+    "     S3_LOG_OBJECT_SIZE,\n",
+    "     S3_LOG_TOTAL_TIME,\n",
+    "     S3_LOG_TURN_AROUND_TIME,\n",
+    "     S3_LOG_REFERER,\n",
+    "     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)\n",
+    "\n",
+    "    DELIMITER = '\\t'\n",
+    "\n",
+    "    # We use RawValueProtocol for input to be format agnostic\n",
+    "    # and avoid any type of parsing errors\n",
+    "    INPUT_PROTOCOL = RawValueProtocol\n",
+    "\n",
+    "    # We use RawValueProtocol for output so we can output raw lines\n",
+    "    # instead of (k, v) pairs\n",
+    "    OUTPUT_PROTOCOL = RawValueProtocol\n",
+    "\n",
+    "    # Encode the intermediate records using repr() instead of JSON, so the\n",
+    "    # record doesn't get Unicode-encoded\n",
+    "    INTERNAL_PROTOCOL = ReprProtocol\n",
+    "\n",
+    "    def clean_date_time_zone(self, raw_date_time_zone):\n",
+    "        \"\"\"Converts an entry like 22/Jul/2013:21:04:17 +0000 to the format\n",
+    "        'YYYY-MM-DD HH:MM:SS', which is more suitable for loading into\n",
+    "        a database such as Redshift or RDS\n",
+    "\n",
+    "        Note: requires the chars \"[ ]\" to be stripped prior to input\n",
+    "        Returns the converted date, datetime, and timezone,\n",
+    "        or raises a ValueError if the date cannot be parsed\n",
+    "\n",
+    "        TODO: Needs to combine timezone with date as one field\n",
+    "        \"\"\"\n",
+    "        date_time = None\n",
+    "        time_zone_parsed = None\n",
+    "\n",
+    "        # TODO: Probably cleaner to parse this with a regex\n",
+    "        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(\":\")]\n",
+    "        time_parsed = raw_date_time_zone[raw_date_time_zone.find(\":\") + 1:\n",
+    "                                         raw_date_time_zone.find(\"+\") - 1]\n",
+    "        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find(\"+\"):]\n",
+    "\n",
+    "        try:\n",
+    "            date_struct = time.strptime(date_parsed, \"%d/%b/%Y\")\n",
+    "            converted_date = time.strftime(\"%Y-%m-%d\", date_struct)\n",
+    "            date_time = converted_date + \" \" + time_parsed\n",
+    "\n",
+    "        # Let the ValueError from strptime (e.g. an invalid date)\n",
+    "        # propagate to the caller, which handles it appropriately\n",
+    "        except ValueError:\n",
+    "            raise\n",
+    "        else:\n",
+    "            return converted_date, date_time, time_zone_parsed\n",
+    "\n",
+    "    def mapper(self, _, line):\n",
+    "        line = line.strip()\n",
+    "        match = self.logpat.search(line)\n",
+    "\n",
+    "        date_time = None\n",
+    "        requester = None\n",
+    "        user_agent = None\n",
+    "        operation = None\n",
+    "\n",
+    "        try:\n",
+    "            for n in range(self.NUM_ENTRIES_PER_LINE):\n",
+    "                group = match.group(1 + n)\n",
+    "\n",
+    "                if n == self.S3_LOG_DATE_TIME:\n",
+    "                    date, date_time, time_zone_parsed = \\\n",
+    "                        self.clean_date_time_zone(group)\n",
+    "                    # Keep the following line to aggregate by date;\n",
+    "                    # remove it to aggregate by full date_time\n",
+    "                    date_time = date + \" 00:00:00\"\n",
+    "                elif n == self.S3_LOG_REQUESTER_ID:\n",
+    "                    requester = group\n",
+    "                elif n == self.S3_LOG_USER_AGENT:\n",
+    "                    user_agent = group\n",
+    "                elif n == self.S3_LOG_OPERATION:\n",
+    "                    operation = group\n",
+    "                else:\n",
+    "                    pass\n",
+    "\n",
+    "        except Exception:\n",
+    "            yield ((\"Error while parsing line: %s\" % line,), 1)\n",
+    "        else:\n",
+    "            yield ((date_time, requester, user_agent, operation), 1)\n",
+    "\n",
+    "    def reducer(self, key, values):\n",
+    "        output = list(key)\n",
+    "        output = self.DELIMITER.join(output) + \\\n",
+    "                 self.DELIMITER + \\\n",
+    "                 str(sum(values))\n",
+    "\n",
+    "        yield None, output\n",
+    "\n",
+    "    def steps(self):\n",
+    "        return [\n",
+    "            self.mr(mapper=self.mapper,\n",
+    "                    reducer=self.reducer)\n",
+    "        ]\n",
+    "\n",
+    "\n",
+    "if __name__ == '__main__':\n",
+    "    MrS3LogParser.run()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Running Amazon Elastic MapReduce (EMR) Jobs"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Run an Amazon EMR job on the given input (which must be a flat file hierarchy), placing the results in the output (the output directory must not already exist):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "!python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Run a MapReduce job locally on the specified input file, sending the results to the specified output file:"
file:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "!python mr_s3_log_parser.py input_data.txt > output_data.txt" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Unit Testing S3 Logs" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Accompanying unit test:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "%%file test_mr_s3_log_parser.py\n", + "\n", + "from StringIO import StringIO\n", + "import unittest2 as unittest\n", + "from mr_s3_log_parser import MrS3LogParser\n", + "\n", + "\n", + "class MrTestsUtil:\n", + "\n", + " def run_mr_sandbox(self, mr_job, stdin):\n", + " # inline runs the job in the same process so small jobs tend to\n", + " # run faster and stack traces are simpler\n", + " # --no-conf prevents options from local mrjob.conf from polluting\n", + " # the testing environment\n", + " # \"-\" reads from standard in\n", + " mr_job.sandbox(stdin=stdin)\n", + "\n", + " # make_runner ensures job cleanup is performed regardless of\n", + " # success or failure\n", + " with mr_job.make_runner() as runner:\n", + " runner.run()\n", + " for line in runner.stream_output():\n", + " key, value = mr_job.parse_output_line(line)\n", + " yield value\n", + "\n", + " \n", + "class TestMrS3LogParser(unittest.TestCase):\n", + "\n", + " mr_job = None\n", + " mr_tests_util = None\n", + "\n", + " RAW_LOG_LINE_INVALID = \\\n", + " '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \\\n", + " '00000388225bcc00000 ' \\\n", + " 's3-storage [22/Jul/2013:21:03:27 +0000] ' \\\n", + " '00.111.222.33 ' \\\n", + "\n", + " RAW_LOG_LINE_VALID = \\\n", + " '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \\\n", + " '00000388225bcc00000 ' \\\n", + " 's3-storage [22/Jul/2013:21:03:27 +0000] ' \\\n", + " '00.111.222.33 ' \\\n", + " 'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \\\n", + " 'REST.HEAD.OBJECT user/file.pdf ' \\\n", + " '\"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \\\n", + " '00000SDZk ' \\\n", + " 'HTTP/1.1\" 200 - - 4000272 18 - \"-\" ' \\\n", + " '\"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0\" ' \\\n", + " '00000XMHZJp6DjM9x5JVEAMo8MG00000'\n", + "\n", + " DATE_TIME_ZONE_INVALID = \"AB/Jul/2013:21:04:17 +0000\"\n", + " DATE_TIME_ZONE_VALID = \"22/Jul/2013:21:04:17 +0000\"\n", + " DATE_VALID = \"2013-07-22\"\n", + " DATE_TIME_VALID = \"2013-07-22 21:04:17\"\n", + " TIME_ZONE_VALID = \"+0000\"\n", + "\n", + " def __init__(self, *args, **kwargs):\n", + " super(TestMrS3LogParser, self).__init__(*args, **kwargs)\n", + " self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])\n", + " self.mr_tests_util = MrTestsUtil()\n", + "\n", + " def test_invalid_log_lines(self):\n", + " stdin = StringIO(self.RAW_LOG_LINE_INVALID)\n", + "\n", + " for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):\n", + " self.assertEqual(result.find(\"Error\"), 0)\n", + "\n", + " def test_valid_log_lines(self):\n", + " stdin = StringIO(self.RAW_LOG_LINE_VALID)\n", + "\n", + " for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):\n", + " self.assertEqual(result.find(\"Error\"), -1)\n", + "\n", + " def test_clean_date_time_zone(self):\n", + " date, date_time, time_zone_parsed = \\\n", + " self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)\n", + " self.assertEqual(date, self.DATE_VALID)\n", + " self.assertEqual(date_time, 
+    "        self.assertEqual(date_time, self.DATE_TIME_VALID)\n",
+    "        self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)\n",
+    "\n",
+    "        # Wrap the call in a lambda so that assertRaises itself\n",
+    "        # invokes it and can catch the expected ValueError\n",
+    "        self.assertRaises(ValueError,\n",
+    "                          lambda: self.mr_job.clean_date_time_zone(\n",
+    "                              self.DATE_TIME_ZONE_INVALID))\n",
+    "\n",
+    "if __name__ == '__main__':\n",
+    "    unittest.main()\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Running S3 Logs Unit Test"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Run the mrjob test:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "!python test_mr_s3_log_parser.py -v"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Sample .mrjob.conf"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [],
+   "source": [
+    "runners:\n",
+    "  emr:\n",
+    "    aws_access_key_id: __ACCESS_KEY__\n",
+    "    aws_secret_access_key: __SECRET_ACCESS_KEY__\n",
+    "    aws_region: us-east-1\n",
+    "    ec2_key_pair: EMR\n",
+    "    ec2_key_pair_file: ~/.ssh/EMR.pem\n",
+    "    ssh_tunnel_to_job_tracker: true\n",
+    "    ec2_master_instance_type: m3.xlarge\n",
+    "    ec2_instance_type: m3.xlarge\n",
+    "    num_ec2_instances: 5\n",
+    "    s3_scratch_uri: s3://bucket/tmp/\n",
+    "    s3_log_uri: s3://bucket/tmp/logs/\n",
+    "    enable_emr_debugging: true\n",
+    "    bootstrap:\n",
+    "    - sudo apt-get install -y python-pip\n",
+    "    - sudo pip install --upgrade simplejson"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 2",
+   "language": "python",
+   "name": "python2"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 2
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython2",
+   "version": "2.7.9"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}
diff --git a/mapreduce/mr_s3_log_parser.py b/mapreduce/mr_s3_log_parser.py
new file mode 100644
index 0000000..f445d2e
--- /dev/null
+++ b/mapreduce/mr_s3_log_parser.py
@@ -0,0 +1,137 @@
+
+import time
+from mrjob.job import MRJob
+from mrjob.protocol import RawValueProtocol, ReprProtocol
+import re
+
+
+class MrS3LogParser(MRJob):
+    """Parses the logs from S3 based on the S3 logging format:
+    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html
+
+    Aggregates a user's daily requests by user agent and operation
+
+    Outputs date_time, requester, user_agent, operation, count
+    """
+
+    LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
+              r'(\S+) (\S+) (\S+) ("(?:[^"]+)"|-) ' \
+              r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
+              r'("(?:[^"]+)"|-) ("(?:[^"]+)"|-)'
+    NUM_ENTRIES_PER_LINE = 17  # one group per field; inner quoted groups are non-capturing
+    logpat = re.compile(LOGPATS)
+
+    (S3_LOG_BUCKET_OWNER,
+     S3_LOG_BUCKET,
+     S3_LOG_DATE_TIME,
+     S3_LOG_IP,
+     S3_LOG_REQUESTER_ID,
+     S3_LOG_REQUEST_ID,
+     S3_LOG_OPERATION,
+     S3_LOG_KEY,
+     S3_LOG_HTTP_METHOD,
+     S3_LOG_HTTP_STATUS,
+     S3_LOG_S3_ERROR,
+     S3_LOG_BYTES_SENT,
+     S3_LOG_OBJECT_SIZE,
+     S3_LOG_TOTAL_TIME,
+     S3_LOG_TURN_AROUND_TIME,
+     S3_LOG_REFERER,
+     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)
+
+    DELIMITER = '\t'
+
+    # We use RawValueProtocol for input to be format agnostic
+    # and avoid any type of parsing errors
+    INPUT_PROTOCOL = RawValueProtocol
+
+    # We use RawValueProtocol for output so we can output raw lines
+    # instead of (k, v) pairs
+    OUTPUT_PROTOCOL = RawValueProtocol
+
+    # Encode the intermediate records using repr() instead of JSON, so the
+    # record doesn't get Unicode-encoded
+    INTERNAL_PROTOCOL = ReprProtocol
+
+    def clean_date_time_zone(self, raw_date_time_zone):
+        """Converts an entry like 22/Jul/2013:21:04:17 +0000 to the format
+        'YYYY-MM-DD HH:MM:SS', which is more suitable for loading into
+        a database such as Redshift or RDS
+
+        Note: requires the chars "[ ]" to be stripped prior to input
+        Returns the converted date, datetime, and timezone,
+        or raises a ValueError if the date cannot be parsed
+
+        TODO: Needs to combine timezone with date as one field
+        """
+        date_time = None
+        time_zone_parsed = None
+
+        # TODO: Probably cleaner to parse this with a regex
+        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(":")]
+        time_parsed = raw_date_time_zone[raw_date_time_zone.find(":") + 1:
+                                         raw_date_time_zone.find("+") - 1]
+        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find("+"):]
+
+        try:
+            date_struct = time.strptime(date_parsed, "%d/%b/%Y")
+            converted_date = time.strftime("%Y-%m-%d", date_struct)
+            date_time = converted_date + " " + time_parsed
+
+        # Let the ValueError from strptime (e.g. an invalid date)
+        # propagate to the caller, which handles it appropriately
+        except ValueError:
+            raise
+        else:
+            return converted_date, date_time, time_zone_parsed
+
+    def mapper(self, _, line):
+        line = line.strip()
+        match = self.logpat.search(line)
+
+        date_time = None
+        requester = None
+        user_agent = None
+        operation = None
+
+        try:
+            for n in range(self.NUM_ENTRIES_PER_LINE):
+                group = match.group(1 + n)
+
+                if n == self.S3_LOG_DATE_TIME:
+                    date, date_time, time_zone_parsed = \
+                        self.clean_date_time_zone(group)
+                    # Keep the following line to aggregate by date;
+                    # remove it to aggregate by full date_time
+                    date_time = date + " 00:00:00"
+                elif n == self.S3_LOG_REQUESTER_ID:
+                    requester = group
+                elif n == self.S3_LOG_USER_AGENT:
+                    user_agent = group
+                elif n == self.S3_LOG_OPERATION:
+                    operation = group
+                else:
+                    pass
+
+        except Exception:
+            yield (("Error while parsing line: %s" % line,), 1)
+        else:
+            yield ((date_time, requester, user_agent, operation), 1)
+
+    def reducer(self, key, values):
+        output = list(key)
+        output = self.DELIMITER.join(output) + \
+                 self.DELIMITER + \
+                 str(sum(values))
+
+        yield None, output
+
+    def steps(self):
+        return [
+            self.mr(mapper=self.mapper,
+                    reducer=self.reducer)
+        ]
+
+
+if __name__ == '__main__':
+    MrS3LogParser.run()
\ No newline at end of file
diff --git a/mapreduce/test_mr_s3_log_parser.py b/mapreduce/test_mr_s3_log_parser.py
new file mode 100644
index 0000000..61cffac
--- /dev/null
+++ b/mapreduce/test_mr_s3_log_parser.py
@@ -0,0 +1,87 @@
+
+from StringIO import StringIO
+import unittest2 as unittest
+from mr_s3_log_parser import MrS3LogParser
+
+
+class MrTestsUtil:
+
+    def run_mr_sandbox(self, mr_job, stdin):
+        # inline runs the job in the same process so small jobs tend to
+        # run faster and stack traces are simpler
+        # --no-conf prevents options from local mrjob.conf from polluting
+        # the testing environment
+        # "-" reads from standard in
+        mr_job.sandbox(stdin=stdin)
+
+        # make_runner ensures job cleanup is performed regardless of
+        # success or failure
+        with mr_job.make_runner() as runner:
+            runner.run()
+            for line in runner.stream_output():
+                key, value = mr_job.parse_output_line(line)
+                yield value
+
+
+class TestMrS3LogParser(unittest.TestCase):
+
+    mr_job = None
+    mr_tests_util = None
+
+    RAW_LOG_LINE_INVALID = \
+        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
+        '00000388225bcc00000 ' \
+        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
+        '00.111.222.33 '
+
+    RAW_LOG_LINE_VALID = \
+        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
+        '00000388225bcc00000 ' \
+        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
+        '00.111.222.33 ' \
+        'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \
+        'REST.HEAD.OBJECT user/file.pdf ' \
+        '"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \
+        '00000SDZk ' \
+        'HTTP/1.1" 200 - - 4000272 18 - "-" ' \
+        '"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0" ' \
+        '00000XMHZJp6DjM9x5JVEAMo8MG00000'
+
+    DATE_TIME_ZONE_INVALID = "AB/Jul/2013:21:04:17 +0000"
+    DATE_TIME_ZONE_VALID = "22/Jul/2013:21:04:17 +0000"
+    DATE_VALID = "2013-07-22"
+    DATE_TIME_VALID = "2013-07-22 21:04:17"
+    TIME_ZONE_VALID = "+0000"
+
+    def __init__(self, *args, **kwargs):
+        super(TestMrS3LogParser, self).__init__(*args, **kwargs)
+        self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])
+        self.mr_tests_util = MrTestsUtil()
+
+    def test_invalid_log_lines(self):
+        stdin = StringIO(self.RAW_LOG_LINE_INVALID)
+
+        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
+            self.assertEqual(result.find("Error"), 0)
+
+    def test_valid_log_lines(self):
+        stdin = StringIO(self.RAW_LOG_LINE_VALID)
+
+        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
+            self.assertEqual(result.find("Error"), -1)
+
+    def test_clean_date_time_zone(self):
+        date, date_time, time_zone_parsed = \
+            self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)
+        self.assertEqual(date, self.DATE_VALID)
+        self.assertEqual(date_time, self.DATE_TIME_VALID)
+        self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)
+
+        # Wrap the call in a lambda so that assertRaises itself
+        # invokes it and can catch the expected ValueError
+        self.assertRaises(ValueError,
+                          lambda: self.mr_job.clean_date_time_zone(
+                              self.DATE_TIME_ZONE_INVALID))
+
+if __name__ == '__main__':
+    unittest.main()
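As a supplementary sketch (an assumption, not part of the original commit), the diff below adds a small driver script that runs MrS3LogParser in-process with mrjob's inline runner, mirroring the pattern already used in test_mr_s3_log_parser.py; the file name run_inline_example.py is illustrative.

diff --git a/mapreduce/run_inline_example.py b/mapreduce/run_inline_example.py
new file mode 100644
--- /dev/null
+++ b/mapreduce/run_inline_example.py
@@ -0,0 +1,26 @@
+"""Supplementary sketch (not part of the original job or tests): run
+MrS3LogParser in-process with mrjob's inline runner, mirroring the
+pattern used in test_mr_s3_log_parser.py.
+
+Usage: python run_inline_example.py < input_data.txt
+"""
+
+from mr_s3_log_parser import MrS3LogParser
+
+
+def main():
+    # '-r inline' runs the job in this process (no Hadoop required),
+    # '--no-conf' ignores any local mrjob.conf, and '-' reads stdin
+    mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])
+
+    # make_runner ensures job cleanup regardless of success or failure
+    with mr_job.make_runner() as runner:
+        runner.run()
+        for line in runner.stream_output():
+            # OUTPUT_PROTOCOL is RawValueProtocol, so the key is None
+            _, value = mr_job.parse_output_line(line)
+            print value
+
+
+if __name__ == '__main__':
+    main()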