from __future__ import absolute_import, unicode_literals, print_function
import itertools
import unittest
import mock
import responses
from six import StringIO
from tests.manager_tests import mock_utils
from manager.job import TxJob
from manager.manager import TxManager
from manager.module import TxModule
[docs]class ManagerTest(unittest.TestCase):
MOCK_API_URL = "https://api.example.com"
MOCK_CALLBACK_URL = "https://callback.example.com/"
MOCK_GOGS_URL = "https://mock.gogs.io"
mock_job_db = None
mock_module_db = None
mock_db = None
mock_gogs = None
mock_lambda = None
patches = []
@classmethod
[docs] def setUpClass(cls):
"""
Create mock AWS handlers, and apply corresponding monkey patches
"""
cls.mock_job_db = mock_utils.mock_db_handler(data={
0: {
"job_id": 0,
"status": "started",
"resource_type": "obs",
"input_format": "markdown",
"output_format": "html"
},
1: {
"job_id": 1,
"status": "requested",
"resource_type": "obs",
"input_format": "markdown",
"output_format": "html"
},
2: {
"job_id": 2,
"status": "requested",
"resource_type": "ulb",
"input_format": "usfm",
"output_format": "html",
"callback": ManagerTest.MOCK_CALLBACK_URL,
},
3: {
"job_id": 3,
"status": "requested",
"resource_type": "other",
"input_format": "markdown",
"output_format": "html"
},
4: {
"job_id": 4,
"status": "requested",
"resource_type": "unsupported",
"input_format": "markdown",
"output_format": "html"
}
}, keyname="job_id")
cls.mock_module_db = mock_utils.mock_db_handler(data={
"module1": {
"name": "module1",
"resource_types": ["obs", "ulb"],
"input_format": "markdown",
"output_format": "html"
},
"module2": {
"name": "module2",
"resource_types": ["ulb"],
"input_format": "usfm",
"output_format": "html"
},
"module3": {
"name": "module3",
"resource_types": ["other", "yet_another"],
"input_format": "markdown",
"output_format": "html"
}
}, keyname="name")
cls.mock_db = mock.MagicMock(
side_effect=itertools.cycle([cls.mock_job_db, cls.mock_module_db]))
cls.mock_gogs = mock.MagicMock(
return_value=mock_utils.mock_gogs_handler(["token1", "token2"]))
cls.mock_lambda = mock.MagicMock(
return_value=mock_utils.mock_lambda_handler(["module2"], ["module1"]))
ManagerTest.patches = (
mock.patch("manager.manager.DynamoDBHandler", cls.mock_db),
mock.patch("manager.manager.GogsHandler", cls.mock_gogs),
mock.patch("manager.manager.LambdaHandler", cls.mock_lambda)
)
for patch in ManagerTest.patches:
patch.start()
[docs] def setUp(self):
ManagerTest.mock_job_db.reset_mock()
ManagerTest.mock_module_db.reset_mock()
ManagerTest.mock_db.reset_mock()
ManagerTest.mock_gogs.reset_mock()
ManagerTest.mock_lambda.reset_mock()
@classmethod
[docs] def tearDownClass(cls):
for patch in ManagerTest.patches:
patch.stop()
[docs] def test_setup_job(self):
"""
Successful call of setup_job
"""
tx_manager = TxManager(gogs_url=self.MOCK_GOGS_URL)
data = {
"gogs_user_token": "token1",
"cdn_bucket": "test_cdn_bucket",
"source": "test_source",
"resource_type": "obs",
"input_format": "markdown",
"output_format": "html"
}
tx_manager.setup_job(data)
# assert an entry was aded to job database
args, kwargs = self.call_args(ManagerTest.mock_job_db.insert_item, num_args=1)
arg = args[0]
self.assertIsInstance(arg, dict)
self.assertEqual(arg["convert_module"], "module1")
self.assertEqual(arg["resource_type"], "obs")
self.assertEqual(arg["cdn_bucket"], "test_cdn_bucket")
[docs] def test_setup_job_no_converter(self):
"""
Call setup_job when there is no applicable converter.
"""
tx_manager = TxManager()
data = {
"gogs_user_token": "token1",
"cdn_bucket": "test_cdn_bucket",
"source": "test_source",
"resource_type": "unrecognized_resource_type",
"input_format": "markdown",
"output_format": "html"
}
self.assertRaises(Exception, tx_manager.setup_job, data)
[docs] def test_start_job1(self):
"""
Call start job in job 1 from mock data. Should be a successful
invocation with warnings.
"""
tx_manager = TxManager()
tx_manager.start_job(1)
# assert that correct lambda function was invoked
args, kwargs = self.call_args(ManagerTest.mock_lambda().invoke, num_args=2)
module_name = args[0]
self.assertEqual(module_name, "module1")
payload = args[1]
self.assertIsInstance(payload, dict)
self.assertIn("data", payload)
self.assertIn("job", payload["data"])
# job1's entry in database should have been updated
args, kwargs = self.call_args(ManagerTest.mock_job_db.update_item, num_args=2)
keys = args[0]
self.assertIsInstance(keys, dict)
self.assertIn("job_id", keys)
self.assertEqual(keys["job_id"], 1)
data = args[1]
self.assertIsInstance(data, dict)
self.assertIn("errors", data)
self.assertEqual(len(data["errors"]), 0)
self.assertIn("warnings", data)
self.assertTrue(len(data["warnings"]) > 0)
@responses.activate
[docs] def test_start_job2(self):
"""
Call start_job in job 2 from mock data. Should be a successful
invocation without warnings.
"""
# mock out job 2's callback
responses.add(responses.POST, ManagerTest.MOCK_CALLBACK_URL)
tx_manager = TxManager()
tx_manager.start_job(2)
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, ManagerTest.MOCK_CALLBACK_URL)
# assert that correct lambda function was invoked
args, kwargs = self.call_args(ManagerTest.mock_lambda().invoke, num_args=2)
module_name = args[0]
self.assertEqual(module_name, "module2")
payload = args[1]
self.assertIsInstance(payload, dict)
self.assertIn("data", payload)
self.assertIn("job", payload["data"])
# job2's entry in database should have been updated
ManagerTest.mock_job_db.update_item.assert_called()
args, kwargs = self.call_args(ManagerTest.mock_job_db.update_item, num_args=2)
keys = args[0]
self.assertIsInstance(keys, dict)
self.assertIn("job_id", keys)
self.assertEqual(keys["job_id"], 2)
data = args[1]
self.assertIsInstance(data, dict)
self.assertIn("errors", data)
self.assertEqual(len(data["errors"]), 0)
[docs] def test_start_job3(self):
"""
Call start_job on job 3 from mock data. Invocation should result in an error
"""
manager = TxManager()
manager.start_job(3)
ManagerTest.mock_lambda().invoke.assert_called()
# assert that correct lambda function was invoked
args, kwargs = self.call_args(ManagerTest.mock_lambda().invoke, num_args=2)
module_name = args[0]
self.assertEqual(module_name, "module3")
payload = args[1]
self.assertIsInstance(payload, dict)
self.assertIn("data", payload)
self.assertIn("job", payload["data"])
# job3's entry in database should have been updated
ManagerTest.mock_job_db.update_item.assert_called()
args, kwargs = self.call_args(ManagerTest.mock_job_db.update_item, num_args=2)
keys = args[0]
self.assertIn("job_id", keys)
self.assertEqual(keys["job_id"], 3)
self.assertIsInstance(keys, dict)
data = args[1]
self.assertIsInstance(data, dict)
self.assertIn("errors", data)
self.assertTrue(len(data["errors"]) > 0)
[docs] def test_start_job_failure(self):
"""
Call start_job with non-runnable/non-existent jobs
"""
tx_manager = TxManager()
tx_manager.start_job(0)
tx_manager.start_job(4)
tx_manager.start_job(5)
# no lambda function should have been invoked
ManagerTest.mock_lambda().invoke.assert_not_called()
# last existent job (4) should be updated in database to include error
# messages
args, kwargs = self.call_args(ManagerTest.mock_job_db.update_item, num_args=2)
keys = args[0]
self.assertIsInstance(keys, dict)
self.assertIn("job_id", keys)
self.assertEqual(keys["job_id"], 4)
data = args[1]
self.assertIsInstance(data, dict)
self.assertIn("job_id", data)
self.assertEqual(data["job_id"], 4)
self.assertIn("errors", data)
self.assertTrue(len(data["errors"]) > 0)
[docs] def test_list(self):
"""
Test list_jobs and list_endpoint methods
"""
tx_manager = TxManager(api_url=self.MOCK_API_URL, gogs_url=self.MOCK_GOGS_URL)
jobs = tx_manager.list_jobs({"gogs_user_token": "token2"}, True)
expected = [TxJob(job).get_db_data()
for job in ManagerTest.mock_job_db.mock_data.values()]
self.assertEqual(jobs, expected)
self.assertRaises(Exception, tx_manager.list_jobs, {"bad_key": "token1"})
self.assertRaises(Exception, tx_manager.list_jobs, {"gogs_user_token": "bad_token"})
endpoints = tx_manager.list_endpoints()
self.assertIsInstance(endpoints, dict)
self.assertIn("version", endpoints)
self.assertEqual(endpoints["version"], "1")
self.assertIn("links", endpoints)
self.assertIsInstance(endpoints["links"], list)
for link_data in endpoints["links"]:
self.assertIsInstance(link_data, dict)
self.assertIn("href", link_data)
self.assertEqual(self.MOCK_API_URL + "/tx/job", link_data["href"])
self.assertIn("rel", link_data)
self.assertIsInstance(link_data["rel"], unicode)
self.assertIn("method", link_data)
self.assertIn(link_data["method"],
["GET", "POST", "PUT", "PATCH", "DELETE"])
[docs] def test_register_module(self):
manager = TxManager()
data = {
"name": "module1",
"type": "conversion",
"resource_types": ["obs"],
"input_format": "markdown",
"output_format": "html"
}
manager.register_module(data)
ManagerTest.mock_module_db.insert_item.assert_called()
args, kwargs = self.call_args(ManagerTest.mock_module_db.insert_item, num_args=1)
self.assertEqual(args[0], TxModule(data).get_db_data())
for key in data:
# should raise an expection if data is missing a required field
missing = data.copy()
del missing[key]
self.assertRaises(Exception, manager.register_module, missing)
[docs] def test_debug_print(self):
for quiet in (True, False):
manager = TxManager(quiet=quiet)
mock_stdout = StringIO()
with mock.patch("sys.stdout", mock_stdout):
manager.debug_print("Hello world")
if quiet:
self.assertEqual(mock_stdout.getvalue(), "")
else:
self.assertEqual(mock_stdout.getvalue(), "Hello world\n")
[docs] def test_get_update_delete_job(self):
"""
Test [get/update/delete]_job methods
"""
manager = TxManager()
# get_job
job = manager.get_job(0)
self.assertIsInstance(job, TxJob)
self.assertEqual(job.job_id, 0)
self.assertEqual(job.status, "started")
# update_job
manager.update_job(job)
args, kwargs = self.call_args(ManagerTest.mock_job_db.update_item,
num_args=2)
self.assertEqual(args[0], {"job_id": 0})
self.assertEqual(args[1], job.get_db_data())
# delete_job
manager.delete_job(job)
args, kwargs = self.call_args(ManagerTest.mock_job_db.delete_item,
num_args=1)
self.assertEqual(args[0], {"job_id": 0})
[docs] def test_get_update_delete_module(self):
"""
Test [get/update/delete]_module methods
"""
manager = TxManager()
# get_module
module = manager.get_module("module1")
self.assertIsInstance(module, TxModule)
self.assertEqual(module.name, "module1")
self.assertEqual(module.input_format, "markdown")
# update_module
manager.update_module(module)
args, kwargs = self.call_args(ManagerTest.mock_module_db.update_item,
num_args=2)
self.assertEqual(args[0], {"name": "module1"})
self.assertEqual(args[1], module.get_db_data())
# delete_module
manager.delete_module(module)
args, kwargs = self.call_args(ManagerTest.mock_module_db.delete_item,
num_args=1)
self.assertEqual(args[0], {"name": "module1"})
# helper methods #
[docs] def call_args(self, mock_object, num_args, num_kwargs=0):
"""
:param mock_object: mock object that is expected to have been called
:param num_args: expected number of (non-keyword) arguments
:param num_kwargs: expected number of keyword arguments
:return: (args, kwargs) of last invocation of mock_object
"""
mock_object.assert_called()
args, kwargs = mock_object.call_args
self.assertEqual(len(args), num_args)
self.assertEqual(len(kwargs), num_kwargs)
return args, kwargs
if __name__ == "__main__":
unittest.main()