Source code for manager.manager

from __future__ import unicode_literals, print_function
import json
import hashlib
import requests
import logging
from datetime import datetime
from datetime import timedelta
from bs4 import BeautifulSoup
from aws_tools.dynamodb_handler import DynamoDBHandler
from gogs_tools.gogs_handler import GogsHandler
from job import TxJob
from module import TxModule


[docs]class TxManager(object): JOB_TABLE_NAME = 'tx-job' MODULE_TABLE_NAME = 'tx-module' def __init__(self, api_url=None, gogs_url=None, cdn_url=None, cdn_bucket=None, quiet=False, aws_access_key_id=None, aws_secret_access_key=None, job_table_name=None, module_table_name=None): """ :param string api_url: :param string gogs_url: :param string cdn_url: :param string cdn_bucket: :param bool quiet: :param string aws_access_key_id: :param string aws_secret_access_key: :param string job_table_name: :param string module_table_name: """ self.api_url = api_url self.gogs_url = gogs_url self.cdn_url = cdn_url self.cdn_bucket = cdn_bucket self.quiet = quiet self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.job_table_name = job_table_name self.module_table_name = module_table_name if not self.job_table_name: self.job_table_name = TxManager.JOB_TABLE_NAME if not self.module_table_name: self.module_table_name = TxManager.MODULE_TABLE_NAME self.job_db_handler = None self.module_db_handler = None self.gogs_handler = None self.logger = logging.getLogger('tx-manager') self.setup_resources()
[docs] def setup_resources(self): if self.job_table_name: self.job_db_handler = DynamoDBHandler(self.job_table_name) if self.module_table_name: self.module_db_handler = DynamoDBHandler(self.module_table_name) if self.gogs_url: self.gogs_handler = GogsHandler(self.gogs_url)
[docs] def debug_print(self, message): if not self.quiet: print(message)
[docs] def get_user(self, user_token): return self.gogs_handler.get_user(user_token)
[docs] def get_converter_module(self, job): modules = self.query_modules() for module in modules: if job.resource_type in module.resource_types: if job.input_format in module.input_format: if job.output_format in module.output_format: return module return None
[docs] def setup_job(self, data): if 'gogs_user_token' not in data: raise Exception('"gogs_user_token" not given.') user = self.get_user(data['gogs_user_token']) if not user or not user.username: raise Exception('Invalid user_token. User not found.') del data['gogs_user_token'] data['user'] = user.username job = TxJob(data, self.quiet) if not job.cdn_bucket: if not self.cdn_bucket: raise Exception('"cdn_bucket" not given.') else: job.cdn_bucket = self.cdn_bucket if not job.source: raise Exception('"source" url not given.') if not job.resource_type: raise Exception('"resource_type" not given.') if not job.input_format: raise Exception('"input_format" not given.') if not job.output_format: raise Exception('"output_format" not given.') module = self.get_converter_module(job) if not module: raise Exception('No converter was found to convert {0} from {1} to {2}'.format(job.resource_type, job.input_format, job.output_format)) job.convert_module = module.name created_at = datetime.utcnow() expires_at = created_at + timedelta(days=1) eta = created_at + timedelta(seconds=20) job.created_at = created_at.strftime("%Y-%m-%dT%H:%M:%SZ") job.expires_at = expires_at.strftime("%Y-%m-%dT%H:%M:%SZ") job.eta = eta.strftime("%Y-%m-%dT%H:%M:%SZ") job.status = 'requested' job.message = 'Conversion requested...' job_id = hashlib.sha256('{0}-{1}-{2}'.format(user.username, user.email, created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))).hexdigest() job.job_id = job_id # All conversions must result in a ZIP of the converted file(s) output_file = 'tx/job/{0}.zip'.format(job.job_id) job.output = '{0}/{1}'.format(self.cdn_url, output_file) job.cdn_file = output_file job.links = { "href": "{0}/tx/job/{1}".format(self.api_url, job_id), "rel": "self", "method": "GET" } # Saving this to the DynamoDB will start trigger a DB stream which will call # tx-manager again with the job info (see run() function) self.insert_job(job) return { "job": job.get_db_data(), "links": [ { "href": "{0}/tx/job".format(self.api_url), "rel": "list", "method": "GET" }, { "href": "{0}/tx/job".format(self.api_url), "rel": "create", "method": "POST" }, ], }
[docs] def list_jobs(self, data, must_be_authenticated=True): if must_be_authenticated: if 'gogs_user_token' not in data: raise Exception('"gogs_user_token" not given.') user = self.get_user(data['gogs_user_token']) if not user: raise Exception('Invalid user_token. User not found.') data['user'] = user.username del data['gogs_user_token'] jobs = self.query_jobs(data) ret = [] if jobs and len(jobs): for job in jobs: ret.append(job.get_db_data()) return ret
[docs] def list_endpoints(self): return { "version": "1", "links": [ { "href": "{0}/tx/job".format(self.api_url), "rel": "list", "method": "GET" }, { "href": "{0}/tx/job".format(self.api_url), "rel": "create", "method": "POST" }, ] }
[docs] def start_job(self, job_id): job = self.get_job(job_id) if not job.job_id: job.job_id = job_id job.success = False job.message = 'No job with ID {} has been requested'.format(job_id) return job.get_db_data() # Job doesn't exist, return # Only start the job if the status is 'requested' and a started timestamp hasn't been set if job.status != 'requested' or job.started_at: return job.get_db_data() # Job already started, return job.started_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") job.status = 'started' job.message = 'Conversion started...' job.log_message('Started job {0} at {1}'.format(job_id, job.started_at)) success = False try: self.update_job(job) module = self.get_converter_module(job) if not module: raise Exception('No converter was found to convert {0} from {1} to {2}' .format(job.resource_type, job.input_format, job.output_format)) job.converter_module = module.name self.update_job(job) payload = { 'job': job.get_db_data(), } job.log_message('Telling module {0} to convert {1} and put at {2}'.format(module.name, job.source, job.output)) headers = {"content-type": "application/json"} url = module.public_links[0] print("Payload to {0}:".format(url)) print(json.dumps(payload)) response = requests.post(url, json=payload, headers=headers) print('finished.') print("Response from {0}:".format(module.name)) print(response) json_data = response.json() if json_data: json_data = response.json() if 'errorMessage' in json_data: error = json_data['errorMessage'] if error.startswith('Bad Request: '): error = error[len('Bad Request: '):] job.error_message(error) if 'Payload' in json_data: payload = json_data['Payload'] print('Payload:') print(json.dumps(payload)) for message in payload['log']: if message: job.log_message(message) for message in payload['errors']: if message: job.error_message(message) for message in payload['warnings']: if message: job.warning_message(message) success = payload['success'] if payload['errors']: job.log_message('{0} function returned with errors.'.format(module.name)) elif payload['warnings']: job.log_message('{0} function returned with warnings.'.format(module.name)) elif payload['log']: job.log_message('{0} function returned.'.format(module.name)) except Exception as e: job.error_message('Failed with message: {0}'.format(e.message)) job.ended_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") if not success or len(job.errors): job.success = False job.status = "failed" message = "Conversion failed" elif len(job.warnings) > 0: job.success = True job.status = "warnings" message = "Conversion successful with warnings" else: job.success = True job.status = "success" message = "Conversion successful" job.message = message job.log_message(message) job.log_message('Finished job {0} at {1}'.format(job.job_id, job.ended_at)) self.update_job(job) callback_payload = job.get_db_data() callback_payload["message"] = message if job.callback: self.do_callback(job.callback, callback_payload) return job.get_db_data()
@staticmethod
[docs] def do_callback(url, payload): if url.startswith('http'): headers = {"content-type": "application/json"} print('Making callback to {0} with payload:'.format(url)) print(payload) requests.post(url, json=payload, headers=headers) print('finished.')
@staticmethod
[docs] def make_api_gateway_for_module(module): # lambda_func_name = module['name'] # AWS_LAMBDA_API_ID = '7X97xCLPDE16Jep5Zv85N6zy28wcQfJz79E2H3ln' # # of 'tx-manager_api_key' # # or fkcr7r4dz9 # # or 7X97xCLPDE16Jep5Zv85N6zy28wcQfJz79E2H3ln # AWS_REGION = 'us-west-2' # # api_client = boto3.client('apigateway') # aws_lambda = boto3.client('lambda') # # ## create resource # resource_resp = api_client.create_resource( # restApiId=AWS_LAMBDA_API_ID, # parentId='foo', # resource id for the Base API path # pathPart=lambda_func_name # ) # # ## create POST method # put_method_resp = api_client.put_method( # restApiId=AWS_LAMBDA_API_ID, # resourceId=resource_resp['id'], # httpMethod="POST", # authorizationType="NONE", # apiKeyRequired=True, # ) # # lambda_version = aws_lambda.meta.service_model.api_version # # uri_data = { # "aws-region": AWS_REGION, # "api-version": lambda_version, # "aws-acct-id": "xyzABC", # "lambda-function-name": lambda_func_name, # } # # uri = "arn:aws:apigateway:{aws-region}:lambda:path/{api-version}/functions/arn:aws:lambda:{aws-region}: # {aws-acct-id}:function:{lambda-function-name}/invocations".format(**uri_data) # # ## create integration # integration_resp = api_client.put_integration( # restApiId=AWS_LAMBDA_API_ID, # resourceId=resource_resp['id'], # httpMethod="POST", # type="AWS", # integrationHttpMethod="POST", # uri=uri, # ) # # api_client.put_integration_response( # restApiId=AWS_LAMBDA_API_ID, # resourceId=resource_resp['id'], # httpMethod="POST", # statusCode="200", # selectionPattern=".*" # ) # # ## create POST method response # api_client.put_method_response( # restApiId=AWS_LAMBDA_API_ID, # resourceId=resource_resp['id'], # httpMethod="POST", # statusCode="200", # ) # # uri_data['aws-api-id'] = AWS_LAMBDA_API_ID # source_arn = "arn:aws:execute-api:{aws-region}:{aws-acct-id}: # {aws-api-id}/*/POST/{lambda-function-name}".format(**uri_data) # # aws_lambda.add_permission( # FunctionName=lambda_func_name, # StatementId=uuid.uuid4().hex, # Action="lambda:InvokeFunction", # Principal="apigateway.amazonaws.com", # SourceArn=source_arn # ) # # # state 'your stage name' was already created via API Gateway GUI # api_client.create_deployment( # restApiId=AWS_LAMBDA_API_ID, # stageName="your stage name", # ) return
[docs] def register_module(self, data): module = TxModule(data, self.quiet) if not module.name: raise Exception('"name" not given.') if not module.type: raise Exception('"type" not given.') if not module.input_format: raise Exception('"input_format" not given.') if not module.output_format: raise Exception('"output_format" not given.') if not module.resource_types: raise Exception('"resource_types" not given.', exc_info=1) module.public_links.append("{0}/tx/convert/{1}".format(self.api_url, module.name)) self.insert_module(module) self.make_api_gateway_for_module(module) # Todo: develop this function return module.get_db_data()
[docs] def insert_job(self, job): job_data = job.get_db_data() self.job_db_handler.insert_item(job_data)
[docs] def query_jobs(self, data=None): items = self.job_db_handler.query_items(data) jobs = [] if items and len(items): for item in items: jobs.append(TxJob(item)) return jobs
[docs] def get_job(self, job_id): return TxJob(self.job_db_handler.get_item({'job_id': job_id}))
[docs] def update_job(self, job): return self.job_db_handler.update_item({'job_id': job.job_id}, job.get_db_data())
[docs] def delete_job(self, job): return self.job_db_handler.delete_item({'job_id': job.job_id})
[docs] def insert_module(self, module): module_data = module.get_db_data() self.module_db_handler.insert_item(module_data)
[docs] def query_modules(self, data=None): items = self.module_db_handler.query_items(data) modules = [] if items and len(items): for item in items: modules.append(TxModule(item)) return modules
[docs] def get_module(self, name): return TxModule(self.module_db_handler.get_item({'name': name}))
[docs] def update_module(self, module): return self.module_db_handler.update_item({'name': module.name}, module.get_db_data())
[docs] def delete_module(self, module): return self.module_db_handler.delete_item({'name': module.name})
[docs] def generate_dashboard(self): """ Generate page with metrics indicating configuration of tx-manager. :param dict event: :param context: :param DynamicDBHandler: :param logger: """ self.logger.info("Start: generateDashboard") dashboard = { 'title': 'tX-Manager Dashboard', 'body': 'No modules found' } items = sorted(self.module_db_handler.query_items(), key=lambda k: k['name']) if items and len(items): self.logger.info(" Found: " + str(len(items)) + " item[s] in tx-module") body = BeautifulSoup('<h1>TX-Manager Dashboard</h1><h2>Module Attributes</h2><br><table></table>', 'html.parser') for item in items: # self.logger.info(json.dumps(item)) self.logger.info(item["name"]) body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '"><td class="hdr" colspan="2">' + str(item["name"]) + '</td></tr>', 'html.parser')) # TBD the following code almosts walks the db record replacing next 11 lines # for attr, val in item: # if (attr != 'name') and (len(attr) > 0): # rec += ' <tr><td class="lbl">' + attr.replace("_", " ").title() + ':</td><td>' + "lst(val)" + "</td></tr>\n" # rec += '<tr><td colspan="2"></td></tr>' body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-type" class="module-type"><td class="lbl">Type:</td><td>' + str(item["type"]) + '</td></tr>', 'html.parser')) body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-input" class="module-input"><td class="lbl">Input Format:</td><td>' + json.dumps(item["input_format"]) + '</td></tr>', 'html.parser')) body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-output" class="module-output"><td class="lbl">Output Format:</td><td>' + json.dumps(item["output_format"]) + '</td></tr>', 'html.parser')) body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-resource" class="module-resource"><td class="lbl">Resource Types:</td><td>' + json.dumps(item["resource_types"]) + '</td></tr>', 'html.parser')) body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-version" class="module-version"><td class="lbl">Version:</td><td>' + str(item["version"]) + '</td></tr>', 'html.parser')) if len(item["options"]) > 0: body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-options" class="module-options"><td class="lbl">Options:</td><td>' + json.dumps(item["options"]) + '</td></tr>', 'html.parser')) if len(item["private_links"]) > 0: body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-private-links" class="module-private-links"><td class="lbl">Private Links:</td><td>' + json.dumps(item["private_links"]) + '</td></tr>', 'html.parser')) if len(item["public_links"]) > 0: body.table.append(BeautifulSoup( '<tr id="' + item['name'] + '-public-links" class="module-public-links"><td class="lbl">Public Links:</td><td>' + json.dumps(item["public_links"]) + '</td></tr>', 'html.parser')) dashboard['body'] = body.prettify('UTF-8') else: self.logger.info("No modules found.") return dashboard