{
" cells " : [
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" This notebook was prepared by [Donne Martin](http://donnemartin.com). Source and license info is on [GitHub](https://github.com/donnemartin/data-science-ipython-notebooks). "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" # Python Hadoop MapReduce: Analyzing AWS S3 Bucket Logs with mrjob \n " ,
" \n " ,
" * [Introduction](#Introduction) \n " ,
" * [Setup](#Setup) \n " ,
" * [Processing S3 Logs](#Processing-S3-Logs) \n " ,
" * [Running Amazon Elastic MapReduce Jobs](#Running-Amazon-Elastic-MapReduce-Jobs) \n " ,
" * [Unit Testing S3 Logs](#Unit-Testing-S3-Logs) \n " ,
" * [Running S3 Logs Unit Test](#Running-S3-Logs-Unit-Test) \n " ,
" * [Sample Config File](#Sample-Config-File) "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Introduction \n " ,
" \n " ,
" [mrjob](https://pythonhosted.org/mrjob/) lets you write MapReduce jobs in Python 2.5+ and run them on several platforms. You can: \n " ,
" \n " ,
" * Write multi-step MapReduce jobs in pure Python \n " ,
" * Test on your local machine \n " ,
" * Run on a Hadoop cluster \n " ,
" * Run in the cloud using Amazon Elastic MapReduce (EMR) "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Setup \n " ,
" \n " ,
" From PyPI: \n " ,
" \n " ,
" ``pip install mrjob`` \n " ,
" \n " ,
" From source: \n " ,
" \n " ,
" ``python setup.py install`` \n " ,
" \n " ,
" See the [Sample Config File](#Sample-Config-File) section for additional configuration details. "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Processing S3 Logs \n " ,
" \n " ,
" Sample mrjob code that processes log files on Amazon S3 based on the [S3 logging format](http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html): "
]
} ,
{
" cell_type " : " code " ,
" execution_count " : null ,
" metadata " : {
" collapsed " : false
} ,
" outputs " : [ ] ,
" source " : [
" %%file mr_s3_log_parser.py \n " ,
" \n " ,
" import time \n " ,
" from mrjob.job import MRJob \n " ,
" from mrjob.protocol import RawValueProtocol, ReprProtocol \n " ,
" import re \n " ,
" \n " ,
" \n " ,
" class MrS3LogParser(MRJob): \n " ,
" \" \" \" Parses the logs from S3 based on the S3 logging format: \n " ,
" http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html \n " ,
" \n " ,
" Aggregates a user ' s daily requests by user agent and operation \n " ,
" \n " ,
" Outputs date_time, requester, user_agent, operation, count \n " ,
" \" \" \" \n " ,
" \n " ,
" LOGPATS = r ' ( \\ S+) ( \\ S+) \\ [(.*?) \\ ] ( \\ S+) ( \\ S+) ' \\ \n " ,
" r ' ( \\ S+) ( \\ S+) ( \\ S+) ( \" ([^ \" ]+) \" |-) ' \\ \n " ,
" r ' ( \\ S+) ( \\ S+) ( \\ S+) ( \\ S+) ( \\ S+) ( \\ S+) ' \\ \n " ,
" r ' ( \" ([^ \" ]+) \" |-) ( \" ([^ \" ]+) \" |-) ' \n " ,
" NUM_ENTRIES_PER_LINE = 17 \n " ,
" logpat = re.compile(LOGPATS) \n " ,
" \n " ,
" (S3_LOG_BUCKET_OWNER, \n " ,
" S3_LOG_BUCKET, \n " ,
" S3_LOG_DATE_TIME, \n " ,
" S3_LOG_IP, \n " ,
" S3_LOG_REQUESTER_ID, \n " ,
" S3_LOG_REQUEST_ID, \n " ,
" S3_LOG_OPERATION, \n " ,
" S3_LOG_KEY, \n " ,
" S3_LOG_HTTP_METHOD, \n " ,
" S3_LOG_HTTP_STATUS, \n " ,
" S3_LOG_S3_ERROR, \n " ,
" S3_LOG_BYTES_SENT, \n " ,
" S3_LOG_OBJECT_SIZE, \n " ,
" S3_LOG_TOTAL_TIME, \n " ,
" S3_LOG_TURN_AROUND_TIME, \n " ,
" S3_LOG_REFERER, \n " ,
" S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE) \n " ,
" \n " ,
" DELIMITER = ' \\ t ' \n " ,
" \n " ,
" # We use RawValueProtocol for input to be format agnostic \n " ,
" # and avoid any type of parsing errors \n " ,
" INPUT_PROTOCOL = RawValueProtocol \n " ,
" \n " ,
" # We use RawValueProtocol for output so we can output raw lines \n " ,
" # instead of (k, v) pairs \n " ,
" OUTPUT_PROTOCOL = RawValueProtocol \n " ,
" \n " ,
" # Encode the intermediate records using repr() instead of JSON, so the \n " ,
" # record doesn ' t get Unicode-encoded \n " ,
" INTERNAL_PROTOCOL = ReprProtocol \n " ,
" \n " ,
" def clean_date_time_zone(self, raw_date_time_zone): \n " ,
" \" \" \" Converts entry 22/Jul/2013:21:04:17 +0000 to the format \n " ,
" ' YYYY-MM-DD HH:MM:SS ' which is more suitable for loading into \n " ,
" a database such as Redshift or RDS \n " ,
" \n " ,
" Note: requires the chars \" [ ] \" to be stripped prior to input \n " ,
" Returns the converted datetime and timezone \n " ,
" or None for both values if failed \n " ,
" \n " ,
" TODO: Needs to combine timezone with date as one field \n " ,
" \" \" \" \n " ,
" date_time = None \n " ,
" time_zone_parsed = None \n " ,
" \n " ,
" # TODO: Probably cleaner to parse this with a regex \n " ,
" date_parsed = raw_date_time_zone[:raw_date_time_zone.find( \" : \" )] \n " ,
" time_parsed = raw_date_time_zone[raw_date_time_zone.find( \" : \" ) + 1: \n " ,
" raw_date_time_zone.find( \" + \" ) - 1] \n " ,
" time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find( \" + \" ):] \n " ,
" \n " ,
" try: \n " ,
" date_struct = time.strptime(date_parsed, \"%d/%b/%Y\") \n " ,
" converted_date = time.strftime(\"%Y-%m-%d\", date_struct) \n " ,
" date_time = converted_date + \" \" + time_parsed \n " ,
" \n " ,
" # Throws a ValueError exception if the operation fails that is \n " ,
" # caught by the calling function and is handled appropriately \n " ,
" except ValueError as error: \n " ,
" raise ValueError(error) \n " ,
" else: \n " ,
" return converted_date, date_time, time_zone_parsed \n " ,
" \n " ,
" def mapper(self, _, line): \n " ,
" line = line.strip() \n " ,
" match = self.logpat.search(line) \n " ,
" \n " ,
" date_time = None \n " ,
" requester = None \n " ,
" user_agent = None \n " ,
" operation = None \n " ,
" \n " ,
" try: \n " ,
" for n in range(self.NUM_ENTRIES_PER_LINE): \n " ,
" group = match.group(1 + n) \n " ,
" \n " ,
" if n == self.S3_LOG_DATE_TIME: \n " ,
" date, date_time, time_zone_parsed = \\ \n " ,
" self.clean_date_time_zone(group) \n " ,
" # Leave the following line of code if \n " ,
" # you want to aggregate by date \n " ,
" date_time = date + \" 00:00:00\" \n " ,
" elif n == self.S3_LOG_REQUESTER_ID: \n " ,
" requester = group \n " ,
" elif n == self.S3_LOG_USER_AGENT: \n " ,
" user_agent = group \n " ,
" elif n == self.S3_LOG_OPERATION: \n " ,
" operation = group \n " ,
" else: \n " ,
" pass \n " ,
" \n " ,
" except Exception: \n " ,
" yield (( \" Error while parsing line: %s \" , line), 1) \n " ,
" else: \n " ,
" yield ((date_time, requester, user_agent, operation), 1) \n " ,
" \n " ,
" def reducer(self, key, values): \n " ,
" output = list(key) \n " ,
" output = self.DELIMITER.join(output) + \\ \n " ,
" self.DELIMITER + \\ \n " ,
" str(sum(values)) \n " ,
" \n " ,
" yield None, output \n " ,
" \n " ,
" def steps(self): \n " ,
" return [ \n " ,
" self.mr(mapper=self.mapper, \n " ,
" reducer=self.reducer) \n " ,
" ] \n " ,
" \n " ,
" \n " ,
" if __name__ == ' __main__ ' : \n " ,
" MrS3LogParser.run() "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Running Amazon Elastic MapReduce Jobs "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" Run an Amazon Elastic MapReduce (EMR) job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist): "
]
} ,
{
" cell_type " : " code " ,
" execution_count " : null ,
" metadata " : {
" collapsed " : false
} ,
" outputs " : [ ] ,
" source " : [
" !python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/ "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" Run a MapReduce job locally on the specified input file, sending the results to the specified output file: "
]
} ,
{
" cell_type " : " code " ,
" execution_count " : null ,
" metadata " : {
" collapsed " : false
} ,
" outputs " : [ ] ,
" source " : [
" !python mr_s3_log_parser.py input_data.txt > output_data.txt "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Unit Testing S3 Logs "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" Accompanying unit test: "
]
} ,
{
" cell_type " : " code " ,
" execution_count " : null ,
" metadata " : {
" collapsed " : false
} ,
" outputs " : [ ] ,
" source " : [
" %%file test_mr_s3_log_parser.py \n " ,
" \n " ,
" from StringIO import StringIO \n " ,
" import unittest2 as unittest \n " ,
" from mr_s3_log_parser import MrS3LogParser \n " ,
" \n " ,
" \n " ,
" class MrTestsUtil: \n " ,
" \n " ,
" def run_mr_sandbox(self, mr_job, stdin): \n " ,
" # inline runs the job in the same process so small jobs tend to \n " ,
" # run faster and stack traces are simpler \n " ,
" # --no-conf prevents options from local mrjob.conf from polluting \n " ,
" # the testing environment \n " ,
" # \" - \" reads from standard in \n " ,
" mr_job.sandbox(stdin=stdin) \n " ,
" \n " ,
" # make_runner ensures job cleanup is performed regardless of \n " ,
" # success or failure \n " ,
" with mr_job.make_runner() as runner: \n " ,
" runner.run() \n " ,
" for line in runner.stream_output(): \n " ,
" key, value = mr_job.parse_output_line(line) \n " ,
" yield value \n " ,
" \n " ,
" \n " ,
" class TestMrS3LogParser(unittest.TestCase): \n " ,
" \n " ,
" mr_job = None \n " ,
" mr_tests_util = None \n " ,
" \n " ,
" RAW_LOG_LINE_INVALID = \\ \n " ,
" ' 00000fe9688b6e57f75bd2b7f7c1610689e8f01000000 ' \\ \n " ,
" ' 00000388225bcc00000 ' \\ \n " ,
" ' s3-storage [22/Jul/2013:21:03:27 +0000] ' \\ \n " ,
" ' 00.111.222.33 ' \\ \n " ,
" \n " ,
" RAW_LOG_LINE_VALID = \\ \n " ,
" ' 00000fe9688b6e57f75bd2b7f7c1610689e8f01000000 ' \\ \n " ,
" ' 00000388225bcc00000 ' \\ \n " ,
" ' s3-storage [22/Jul/2013:21:03:27 +0000] ' \\ \n " ,
" ' 00.111.222.33 ' \\ \n " ,
" ' arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \\ \n " ,
" ' REST.HEAD.OBJECT user/file.pdf ' \\ \n " ,
" ' \" HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000 ' \\ \n " ,
" ' 00000SDZk ' \\ \n " ,
" ' HTTP/1.1 \" 200 - - 4000272 18 - \" - \" ' \\ \n " ,
" ' \" Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0 \" ' \\ \n " ,
" ' 00000XMHZJp6DjM9x5JVEAMo8MG00000 ' \n " ,
" \n " ,
" DATE_TIME_ZONE_INVALID = \" AB/Jul/2013:21:04:17 +0000 \" \n " ,
" DATE_TIME_ZONE_VALID = \" 22/Jul/2013:21:04:17 +0000 \" \n " ,
" DATE_VALID = \" 2013-07-22 \" \n " ,
" DATE_TIME_VALID = \" 2013-07-22 21:04:17 \" \n " ,
" TIME_ZONE_VALID = \" +0000 \" \n " ,
" \n " ,
" def __init__(self, *args, **kwargs): \n " ,
" super(TestMrS3LogParser, self).__init__(*args, **kwargs) \n " ,
" self.mr_job = MrS3LogParser([ ' -r ' , ' inline ' , ' --no-conf ' , ' - ' ]) \n " ,
" self.mr_tests_util = MrTestsUtil() \n " ,
" \n " ,
" def test_invalid_log_lines(self): \n " ,
" stdin = StringIO(self.RAW_LOG_LINE_INVALID) \n " ,
" \n " ,
" for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin): \n " ,
" self.assertEqual(result.find( \" Error \" ), 0) \n " ,
" \n " ,
" def test_valid_log_lines(self): \n " ,
" stdin = StringIO(self.RAW_LOG_LINE_VALID) \n " ,
" \n " ,
" for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin): \n " ,
" self.assertEqual(result.find( \" Error \" ), -1) \n " ,
" \n " ,
" def test_clean_date_time_zone(self): \n " ,
" date, date_time, time_zone_parsed = \\ \n " ,
" self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID) \n " ,
" self.assertEqual(date, self.DATE_VALID) \n " ,
" self.assertEqual(date_time, self.DATE_TIME_VALID) \n " ,
" self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID) \n " ,
" \n " ,
" # Wrap the call in a lambda so that assertRaises invokes \n " ,
" # clean_date_time_zone itself and can catch the ValueError \n " ,
" self.assertRaises(ValueError, \n " ,
" lambda: self.mr_job.clean_date_time_zone( \n " ,
" self.DATE_TIME_ZONE_INVALID)) \n " ,
" \n " ,
" if __name__ == ' __main__ ' : \n " ,
" unittest.main() \n " ,
" \n "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Running S3 Logs Unit Test "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" Run the mrjob test: "
]
} ,
{
" cell_type " : " code " ,
" execution_count " : null ,
" metadata " : {
" collapsed " : false
} ,
" outputs " : [ ] ,
" source " : [
" !python test_mr_s3_log_parser.py -v "
]
} ,
{
" cell_type " : " markdown " ,
" metadata " : { } ,
" source " : [
" ## Sample Config File "
]
} ,
{
" cell_type " : " code " ,
" execution_count " : null ,
" metadata " : {
" collapsed " : false
} ,
" outputs " : [ ] ,
" source " : [
" runners: \n " ,
" emr: \n " ,
" aws_access_key_id: __ACCESS_KEY__ \n " ,
" aws_secret_access_key: __SECRET_ACCESS_KEY__ \n " ,
" aws_region: us-east-1 \n " ,
" ec2_key_pair: EMR \n " ,
" ec2_key_pair_file: ~/.ssh/EMR.pem \n " ,
" ssh_tunnel_to_job_tracker: true \n " ,
" ec2_master_instance_type: m3.xlarge \n " ,
" ec2_instance_type: m3.xlarge \n " ,
" num_ec2_instances: 5 \n " ,
" s3_scratch_uri: s3://bucket/tmp/ \n " ,
" s3_log_uri: s3://bucket/tmp/logs/ \n " ,
" enable_emr_debugging: True \n " ,
" bootstrap: \n " ,
" - sudo apt-get install -y python-pip \n " ,
" - sudo pip install --upgrade simplejson "
]
}
] ,
" metadata " : {
" kernelspec " : {
" display_name " : " Python 2 " ,
" language " : " python " ,
" name " : " python2 "
} ,
" language_info " : {
" codemirror_mode " : {
" name " : " ipython " ,
" version " : 2
} ,
" file_extension " : " .py " ,
" mimetype " : " text/x-python " ,
" name " : " python " ,
" nbconvert_exporter " : " python " ,
" pygments_lexer " : " ipython2 " ,
" version " : " 2.7.10 "
}
} ,
" nbformat " : 4 ,
" nbformat_minor " : 0
}