Added unit test for sample mrjob mapper and reducer to parse logs on s3.

This commit is contained in:
Donne Martin 2015-04-05 08:14:53 -04:00
parent 8d1d56fc22
commit 818cf705c4

View File

@ -1,7 +1,7 @@
{
"metadata": {
"name": "",
"signature": "sha256:0cdb20316206ad6c3f940e94e66a032ed0c4eb65efde6e22ae63148c58f75547"
"signature": "sha256:71af98f7155af43ed0d04aeee32dfc65b677bd43e2000e52770d010c81b0095a"
},
"nbformat": 3,
"nbformat_minor": 0,
@ -347,7 +347,14 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"<h2 id=\"mrjob\">mrjob</h2>"
"<h2 id=\"mrjob\">mrjob</h2>\n",
"\n",
"[mrjob](https://pythonhosted.org/mrjob/) lets you write MapReduce jobs in Python 2.5+ and run them on several platforms. You can:\n",
"\n",
"* Write multi-step MapReduce jobs in pure Python\n",
"* Test on your local machine\n",
"* Run on a Hadoop cluster\n",
"* Run in the cloud using Amazon Elastic MapReduce (EMR)"
]
},
{
@ -538,6 +545,128 @@
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Accompanying unit test:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%file test_mr_s3_log_parser.py\n",
"\n",
"from StringIO import StringIO\n",
"import unittest2 as unittest\n",
"from mr_s3_log_parser import MrS3LogParser\n",
"\n",
"\n",
"class MrTestsUtil:\n",
"\n",
" def run_mr_sandbox(self, mr_job, stdin):\n",
" # inline runs the job in the same process so small jobs tend to\n",
" # run faster and stack traces are simpler\n",
" # --no-conf prevents options from local mrjob.conf from polluting\n",
" # the testing environment\n",
" # \"-\" reads from standard in\n",
" mr_job.sandbox(stdin=stdin)\n",
"\n",
" # make_runner ensures job cleanup is performed regardless of\n",
" # success or failure\n",
" with mr_job.make_runner() as runner:\n",
" runner.run()\n",
" for line in runner.stream_output():\n",
" key, value = mr_job.parse_output_line(line)\n",
" yield value\n",
"\n",
" \n",
"class TestMrS3LogParser(unittest.TestCase):\n",
"\n",
" mr_job = None\n",
" mr_tests_util = None\n",
"\n",
" RAW_LOG_LINE_INVALID = \\\n",
" '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \\\n",
" '00000388225bcc00000 ' \\\n",
" 's3-storage [22/Jul/2013:21:03:27 +0000] ' \\\n",
" '00.111.222.33 ' \\\n",
"\n",
" RAW_LOG_LINE_VALID = \\\n",
" '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \\\n",
" '00000388225bcc00000 ' \\\n",
" 's3-storage [22/Jul/2013:21:03:27 +0000] ' \\\n",
" '00.111.222.33 ' \\\n",
" 'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \\\n",
" 'REST.HEAD.OBJECT user/file.pdf ' \\\n",
" '\"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \\\n",
" '00000SDZk ' \\\n",
" 'HTTP/1.1\" 200 - - 4000272 18 - \"-\" ' \\\n",
" '\"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0\" ' \\\n",
" '00000XMHZJp6DjM9x5JVEAMo8MG00000'\n",
"\n",
" DATE_TIME_ZONE_INVALID = \"AB/Jul/2013:21:04:17 +0000\"\n",
" DATE_TIME_ZONE_VALID = \"22/Jul/2013:21:04:17 +0000\"\n",
" DATE_VALID = \"2013-07-22\"\n",
" DATE_TIME_VALID = \"2013-07-22 21:04:17\"\n",
" TIME_ZONE_VALID = \"+0000\"\n",
"\n",
" def __init__(self, *args, **kwargs):\n",
" super(TestMrS3LogParser, self).__init__(*args, **kwargs)\n",
" self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])\n",
" self.mr_tests_util = MrTestsUtil()\n",
"\n",
" def test_invalid_log_lines(self):\n",
" stdin = StringIO(self.RAW_LOG_LINE_INVALID)\n",
"\n",
" for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):\n",
" self.assertEqual(result.find(\"Error\"), 0)\n",
"\n",
" def test_valid_log_lines(self):\n",
" stdin = StringIO(self.RAW_LOG_LINE_VALID)\n",
"\n",
" for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):\n",
" self.assertEqual(result.find(\"Error\"), -1)\n",
"\n",
" def test_clean_date_time_zone(self):\n",
" date, date_time, time_zone_parsed = \\\n",
" self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)\n",
" self.assertEqual(date, self.DATE_VALID)\n",
" self.assertEqual(date_time, self.DATE_TIME_VALID)\n",
" self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)\n",
"\n",
" # Use a lambda to delay the calling of clean_date_time_zone so that\n",
" # assertRaises has enough time to handle it properly\n",
" self.assertRaises(ValueError,\n",
" lambda: self.mr_job.clean_date_time_zone(\n",
" self.DATE_TIME_ZONE_INVALID))\n",
"\n",
"if __name__ == '__main__':\n",
" unittest.main()\n",
"\n"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the mrjob test:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"!python test_mr_s3_log_parser.py -v"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},