mirror of https://github.com/donnemartin/data-science-ipython-notebooks.git (synced 2024-03-22 13:30:56 +08:00)
Added sample mrjob mapper and reducer to parse logs on S3 following the standard bucket logging format.
This commit is contained in:
parent d4ab154643
commit 1403cf4134
aws/aws.ipynb | 158
@@ -1,7 +1,7 @@
 {
  "metadata": {
   "name": "",
-   "signature": "sha256:760f0227418945ff60ae747898cfec6f9614a279e133bfb3ef96560860b3ce0d"
+   "signature": "sha256:0cdb20316206ad6c3f940e94e66a032ed0c4eb65efde6e22ae63148c58f75547"
  },
  "nbformat": 3,
  "nbformat_minor": 0,
@@ -354,7 +354,7 @@
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"Run a MapReduce job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):"
|
||||
"Run an Amazon EMR job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
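Below the markdown cell changed in this hunk, the notebook's next code cell (outside this hunk) presumably carries the actual launch command. As a rough sketch, assuming a hypothetical job script and bucket names, an mrjob run on Amazon EMR looks like:

!python mr_your_job.py -r emr s3://your-input-bucket/ --output-dir=s3://your-output-bucket/out/

The -r emr flag selects mrjob's Elastic MapReduce runner, and the output directory must not already exist, as the cell text notes.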
@@ -384,6 +384,160 @@
"metadata": {},
|
||||
"outputs": []
|
||||
},
|
||||
+ {
+  "cell_type": "markdown",
+  "metadata": {},
+  "source": [
+   "Sample mrjob code that processes log files on Amazon S3 based on the [S3 logging format](http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html):"
+  ]
+ },
+ {
"cell_type": "code",
|
||||
"collapsed": false,
|
||||
"input": [
|
||||
"%%file mr_s3_log_parser.py\n",
|
||||
"\n",
|
||||
"import time\n",
|
||||
"from mrjob.job import MRJob\n",
|
||||
"from mrjob.protocol import RawValueProtocol, ReprProtocol\n",
|
||||
"import re\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"class MrS3LogParser(MRJob):\n",
|
||||
" \"\"\"Parses the logs from S3 based on the S3 logging format:\n",
|
||||
" http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html\n",
|
||||
" \n",
|
||||
" Aggregates a user's daily requests by user agent and operation\n",
|
||||
" \n",
|
||||
" Outputs date_time, requester, user_agent, operation, count\n",
|
||||
" \"\"\"\n",
|
||||
"\n",
|
||||
" LOGPATS = r'(\\S+) (\\S+) \\[(.*?)\\] (\\S+) (\\S+) ' \\\n",
|
||||
" r'(\\S+) (\\S+) (\\S+) (\"([^\"]+)\"|-) ' \\\n",
|
||||
" r'(\\S+) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) ' \\\n",
|
||||
" r'(\"([^\"]+)\"|-) (\"([^\"]+)\"|-)'\n",
|
||||
" NUM_ENTRIES_PER_LINE = 17\n",
|
||||
" logpat = re.compile(LOGPATS)\n",
|
||||
"\n",
|
||||
" (S3_LOG_BUCKET_OWNER, \n",
|
||||
" S3_LOG_BUCKET, \n",
|
||||
" S3_LOG_DATE_TIME,\n",
|
||||
" S3_LOG_IP, \n",
|
||||
" S3_LOG_REQUESTER_ID, \n",
|
||||
" S3_LOG_REQUEST_ID,\n",
|
||||
" S3_LOG_OPERATION, \n",
|
||||
" S3_LOG_KEY, \n",
|
||||
" S3_LOG_HTTP_METHOD,\n",
|
||||
" S3_LOG_HTTP_STATUS, \n",
|
||||
" S3_LOG_S3_ERROR, \n",
|
||||
" S3_LOG_BYTES_SENT,\n",
|
||||
" S3_LOG_OBJECT_SIZE, \n",
|
||||
" S3_LOG_TOTAL_TIME, \n",
|
||||
" S3_LOG_TURN_AROUND_TIME,\n",
|
||||
" S3_LOG_REFERER, \n",
|
||||
" S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)\n",
|
||||
"\n",
|
||||
" DELIMITER = '\\t'\n",
|
||||
"\n",
|
||||
" # We use RawValueProtocol for input to be format agnostic\n",
|
||||
" # and avoid any type of parsing errors\n",
|
||||
" INPUT_PROTOCOL = RawValueProtocol\n",
|
||||
"\n",
|
||||
" # We use RawValueProtocol for output so we can output raw lines\n",
|
||||
" # instead of (k, v) pairs\n",
|
||||
" OUTPUT_PROTOCOL = RawValueProtocol\n",
|
||||
"\n",
|
||||
" # Encode the intermediate records using repr() instead of JSON, so the\n",
|
||||
" # record doesn't get Unicode-encoded\n",
|
||||
" INTERNAL_PROTOCOL = ReprProtocol\n",
|
||||
"\n",
|
||||
" def clean_date_time_zone(self, raw_date_time_zone):\n",
|
||||
" \"\"\"Converts entry 22/Jul/2013:21:04:17 +0000 to the format\n",
|
||||
" 'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into\n",
|
||||
" a database such as Redshift or RDS\n",
|
||||
"\n",
|
||||
" Note: requires the chars \"[ ]\" to be stripped prior to input\n",
|
||||
" Returns the converted datetime annd timezone\n",
|
||||
" or None for both values if failed\n",
|
||||
"\n",
|
||||
" TODO: Needs to combine timezone with date as one field\n",
|
||||
" \"\"\"\n",
|
||||
" date_time = None\n",
|
||||
" time_zone_parsed = None\n",
|
||||
"\n",
|
||||
" # TODO: Probably cleaner to parse this with a regex\n",
|
||||
" date_parsed = raw_date_time_zone[:raw_date_time_zone.find(\":\")]\n",
|
||||
" time_parsed = raw_date_time_zone[raw_date_time_zone.find(\":\") + 1:\n",
|
||||
" raw_date_time_zone.find(\"+\") - 1]\n",
|
||||
" time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find(\"+\"):]\n",
|
||||
"\n",
|
||||
" try:\n",
|
||||
" date_struct = time.strptime(date_parsed, \"%d/%b/%Y\")\n",
|
||||
" converted_date = time.strftime(\"%Y-%m-%d\", date_struct)\n",
|
||||
" date_time = converted_date + \" \" + time_parsed\n",
|
||||
"\n",
|
||||
" # Throws a ValueError exception if the operation fails that is\n",
|
||||
" # caught by the calling function and is handled appropriately\n",
|
||||
" except ValueError as error:\n",
|
||||
" raise ValueError(error)\n",
|
||||
" else:\n",
|
||||
" return converted_date, date_time, time_zone_parsed\n",
|
||||
"\n",
|
||||
" def mapper(self, _, line):\n",
|
||||
" line = line.strip()\n",
|
||||
" match = self.logpat.search(line)\n",
|
||||
"\n",
|
||||
" date_time = None\n",
|
||||
" requester = None\n",
|
||||
" user_agent = None\n",
|
||||
" operation = None\n",
|
||||
"\n",
|
||||
" try:\n",
|
||||
" for n in range(self.NUM_ENTRIES_PER_LINE):\n",
|
||||
" group = match.group(1 + n)\n",
|
||||
"\n",
|
||||
" if n == self.S3_LOG_DATE_TIME:\n",
|
||||
" date, date_time, time_zone_parsed = \\\n",
|
||||
" self.clean_date_time_zone(group)\n",
|
||||
" # Leave the following line of code if \n",
|
||||
" # you want to aggregate by date\n",
|
||||
" date_time = date + \" 00:00:00\"\n",
|
||||
" elif n == self.S3_LOG_REQUESTER_ID:\n",
|
||||
" requester = group\n",
|
||||
" elif n == self.S3_LOG_USER_AGENT:\n",
|
||||
" user_agent = group\n",
|
||||
" elif n == self.S3_LOG_OPERATION:\n",
|
||||
" operation = group\n",
|
||||
" else:\n",
|
||||
" pass\n",
|
||||
"\n",
|
||||
" except Exception:\n",
|
||||
" yield ((\"Error while parsing line: %s\", line), 1)\n",
|
||||
" else:\n",
|
||||
" yield ((date_time, requester, user_agent, operation), 1)\n",
|
||||
"\n",
|
||||
" def reducer(self, key, values):\n",
|
||||
" output = list(key)\n",
|
||||
" output = self.DELIMITER.join(output) + \\\n",
|
||||
" self.DELIMITER + \\\n",
|
||||
" str(sum(values))\n",
|
||||
"\n",
|
||||
" yield None, output\n",
|
||||
"\n",
|
||||
" def steps(self):\n",
|
||||
" return [\n",
|
||||
" self.mr(mapper=self.mapper,\n",
|
||||
" reducer=self.reducer)\n",
|
||||
" ]\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"if __name__ == '__main__':\n",
|
||||
" MrS3LogParser.run()"
|
||||
],
|
||||
"language": "python",
|
||||
"metadata": {},
|
||||
"outputs": []
|
||||
},
  {
   "cell_type": "markdown",
   "metadata": {},
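A minimal way to smoke-test the new parser outside of EMR (a sketch, not part of this commit; the import, the empty args list, and the sample timestamp are illustrative assumptions):

# Hypothetical local check for mr_s3_log_parser.py (not in the commit)
from mr_s3_log_parser import MrS3LogParser

job = MrS3LogParser(args=[])
# clean_date_time_zone expects the surrounding "[ ]" already stripped
print(job.clean_date_time_zone("22/Jul/2013:21:04:17 +0000"))
# -> ('2013-07-22', '2013-07-22 21:04:17', '+0000')

The full job can also be run against a local log file first, e.g. python mr_s3_log_parser.py sample_s3_logs.txt, which uses mrjob's default inline runner; once the tab-separated output (date_time, requester, user_agent, operation, count) looks right, switch to -r emr with S3 input and output paths.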