diff --git a/aws/aws.ipynb b/aws/aws.ipynb
index 88ac9c4..a9d8397 100644
--- a/aws/aws.ipynb
+++ b/aws/aws.ipynb
@@ -1,7 +1,7 @@
 {
  "metadata": {
   "name": "",
-  "signature": "sha256:760f0227418945ff60ae747898cfec6f9614a279e133bfb3ef96560860b3ce0d"
+  "signature": "sha256:0cdb20316206ad6c3f940e94e66a032ed0c4eb65efde6e22ae63148c58f75547"
  },
  "nbformat": 3,
  "nbformat_minor": 0,
@@ -354,7 +354,7 @@
      "cell_type": "markdown",
      "metadata": {},
      "source": [
-      "Run a MapReduce job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):"
+      "Run an Amazon EMR job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):"
      ]
     },
     {
@@ -384,6 +384,183 @@
      "metadata": {},
      "outputs": []
     },
+    {
+     "cell_type": "markdown",
+     "metadata": {},
+     "source": [
+      "Sample mrjob code that processes log files on Amazon S3, based on the [S3 logging format](http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html):"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "%%file mr_s3_log_parser.py\n",
+      "\n",
+      "import time\n",
+      "from mrjob.job import MRJob\n",
+      "from mrjob.protocol import RawValueProtocol, ReprProtocol\n",
+      "import re\n",
+      "\n",
+      "\n",
+      "class MrS3LogParser(MRJob):\n",
+      "    \"\"\"Parses the logs from S3 based on the S3 logging format:\n",
+      "    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html\n",
+      "\n",
+      "    Aggregates a user's daily requests by user agent and operation\n",
+      "\n",
+      "    Outputs date_time, requester, user_agent, operation, count\n",
+      "    \"\"\"\n",
+      "\n",
+      "    # Each quoted field is matched as a single group (quotes kept)\n",
+      "    # so the 17 capture groups line up with the field indexes below\n",
+      "    LOGPATS = r'(\\S+) (\\S+) \\[(.*?)\\] (\\S+) (\\S+) ' \\\n",
+      "              r'(\\S+) (\\S+) (\\S+) (\"[^\"]+\"|-) ' \\\n",
+      "              r'(\\S+) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) ' \\\n",
+      "              r'(\"[^\"]+\"|-) (\"[^\"]+\"|-)'\n",
+      "    NUM_ENTRIES_PER_LINE = 17\n",
+      "    logpat = re.compile(LOGPATS)\n",
+      "\n",
+      "    (S3_LOG_BUCKET_OWNER,\n",
+      "     S3_LOG_BUCKET,\n",
+      "     S3_LOG_DATE_TIME,\n",
+      "     S3_LOG_IP,\n",
+      "     S3_LOG_REQUESTER_ID,\n",
+      "     S3_LOG_REQUEST_ID,\n",
+      "     S3_LOG_OPERATION,\n",
+      "     S3_LOG_KEY,\n",
+      "     S3_LOG_HTTP_METHOD,\n",
+      "     S3_LOG_HTTP_STATUS,\n",
+      "     S3_LOG_S3_ERROR,\n",
+      "     S3_LOG_BYTES_SENT,\n",
+      "     S3_LOG_OBJECT_SIZE,\n",
+      "     S3_LOG_TOTAL_TIME,\n",
+      "     S3_LOG_TURN_AROUND_TIME,\n",
+      "     S3_LOG_REFERER,\n",
+      "     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)\n",
+      "\n",
+      "    DELIMITER = '\\t'\n",
+      "\n",
+      "    # We use RawValueProtocol for input to be format agnostic\n",
+      "    # and avoid any type of parsing errors\n",
+      "    INPUT_PROTOCOL = RawValueProtocol\n",
+      "\n",
+      "    # We use RawValueProtocol for output so we can output raw lines\n",
+      "    # instead of (k, v) pairs\n",
+      "    OUTPUT_PROTOCOL = RawValueProtocol\n",
+      "\n",
+      "    # Encode the intermediate records using repr() instead of JSON, so the\n",
+      "    # record doesn't get Unicode-encoded\n",
+      "    INTERNAL_PROTOCOL = ReprProtocol\n",
+      "\n",
+      "    def clean_date_time_zone(self, raw_date_time_zone):\n",
+      "        \"\"\"Converts an entry like 22/Jul/2013:21:04:17 +0000 to the format\n",
+      "        'YYYY-MM-DD HH:MM:SS', which is more suitable for loading into\n",
+      "        a database such as Redshift or RDS\n",
+      "\n",
+      "        Note: requires the chars \"[ ]\" to be stripped prior to input\n",
+      "        Returns the converted date, datetime, and timezone,\n",
+      "        or raises ValueError if the date cannot be parsed\n",
+      "\n",
+      "        TODO: Needs to combine timezone with date as one field\n",
+      "        \"\"\"\n",
+      "        date_time = None\n",
+      "        time_zone_parsed = None\n",
+      "\n",
+      "        # TODO: Probably cleaner to parse this with a regex\n",
+      "        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(\":\")]\n",
+      "        time_parsed = raw_date_time_zone[raw_date_time_zone.find(\":\") + 1:\n",
+      "                                         raw_date_time_zone.find(\"+\") - 1]\n",
+      "        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find(\"+\"):]\n",
+      "\n",
+      "        try:\n",
+      "            date_struct = time.strptime(date_parsed, \"%d/%b/%Y\")\n",
+      "            converted_date = time.strftime(\"%Y-%m-%d\", date_struct)\n",
+      "            date_time = converted_date + \" \" + time_parsed\n",
+      "\n",
+      "        # strptime raises ValueError if parsing fails; re-raise so the\n",
+      "        # calling function can catch and handle it appropriately\n",
+      "        except ValueError:\n",
+      "            raise\n",
+      "        else:\n",
+      "            return converted_date, date_time, time_zone_parsed\n",
+      "\n",
+      "    def mapper(self, _, line):\n",
+      "        line = line.strip()\n",
+      "        match = self.logpat.search(line)\n",
+      "\n",
+      "        date_time = None\n",
+      "        requester = None\n",
+      "        user_agent = None\n",
+      "        operation = None\n",
+      "\n",
+      "        try:\n",
+      "            for n in range(self.NUM_ENTRIES_PER_LINE):\n",
+      "                group = match.group(1 + n)\n",
+      "\n",
+      "                if n == self.S3_LOG_DATE_TIME:\n",
+      "                    date, date_time, time_zone_parsed = \\\n",
+      "                        self.clean_date_time_zone(group)\n",
+      "                    # Keep the following line to aggregate by date\n",
+      "                    # rather than by full timestamp\n",
+      "                    date_time = date + \" 00:00:00\"\n",
+      "                elif n == self.S3_LOG_REQUESTER_ID:\n",
+      "                    requester = group\n",
+      "                elif n == self.S3_LOG_USER_AGENT:\n",
+      "                    user_agent = group\n",
+      "                elif n == self.S3_LOG_OPERATION:\n",
+      "                    operation = group\n",
+      "                else:\n",
+      "                    pass\n",
+      "\n",
+      "        except Exception:\n",
+      "            yield ((\"Error while parsing line: %s\" % line,), 1)\n",
+      "        else:\n",
+      "            yield ((date_time, requester, user_agent, operation), 1)\n",
+      "\n",
+      "    def reducer(self, key, values):\n",
+      "        output = list(key)\n",
+      "        output = self.DELIMITER.join(output) + \\\n",
+      "                 self.DELIMITER + \\\n",
+      "                 str(sum(values))\n",
+      "\n",
+      "        yield None, output\n",
+      "\n",
+      "    def steps(self):\n",
+      "        return [\n",
+      "            self.mr(mapper=self.mapper,\n",
+      "                    reducer=self.reducer)\n",
+      "        ]\n",
+      "\n",
+      "\n",
+      "if __name__ == '__main__':\n",
+      "    MrS3LogParser.run()"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
     {
      "cell_type": "markdown",
      "metadata": {},
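+     "source": [
+      "As a quick sanity check, the job can be run locally on a sample log file with mrjob's default inline runner before submitting it to Amazon EMR. A minimal sketch, assuming placeholder file and bucket names (the EMR run assumes mrjob is configured with your AWS credentials):"
+     ]
+    },
+    {
+     "cell_type": "code",
+     "collapsed": false,
+     "input": [
+      "# Run locally on a sample log file (placeholder file name):\n",
+      "!python mr_s3_log_parser.py sample_s3_log.txt > results.txt\n",
+      "\n",
+      "# Run on Amazon EMR against logs in S3 (placeholder bucket and prefixes):\n",
+      "!python mr_s3_log_parser.py -r emr s3://my-log-bucket/logs/ --output-dir=s3://my-log-bucket/mr_output/"
+     ],
+     "language": "python",
+     "metadata": {},
+     "outputs": []
+    },
+    {
+     "cell_type": "markdown",
+     "metadata": {},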