mirror of https://github.com/donnemartin/data-science-ipython-notebooks.git
synced 2024-03-22 13:30:56 +08:00
Added MapReduce with Python using mrjob IPython Notebook.
This commit is contained in:
parent 6109274ff0
commit 0890d2b2dc
452 mapreduce/mapreduce-python.ipynb Normal file
@@ -0,0 +1,452 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hadoop MapReduce: Python Streaming with mrjob\n",
"\n",
"* Introduction\n",
"* Setup\n",
"* Processing S3 Logs\n",
"* Running Amazon Elastic MapReduce (EMR) Jobs\n",
"* Unit Testing S3 Logs\n",
"* Running S3 Logs Unit Test\n",
"* Sample .mrjob.conf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\n",
"\n",
"[mrjob](https://pythonhosted.org/mrjob/) lets you write MapReduce jobs in Python 2.5+ and run them on several platforms. You can:\n",
"\n",
"* Write multi-step MapReduce jobs in pure Python\n",
"* Test on your local machine\n",
"* Run on a Hadoop cluster\n",
"* Run in the cloud using Amazon Elastic MapReduce (EMR)"
]
},
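{
"cell_type": "markdown",
"metadata": {},
"source": [
"For orientation before the S3 log parser, here is a minimal sketch of an mrjob job, adapted from the word frequency example in the mrjob docs. The file name ``mr_word_count.py`` is just for illustration; once written, it can be run locally with ``python mr_word_count.py input.txt``:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%file mr_word_count.py\n",
"\n",
"from mrjob.job import MRJob\n",
"\n",
"\n",
"class MRWordFrequencyCount(MRJob):\n",
"    \"\"\"Counts the chars, words, and lines in the input.\"\"\"\n",
"\n",
"    def mapper(self, _, line):\n",
"        yield \"chars\", len(line)\n",
"        yield \"words\", len(line.split())\n",
"        yield \"lines\", 1\n",
"\n",
"    def reducer(self, key, values):\n",
"        yield key, sum(values)\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
"    MRWordFrequencyCount.run()"
]
},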
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"From PyPI:\n",
"\n",
"``pip install mrjob``\n",
"\n",
"From source:\n",
"\n",
"``python setup.py install``\n",
"\n",
"See the \"Sample .mrjob.conf\" section below for additional config details."
]
},
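{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check that the install worked (this assumes mrjob exposes its version string as ``mrjob.__version__``):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import mrjob\n",
"\n",
"# Confirm mrjob is importable and print its version\n",
"print(mrjob.__version__)"
]
},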
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Processing S3 Logs\n",
"\n",
"Sample mrjob code that processes log files on Amazon S3 based on the [S3 logging format](http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%file mr_s3_log_parser.py\n",
"\n",
"import time\n",
"from mrjob.job import MRJob\n",
"from mrjob.protocol import RawValueProtocol, ReprProtocol\n",
"import re\n",
"\n",
"\n",
"class MrS3LogParser(MRJob):\n",
"    \"\"\"Parses the logs from S3 based on the S3 logging format:\n",
"    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html\n",
"\n",
"    Aggregates a user's daily requests by user agent and operation\n",
"\n",
"    Outputs date_time, requester, user_agent, operation, count\n",
"    \"\"\"\n",
"\n",
"    LOGPATS = r'(\\S+) (\\S+) \\[(.*?)\\] (\\S+) (\\S+) ' \\\n",
"              r'(\\S+) (\\S+) (\\S+) (\"([^\"]+)\"|-) ' \\\n",
"              r'(\\S+) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) ' \\\n",
"              r'(\"([^\"]+)\"|-) (\"([^\"]+)\"|-)'\n",
"    NUM_ENTRIES_PER_LINE = 17\n",
"    logpat = re.compile(LOGPATS)\n",
"\n",
"    (S3_LOG_BUCKET_OWNER,\n",
"     S3_LOG_BUCKET,\n",
"     S3_LOG_DATE_TIME,\n",
"     S3_LOG_IP,\n",
"     S3_LOG_REQUESTER_ID,\n",
"     S3_LOG_REQUEST_ID,\n",
"     S3_LOG_OPERATION,\n",
"     S3_LOG_KEY,\n",
"     S3_LOG_HTTP_METHOD,\n",
"     S3_LOG_HTTP_STATUS,\n",
"     S3_LOG_S3_ERROR,\n",
"     S3_LOG_BYTES_SENT,\n",
"     S3_LOG_OBJECT_SIZE,\n",
"     S3_LOG_TOTAL_TIME,\n",
"     S3_LOG_TURN_AROUND_TIME,\n",
"     S3_LOG_REFERER,\n",
"     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)\n",
"\n",
"    DELIMITER = '\\t'\n",
"\n",
"    # We use RawValueProtocol for input to be format agnostic\n",
"    # and avoid any type of parsing errors\n",
"    INPUT_PROTOCOL = RawValueProtocol\n",
"\n",
"    # We use RawValueProtocol for output so we can output raw lines\n",
"    # instead of (k, v) pairs\n",
"    OUTPUT_PROTOCOL = RawValueProtocol\n",
"\n",
"    # Encode the intermediate records using repr() instead of JSON, so the\n",
"    # record doesn't get Unicode-encoded\n",
"    INTERNAL_PROTOCOL = ReprProtocol\n",
"\n",
"    def clean_date_time_zone(self, raw_date_time_zone):\n",
"        \"\"\"Converts entry 22/Jul/2013:21:04:17 +0000 to the format\n",
"        'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into\n",
"        a database such as Redshift or RDS\n",
"\n",
"        Note: requires the chars \"[ ]\" to be stripped prior to input\n",
"        Returns the converted datetime and timezone\n",
"        or None for both values if failed\n",
"\n",
"        TODO: Needs to combine timezone with date as one field\n",
"        \"\"\"\n",
"        date_time = None\n",
"        time_zone_parsed = None\n",
"\n",
"        # TODO: Probably cleaner to parse this with a regex\n",
"        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(\":\")]\n",
"        time_parsed = raw_date_time_zone[raw_date_time_zone.find(\":\") + 1:\n",
"                                         raw_date_time_zone.find(\"+\") - 1]\n",
"        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find(\"+\"):]\n",
"\n",
"        try:\n",
"            date_struct = time.strptime(date_parsed, \"%d/%b/%Y\")\n",
"            converted_date = time.strftime(\"%Y-%m-%d\", date_struct)\n",
"            date_time = converted_date + \" \" + time_parsed\n",
"\n",
"        # Throws a ValueError exception if the operation fails that is\n",
"        # caught by the calling function and is handled appropriately\n",
"        except ValueError as error:\n",
"            raise ValueError(error)\n",
"        else:\n",
"            return converted_date, date_time, time_zone_parsed\n",
"\n",
"    def mapper(self, _, line):\n",
"        line = line.strip()\n",
"        match = self.logpat.search(line)\n",
"\n",
"        date_time = None\n",
"        requester = None\n",
"        user_agent = None\n",
"        operation = None\n",
"\n",
"        try:\n",
"            for n in range(self.NUM_ENTRIES_PER_LINE):\n",
"                group = match.group(1 + n)\n",
"\n",
"                if n == self.S3_LOG_DATE_TIME:\n",
"                    date, date_time, time_zone_parsed = \\\n",
"                        self.clean_date_time_zone(group)\n",
"                    # Leave the following line of code if\n",
"                    # you want to aggregate by date\n",
"                    date_time = date + \" 00:00:00\"\n",
"                elif n == self.S3_LOG_REQUESTER_ID:\n",
"                    requester = group\n",
"                elif n == self.S3_LOG_USER_AGENT:\n",
"                    user_agent = group\n",
"                elif n == self.S3_LOG_OPERATION:\n",
"                    operation = group\n",
"                else:\n",
"                    pass\n",
"\n",
"        except Exception:\n",
"            yield ((\"Error while parsing line: %s\", line), 1)\n",
"        else:\n",
"            yield ((date_time, requester, user_agent, operation), 1)\n",
"\n",
"    def reducer(self, key, values):\n",
"        output = list(key)\n",
"        output = self.DELIMITER.join(output) + \\\n",
"                 self.DELIMITER + \\\n",
"                 str(sum(values))\n",
"\n",
"        yield None, output\n",
"\n",
"    def steps(self):\n",
"        return [\n",
"            self.mr(mapper=self.mapper,\n",
"                    reducer=self.reducer)\n",
"        ]\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
"    MrS3LogParser.run()"
]
},
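{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before running a full job, we can sanity-check the date cleaning helper on its own. This sketch assumes the ``%%file`` cell above has been run so that ``mr_s3_log_parser.py`` exists in the working directory; the sample timestamp and expected output come from the unit tests below:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from mr_s3_log_parser import MrS3LogParser\n",
"\n",
"# Instantiate the job the same way the unit tests do\n",
"mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])\n",
"\n",
"# Expected: ('2013-07-22', '2013-07-22 21:04:17', '+0000')\n",
"print(mr_job.clean_date_time_zone('22/Jul/2013:21:04:17 +0000'))"
]
},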
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running Amazon Elastic MapReduce (EMR) Jobs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run an Amazon EMR job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"!python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run a MapReduce job locally on the specified input file, sending the results to the specified output file:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"!python mr_s3_log_parser.py input_data.txt > output_data.txt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Unit Testing S3 Logs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Accompanying unit test:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%file test_mr_s3_log_parser.py\n",
"\n",
"from StringIO import StringIO\n",
"import unittest2 as unittest\n",
"from mr_s3_log_parser import MrS3LogParser\n",
"\n",
"\n",
"class MrTestsUtil:\n",
"\n",
"    def run_mr_sandbox(self, mr_job, stdin):\n",
"        # inline runs the job in the same process so small jobs tend to\n",
"        # run faster and stack traces are simpler\n",
"        # --no-conf prevents options from local mrjob.conf from polluting\n",
"        # the testing environment\n",
"        # \"-\" reads from standard in\n",
"        mr_job.sandbox(stdin=stdin)\n",
"\n",
"        # make_runner ensures job cleanup is performed regardless of\n",
"        # success or failure\n",
"        with mr_job.make_runner() as runner:\n",
"            runner.run()\n",
"            for line in runner.stream_output():\n",
"                key, value = mr_job.parse_output_line(line)\n",
"                yield value\n",
"\n",
"\n",
"class TestMrS3LogParser(unittest.TestCase):\n",
"\n",
"    mr_job = None\n",
"    mr_tests_util = None\n",
"\n",
"    RAW_LOG_LINE_INVALID = \\\n",
"        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \\\n",
"        '00000388225bcc00000 ' \\\n",
"        's3-storage [22/Jul/2013:21:03:27 +0000] ' \\\n",
"        '00.111.222.33 '\n",
"\n",
"    RAW_LOG_LINE_VALID = \\\n",
"        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \\\n",
"        '00000388225bcc00000 ' \\\n",
"        's3-storage [22/Jul/2013:21:03:27 +0000] ' \\\n",
"        '00.111.222.33 ' \\\n",
"        'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \\\n",
"        'REST.HEAD.OBJECT user/file.pdf ' \\\n",
"        '\"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \\\n",
"        '00000SDZk ' \\\n",
"        'HTTP/1.1\" 200 - - 4000272 18 - \"-\" ' \\\n",
"        '\"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0\" ' \\\n",
"        '00000XMHZJp6DjM9x5JVEAMo8MG00000'\n",
"\n",
"    DATE_TIME_ZONE_INVALID = \"AB/Jul/2013:21:04:17 +0000\"\n",
"    DATE_TIME_ZONE_VALID = \"22/Jul/2013:21:04:17 +0000\"\n",
"    DATE_VALID = \"2013-07-22\"\n",
"    DATE_TIME_VALID = \"2013-07-22 21:04:17\"\n",
"    TIME_ZONE_VALID = \"+0000\"\n",
"\n",
"    def __init__(self, *args, **kwargs):\n",
"        super(TestMrS3LogParser, self).__init__(*args, **kwargs)\n",
"        self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])\n",
"        self.mr_tests_util = MrTestsUtil()\n",
"\n",
"    def test_invalid_log_lines(self):\n",
"        stdin = StringIO(self.RAW_LOG_LINE_INVALID)\n",
"\n",
"        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):\n",
"            self.assertEqual(result.find(\"Error\"), 0)\n",
"\n",
"    def test_valid_log_lines(self):\n",
"        stdin = StringIO(self.RAW_LOG_LINE_VALID)\n",
"\n",
"        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):\n",
"            self.assertEqual(result.find(\"Error\"), -1)\n",
"\n",
"    def test_clean_date_time_zone(self):\n",
"        date, date_time, time_zone_parsed = \\\n",
"            self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)\n",
"        self.assertEqual(date, self.DATE_VALID)\n",
"        self.assertEqual(date_time, self.DATE_TIME_VALID)\n",
"        self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)\n",
"\n",
"        # Wrap the call in a lambda so assertRaises receives a callable\n",
"        # rather than the result of calling clean_date_time_zone directly\n",
"        self.assertRaises(ValueError,\n",
"                          lambda: self.mr_job.clean_date_time_zone(\n",
"                              self.DATE_TIME_ZONE_INVALID))\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
"    unittest.main()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running S3 Logs Unit Test"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the mrjob test:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"!python test_mr_s3_log_parser.py -v"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sample .mrjob.conf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"runners:\n",
"  emr:\n",
"    aws_access_key_id: __ACCESS_KEY__\n",
"    aws_secret_access_key: __SECRET_ACCESS_KEY__\n",
"    aws_region: us-east-1\n",
"    ec2_key_pair: EMR\n",
"    ec2_key_pair_file: ~/.ssh/EMR.pem\n",
"    ssh_tunnel_to_job_tracker: true\n",
"    ec2_master_instance_type: m3.xlarge\n",
"    ec2_instance_type: m3.xlarge\n",
"    num_ec2_instances: 5\n",
"    s3_scratch_uri: s3://bucket/tmp/\n",
"    s3_log_uri: s3://bucket/tmp/logs/\n",
"    enable_emr_debugging: True\n",
"    bootstrap:\n",
"    - sudo apt-get install -y python-pip\n",
"    - sudo pip install --upgrade simplejson"
]
},
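{
"cell_type": "markdown",
"metadata": {},
"source": [
"mrjob reads this config from ``~/.mrjob.conf`` by default. If you keep it elsewhere, you can point a run at a specific file with ``--conf-path``; for example, repeating the EMR run from above with the same placeholder buckets:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"!python mr_s3_log_parser.py -r emr --conf-path ~/.mrjob.conf s3://bucket-source/ --output-dir=s3://bucket-dest/"
]
},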
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
137 mapreduce/mr_s3_log_parser.py Normal file
@@ -0,0 +1,137 @@
import time
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol
import re


class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

    Aggregates a user's daily requests by user agent and operation

    Outputs date_time, requester, user_agent, operation, count
    """

    LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
              r'(\S+) (\S+) (\S+) ("([^"]+)"|-) ' \
              r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
              r'("([^"]+)"|-) ("([^"]+)"|-)'
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    (S3_LOG_BUCKET_OWNER,
     S3_LOG_BUCKET,
     S3_LOG_DATE_TIME,
     S3_LOG_IP,
     S3_LOG_REQUESTER_ID,
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION,
     S3_LOG_KEY,
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS,
     S3_LOG_S3_ERROR,
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE,
     S3_LOG_TOTAL_TIME,
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER,
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Converts entry 22/Jul/2013:21:04:17 +0000 to the format
        'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into
        a database such as Redshift or RDS

        Note: requires the chars "[ ]" to be stripped prior to input
        Returns the converted datetime and timezone
        or None for both values if failed

        TODO: Needs to combine timezone with date as one field
        """
        date_time = None
        time_zone_parsed = None

        # TODO: Probably cleaner to parse this with a regex
        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(":")]
        time_parsed = raw_date_time_zone[raw_date_time_zone.find(":") + 1:
                                         raw_date_time_zone.find("+") - 1]
        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find("+"):]

        try:
            date_struct = time.strptime(date_parsed, "%d/%b/%Y")
            converted_date = time.strftime("%Y-%m-%d", date_struct)
            date_time = converted_date + " " + time_parsed

        # Throws a ValueError exception if the operation fails that is
        # caught by the calling function and is handled appropriately
        except ValueError as error:
            raise ValueError(error)
        else:
            return converted_date, date_time, time_zone_parsed

    def mapper(self, _, line):
        line = line.strip()
        match = self.logpat.search(line)

        date_time = None
        requester = None
        user_agent = None
        operation = None

        try:
            for n in range(self.NUM_ENTRIES_PER_LINE):
                group = match.group(1 + n)

                if n == self.S3_LOG_DATE_TIME:
                    date, date_time, time_zone_parsed = \
                        self.clean_date_time_zone(group)
                    # Leave the following line of code if
                    # you want to aggregate by date
                    date_time = date + " 00:00:00"
                elif n == self.S3_LOG_REQUESTER_ID:
                    requester = group
                elif n == self.S3_LOG_USER_AGENT:
                    user_agent = group
                elif n == self.S3_LOG_OPERATION:
                    operation = group
                else:
                    pass

        except Exception:
            yield (("Error while parsing line: %s", line), 1)
        else:
            yield ((date_time, requester, user_agent, operation), 1)

    def reducer(self, key, values):
        output = list(key)
        output = self.DELIMITER.join(output) + \
                 self.DELIMITER + \
                 str(sum(values))

        yield None, output

    def steps(self):
        return [
            self.mr(mapper=self.mapper,
                    reducer=self.reducer)
        ]


if __name__ == '__main__':
    MrS3LogParser.run()
87 mapreduce/test_mr_s3_log_parser.py Normal file
@@ -0,0 +1,87 @@
from StringIO import StringIO
import unittest2 as unittest
from mr_s3_log_parser import MrS3LogParser


class MrTestsUtil:

    def run_mr_sandbox(self, mr_job, stdin):
        # inline runs the job in the same process so small jobs tend to
        # run faster and stack traces are simpler
        # --no-conf prevents options from local mrjob.conf from polluting
        # the testing environment
        # "-" reads from standard in
        mr_job.sandbox(stdin=stdin)

        # make_runner ensures job cleanup is performed regardless of
        # success or failure
        with mr_job.make_runner() as runner:
            runner.run()
            for line in runner.stream_output():
                key, value = mr_job.parse_output_line(line)
                yield value


class TestMrS3LogParser(unittest.TestCase):

    mr_job = None
    mr_tests_util = None

    RAW_LOG_LINE_INVALID = \
        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
        '00000388225bcc00000 ' \
        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
        '00.111.222.33 '

    RAW_LOG_LINE_VALID = \
        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
        '00000388225bcc00000 ' \
        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
        '00.111.222.33 ' \
        'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \
        'REST.HEAD.OBJECT user/file.pdf ' \
        '"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \
        '00000SDZk ' \
        'HTTP/1.1" 200 - - 4000272 18 - "-" ' \
        '"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0" ' \
        '00000XMHZJp6DjM9x5JVEAMo8MG00000'

    DATE_TIME_ZONE_INVALID = "AB/Jul/2013:21:04:17 +0000"
    DATE_TIME_ZONE_VALID = "22/Jul/2013:21:04:17 +0000"
    DATE_VALID = "2013-07-22"
    DATE_TIME_VALID = "2013-07-22 21:04:17"
    TIME_ZONE_VALID = "+0000"

    def __init__(self, *args, **kwargs):
        super(TestMrS3LogParser, self).__init__(*args, **kwargs)
        self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])
        self.mr_tests_util = MrTestsUtil()

    def test_invalid_log_lines(self):
        stdin = StringIO(self.RAW_LOG_LINE_INVALID)

        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
            self.assertEqual(result.find("Error"), 0)

    def test_valid_log_lines(self):
        stdin = StringIO(self.RAW_LOG_LINE_VALID)

        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
            self.assertEqual(result.find("Error"), -1)

    def test_clean_date_time_zone(self):
        date, date_time, time_zone_parsed = \
            self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)
        self.assertEqual(date, self.DATE_VALID)
        self.assertEqual(date_time, self.DATE_TIME_VALID)
        self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)

        # Wrap the call in a lambda so assertRaises receives a callable
        # rather than the result of calling clean_date_time_zone directly
        self.assertRaises(ValueError,
                          lambda: self.mr_job.clean_date_time_zone(
                              self.DATE_TIME_ZONE_INVALID))


if __name__ == '__main__':
    unittest.main()