from __future__ import unicode_literals, print_function
import json
import hashlib
import requests
import logging
from datetime import datetime
from datetime import timedelta
from libraries.aws_tools.lambda_handler import LambdaHandler
from bs4 import BeautifulSoup
from libraries.aws_tools.dynamodb_handler import DynamoDBHandler
from libraries.door43_tools.page_metrics import PageMetrics
from libraries.gogs_tools.gogs_handler import GogsHandler
from libraries.models.job import TxJob
from libraries.models.module import TxModule
[docs]class TxManager(object):
JOB_TABLE_NAME = 'tx-job'
MODULE_TABLE_NAME = 'tx-module'
MAX_FAILURES = 10
def __init__(self, api_url=None, gogs_url=None, cdn_url=None, cdn_bucket=None,
aws_access_key_id=None, aws_secret_access_key=None,
job_table_name=None, module_table_name=None, language_stats_table_name=None,
prefix=''):
"""
:param string api_url:
:param string gogs_url:
:param string cdn_url:
:param string cdn_bucket:
:param string aws_access_key_id:
:param string aws_secret_access_key:
:param string job_table_name:
:param string module_table_name:
:param string prefix:
"""
self.api_url = api_url
self.gogs_url = gogs_url
self.cdn_url = cdn_url
self.cdn_bucket = cdn_bucket
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.job_table_name = job_table_name
self.module_table_name = module_table_name
self.prefix = prefix
self.language_stats_table_name = language_stats_table_name
if not self.job_table_name:
self.job_table_name = TxManager.JOB_TABLE_NAME
if not self.module_table_name:
self.module_table_name = TxManager.MODULE_TABLE_NAME
self.job_db_handler = None
self.module_db_handler = None
self.gogs_handler = None
self.lambda_handler = None
self.jobs_total = 0
self.jobs_warnings = 0
self.jobs_failures = 0
self.jobs_success = 0
self.language_views = None
self.language_dates = None
self.logger = logging.getLogger()
self.setup_resources()
[docs] def setup_resources(self):
if self.job_table_name:
self.job_db_handler = DynamoDBHandler(self.job_table_name)
if self.module_table_name:
self.module_db_handler = DynamoDBHandler(self.module_table_name)
if self.gogs_url:
self.gogs_handler = GogsHandler(self.gogs_url)
self.lambda_handler = LambdaHandler(self.aws_access_key_id, self.aws_secret_access_key)
[docs] def get_user(self, user_token):
return self.gogs_handler.get_user(user_token)
[docs] def get_converter_module(self, job):
tx_modules = TxModule(db_handler=self.module_db_handler).query()
for tx_module in tx_modules:
if job.resource_type in tx_module.resource_types:
if job.input_format in tx_module.input_format:
if job.output_format in tx_module.output_format:
return tx_module
return None
[docs] def setup_job(self, data):
if 'gogs_user_token' not in data:
raise Exception('"gogs_user_token" not given.')
user = self.get_user(data['gogs_user_token'])
if not user or not user.username:
raise Exception('Invalid user_token. User not found.')
del data['gogs_user_token']
data['user'] = user.username
job = TxJob(data, db_handler=self.job_db_handler)
if not job.cdn_bucket:
if not self.cdn_bucket:
raise Exception('"cdn_bucket" not given.')
else:
job.cdn_bucket = self.cdn_bucket
if not job.source:
raise Exception('"source" url not given.')
if not job.resource_type:
raise Exception('"resource_type" not given.')
if not job.input_format:
raise Exception('"input_format" not given.')
if not job.output_format:
raise Exception('"output_format" not given.')
tx_module = self.get_converter_module(job)
if not tx_module:
raise Exception('No converter was found to convert {0} from {1} to {2}'.format(job.resource_type,
job.input_format,
job.output_format))
job.convert_module = tx_module.name
created_at = datetime.utcnow()
expires_at = created_at + timedelta(days=1)
eta = created_at + timedelta(seconds=20)
job.created_at = created_at.strftime("%Y-%m-%dT%H:%M:%SZ")
job.expires_at = expires_at.strftime("%Y-%m-%dT%H:%M:%SZ")
job.eta = eta.strftime("%Y-%m-%dT%H:%M:%SZ")
job.status = 'requested'
job.message = 'Conversion requested...'
job.job_id = hashlib.sha256('{0}-{1}-{2}'.format(user.username,
user.email,
created_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))).hexdigest()
# All conversions must result in a ZIP of the converted file(s)
output_file = 'tx/job/{0}.zip'.format(job.job_id)
job.output = '{0}/{1}'.format(self.cdn_url, output_file)
job.cdn_file = output_file
job.links = {
"href": "{0}/tx/job/{1}".format(self.api_url, job.job_id),
"rel": "self",
"method": "GET"
}
# Saving this to the DynamoDB will start trigger a DB stream which will call
# tx-manager again with the job info (see run() function)
job.insert()
return {
"job": job.get_db_data(),
"links": [
{
"href": "{0}/tx/job".format(self.api_url),
"rel": "list",
"method": "GET"
},
{
"href": "{0}/tx/job".format(self.api_url),
"rel": "create",
"method": "POST"
},
],
}
[docs] def get_job_count(self):
"""
get number of jobs in database - one caveat is that this value may be off since AWS only updates it every 6 hours
:return:
"""
return self.job_db_handler.get_item_count()
[docs] def list_jobs(self, data, must_be_authenticated=True):
if must_be_authenticated:
if 'gogs_user_token' not in data:
raise Exception('"gogs_user_token" not given.')
user = self.get_user(data['gogs_user_token'])
if not user:
raise Exception('Invalid user_token. User not found.')
data['user'] = user.username
del data['gogs_user_token']
jobs = TxJob(db_handler=self.job_db_handler).query(data)
return jobs
[docs] def list_endpoints(self):
return {
"version": "1",
"links": [
{
"href": "{0}/tx/job".format(self.api_url),
"rel": "list",
"method": "GET"
},
{
"href": "{0}/tx/job".format(self.api_url),
"rel": "create",
"method": "POST"
},
]
}
[docs] def start_job(self, job_id):
job = TxJob(job_id, db_handler=self.job_db_handler)
if not job.job_id:
job.job_id = job_id
job.success = False
job.message = 'No job with ID {} has been requested'.format(job_id)
return job.get_db_data() # Job doesn't exist, return
# Only start the job if the status is 'requested' and a started timestamp hasn't been set
if job.status != 'requested' or job.started_at:
return job.get_db_data() # Job already started, return
job.started_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
job.status = 'started'
job.message = 'Conversion started...'
job.log_message('Started job {0} at {1}'.format(job_id, job.started_at))
success = False
try:
job.update(['started_at', 'status', 'message', 'log'])
tx_module = self.get_converter_module(job)
if not tx_module:
raise Exception('No converter was found to convert {0} from {1} to {2}'
.format(job.resource_type, job.input_format, job.output_format))
job.convert_module = tx_module.name
job.update('convert_module')
payload = {
'data': {
'job': job.get_db_data(),
}
}
converter_function = '{0}tx_convert_{1}'.format(self.prefix, tx_module.name)
job.log_message('Telling module {0} to convert {1} and put at {2}'.format(converter_function,
job.source,
job.output))
self.logger.debug("Payload to {0}:".format(converter_function))
self.logger.debug(json.dumps(payload))
response = self.lambda_handler.invoke(converter_function, payload)
self.logger.debug('finished.')
# Get a new job since the webhook may have updated warnings
job = TxJob(job_id, db_handler=self.job_db_handler)
if 'errorMessage' in response:
error = response['errorMessage']
if error.startswith('Bad Request: '):
error = error[len('Bad Request: '):]
job.error_message(error)
self.logger.debug('Received error message from {0}: {1}'.format(converter_function, error))
elif 'Payload' in response:
json_data = json.loads(response['Payload'].read())
self.logger.debug("Payload from {0}: {1}".format(converter_function, json_data))
# The 'Payload' of the response could result in a few different formats:
# 1) It could be that an exception was thrown in the converter code, which the API Gateway puts
# into a json array with "errorMessage" containing the exception message, which we handled above.
# 2) If a "success" key is in the payload, that means our code finished with
# the expected results (see converters/converter.py's run() return value).
# 3) The other possibility is for the Lambda function to not finish executing
# (e.g. exceeds its 5 minute execution limit). We don't currently handle this possibility.
# Todo: Handle lambda function returning due to exceeding 5 minutes execution limit
if 'success' in json_data:
success = json_data['success']
for message in json_data['info']:
if message:
job.log_message(message)
for message in json_data['errors']:
if message:
job.error_message(message)
for message in json_data['warnings']:
if message:
job.warning_message(message)
if len(json_data['errors']):
job.log_message('{0} function returned with errors.'.format(tx_module.name))
elif len(json_data['warnings']):
job.log_message('{0} function returned with warnings.'.format(tx_module.name))
else:
job.log_message('{0} function returned successfully.'.format(tx_module.name))
else:
job.error_message('Conversion failed for unknown reason.')
else:
job.error_message('Conversion failed for unknown reason.')
except Exception as e:
job.error_message('Failed with message: {0}'.format(e.message))
job.ended_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
if not success or len(job.errors):
job.success = False
job.status = "failed"
message = "Conversion failed"
self.logger.debug("Conversion failed, success: {0}, errors: {1}".format(success, job.errors))
elif len(job.warnings) > 0:
job.success = True
job.status = "warnings"
message = "Conversion successful with warnings"
else:
job.success = True
job.status = "success"
message = "Conversion successful"
job.message = message
job.log_message(message)
job.log_message('Finished job {0} at {1}'.format(job.job_id, job.ended_at))
job.update()
callback_payload = job.get_db_data()
callback_payload["message"] = message
if job.callback:
self.do_callback(job.callback, callback_payload)
return job.get_db_data()
[docs] def do_callback(self, url, payload):
if url.startswith('http'):
headers = {"content-type": "application/json"}
self.logger.debug('Making callback to {0} with payload:'.format(url))
self.logger.debug(payload)
requests.post(url, json=payload, headers=headers)
self.logger.debug('finished.')
[docs] def make_api_gateway_for_module(self, module):
# lambda_func_name = module['name']
# AWS_LAMBDA_API_ID = '7X97xCLPDE16Jep5Zv85N6zy28wcQfJz79E2H3ln'
# # of 'tx-manager_api_key'
# # or fkcr7r4dz9
# # or 7X97xCLPDE16Jep5Zv85N6zy28wcQfJz79E2H3ln
# AWS_REGION = 'us-west-2'
#
# api_client = boto3.client('apigateway')
# aws_lambda = boto3.client('lambda')
#
# ## create resource
# resource_resp = api_client.create_resource(
# restApiId=AWS_LAMBDA_API_ID,
# parentId='foo', # resource id for the Base API path
# pathPart=lambda_func_name
# )
#
# ## create POST method
# put_method_resp = api_client.put_method(
# restApiId=AWS_LAMBDA_API_ID,
# resourceId=resource_resp['id'],
# httpMethod="POST",
# authorizationType="NONE",
# apiKeyRequired=True,
# )
#
# lambda_version = aws_lambda.meta.service_model.api_version
#
# uri_data = {
# "aws-region": AWS_REGION,
# "api-version": lambda_version,
# "aws-acct-id": "xyzABC",
# "lambda-function-name": lambda_func_name,
# }
#
# uri = "arn:aws:apigateway:{aws-region}:lambda:path/{api-version}/functions/arn:aws:lambda:{aws-region}:
# {aws-acct-id}:function:{lambda-function-name}/invocations".format(**uri_data)
#
# ## create integration
# integration_resp = api_client.put_integration(
# restApiId=AWS_LAMBDA_API_ID,
# resourceId=resource_resp['id'],
# httpMethod="POST",
# type="AWS",
# integrationHttpMethod="POST",
# uri=uri,
# )
#
# api_client.put_integration_response(
# restApiId=AWS_LAMBDA_API_ID,
# resourceId=resource_resp['id'],
# httpMethod="POST",
# statusCode="200",
# selectionPattern=".*"
# )
#
# ## create POST method response
# api_client.put_method_response(
# restApiId=AWS_LAMBDA_API_ID,
# resourceId=resource_resp['id'],
# httpMethod="POST",
# statusCode="200",
# )
#
# uri_data['aws-api-id'] = AWS_LAMBDA_API_ID
# source_arn = "arn:aws:execute-api:{aws-region}:{aws-acct-id}:
# {aws-api-id}/*/POST/{lambda-function-name}".format(**uri_data)
#
# aws_lambda.add_permission(
# FunctionName=lambda_func_name,
# StatementId=uuid.uuid4().hex,
# Action="lambda:InvokeFunction",
# Principal="apigateway.amazonaws.com",
# SourceArn=source_arn
# )
#
# # state 'your stage name' was already created via API Gateway GUI
# api_client.create_deployment(
# restApiId=AWS_LAMBDA_API_ID,
# stageName="your stage name",
# )
return
[docs] def register_module(self, data):
tx_module = TxModule(data=data, db_handler=self.module_db_handler)
if not tx_module.name:
raise Exception('"name" not given.')
if not tx_module.type:
raise Exception('"type" not given.')
if not tx_module.input_format:
raise Exception('"input_format" not given.')
if not tx_module.output_format:
raise Exception('"output_format" not given.')
if not tx_module.resource_types:
raise Exception('"resource_types" not given.')
tx_module.public_links.append("{0}/tx/convert/{1}".format(self.api_url, tx_module.name))
tx_module.insert()
self.make_api_gateway_for_module(tx_module) # Todo: develop this function
return tx_module.get_db_data()
[docs] def generate_dashboard(self, max_failures=MAX_FAILURES):
"""
Generate page with metrics indicating configuration of tx-manager.
:param int max_failures:
"""
self.logger.debug("Start: generateDashboard")
dashboard = {
'title': 'tX-Manager Dashboard',
'body': 'No modules found'
}
items = sorted(self.module_db_handler.query_items(), key=lambda k: k['name'])
if items and len(items):
module_names = []
for item in items:
module_names.append(item["name"])
registered_jobs = self.list_jobs({ "convert_module" : { "condition" : "is_in", "value" : module_names}
}, False)
total_job_count = self.get_job_count()
registered_job_count = len(registered_jobs)
if registered_job_count > total_job_count: # sanity check since AWS can be slow to update job count reported in table (every 6 hours)
total_job_count = registered_job_count
self.logger.debug("Found: " + str(len(items)) + " item[s] in tx-module")
body = BeautifulSoup('<h1>TX-Manager Dashboard</h1><h2>Module Attributes</h2><br><table id="status"></table>',
'html.parser')
for item in items:
module_name = item["name"]
self.logger.debug(module_name)
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '"><td class="hdr" colspan="2">' + str(module_name) + '</td></tr>',
'html.parser'))
self.get_jobs_counts_for_module(registered_jobs, module_name)
# TBD the following code almosts walks the db record replacing next 11 lines
# for attr, val in item:
# if (attr != 'name') and (len(attr) > 0):
# rec += ' <tr><td class="lbl">' + attr.replace("_", " ").title() + ':</td><td>' + "lst(val)" + "</td></tr>\n"
# rec += '<tr><td colspan="2"></td></tr>'
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-type" class="module-type"><td class="lbl">Type:</td><td>' +
str(item["type"]) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-input" class="module-input"><td class="lbl">Input Format:</td><td>' +
json.dumps(item["input_format"]) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-output" class="module-output"><td class="lbl">Output Format:</td><td>' +
json.dumps(item["output_format"]) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-resource" class="module-resource"><td class="lbl">Resource Types:</td><td>' +
json.dumps(item["resource_types"]) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-version" class="module-version"><td class="lbl">Version:</td><td>' +
str(item["version"]) + '</td></tr>',
'html.parser'))
if len(item["options"]) > 0:
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-options" class="module-options"><td class="lbl">Options:</td><td>' +
json.dumps(item["options"]) + '</td></tr>',
'html.parser'))
if len(item["private_links"]) > 0:
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-private-links" class="module-private-links"><td class="lbl">Private Links:</td><td>' +
json.dumps(item["private_links"]) + '</td></tr>',
'html.parser'))
if len(item["public_links"]) > 0:
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-public-links" class="module-public-links"><td class="lbl">Public Links:</td><td>' +
json.dumps(item["public_links"]) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-job-success" class="module-public-links"><td class="lbl">Job Successes:</td><td>' +
str(self.jobs_success) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-job-warning" class="module-public-links"><td class="lbl">Job Warnings:</td><td>' +
str(self.jobs_warnings) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-job-failure" class="module-public-links"><td class="lbl">Job Failures:</td><td>' +
str(self.jobs_failures) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="' + module_name + '-job-total" class="module-public-links"><td class="lbl">Jobs Total:</td><td>' +
str(self.jobs_total) + '</td></tr>',
'html.parser'))
self.get_jobs_counts(registered_jobs)
body.table.append(BeautifulSoup(
'<tr id="totals"><td class="hdr" colspan="2">Total Jobs</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="totals-job-success" class="module-public-links"><td class="lbl">Success:</td><td>' +
str(self.jobs_success) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="totals-job-warning" class="module-public-links"><td class="lbl">Warnings:</td><td>' +
str(self.jobs_warnings) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="totals-job-failure" class="module-public-links"><td class="lbl">Failures:</td><td>' +
str(self.jobs_failures) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="totals-job-unregistered" class="module-public-links"><td class="lbl">Unregistered:</td><td>' +
str(total_job_count - self.jobs_total) + '</td></tr>',
'html.parser'))
body.table.append(BeautifulSoup(
'<tr id="totals-job-total" class="module-public-links"><td class="lbl">Total:</td><td>' +
str(total_job_count) + '</td></tr>',
'html.parser'))
# build job failures table
job_failures = self.get_job_failures(registered_jobs, max_failures)
body.append(BeautifulSoup('<h2>Failed Jobs</h2>', 'html.parser'))
failure_table = BeautifulSoup('<table id="failed" cellpadding="4" border="1" style="border-collapse:collapse"></table>','html.parser')
failure_table.table.append(BeautifulSoup('''
<tr id="header">
<th class="hdr">Time</th>
<th class="hdr">Errors</th>
<th class="hdr">Repo</th>
<th class="hdr">PreConvert</th>
<th class="hdr">Converted</th>
<th class="hdr">Destination</th>''',
'html.parser'))
gogs_url = self.gogs_url
if gogs_url is None:
gogs_url = 'https://git.door43.org'
for i in range(0, len(job_failures)):
item = job_failures[i]
try:
identifier = item.identifier
owner_name, repo_name, commit_id = identifier.split('/')[:3]
source_sub_path = '{0}/{1}'.format(owner_name, repo_name)
cdn_bucket = item.cdn_bucket
destination_url = 'https://{0}/u/{1}/{2}/{3}/build_log.json'.format(cdn_bucket, owner_name, repo_name, commit_id)
repo_url = gogs_url + "/" + source_sub_path
preconverted_url = item.source
converted_url = item.output
failure_table.table.append(BeautifulSoup(
'<tr id="failure-' + str(i) + '" class="module-job-id">'
+ '<td>' + item.created_at + '</td>'
+ '<td>' + ','.join(item.errors) + '</td>'
+ '<td><a href="' + repo_url + '">' + source_sub_path + '</a></td>'
+ '<td><a href="' + preconverted_url + '">' + preconverted_url.rsplit('/', 1)[1] + '</a></td>'
+ '<td><a href="' + converted_url + '">' + item.job_id + '.zip</a></td>'
+ '<td><a href="' + destination_url + '">Build Log</a></td>'
+ '</tr>',
'html.parser'))
except:
pass
body.append(failure_table)
self.build_language_popularity_tables(body, max_failures)
dashboard['body'] = body.prettify('UTF-8')
else:
self.logger.debug("No modules found.")
return dashboard
[docs] def build_language_popularity_tables(self, body, max_count):
vc = PageMetrics(language_stats_table_name=self.language_stats_table_name)
vc.init_language_stats_table(None)
self.language_views = vc.get_language_views_sorted_by_count()
self.language_dates = vc.get_language_views_sorted_by_date()
self.generate_highest_views_lang_table(body, self.language_views, max_count)
self.generate_most_recent_lang_table(body, self.language_dates, max_count)
[docs] def generate_most_recent_lang_table(self, body, dates, max_count):
body.append(BeautifulSoup('<h2>Recent Languages</h2>', 'html.parser'))
language_recent_table = BeautifulSoup(
'<table id="language-recent" cellpadding="4" border="1" style="border-collapse:collapse"></table>',
'html.parser')
language_recent_table.table.append(BeautifulSoup('''
<tr id="header">
<th class="hdr">Updated</th>
<th class="hdr">Language Code</th>''',
'html.parser'))
if dates is not None:
for i in range(0, max_count):
if i >= len(dates):
break
item = dates[i]
try:
language_recent_table.table.append(BeautifulSoup(
'<tr id="recent-' + str(i) + '" class="module-job-id">'
+ '<td>' + item['last_updated'] + '</td>'
+ '<td>' + item['lang_code'] + '</td>'
+ '</tr>',
'html.parser'))
except:
pass
body.append(language_recent_table)
[docs] def generate_highest_views_lang_table(self, body, views, max_count):
body.append(BeautifulSoup('<h2>Popular Languages</h2>', 'html.parser'))
language_popularity_table = BeautifulSoup(
'<table id="language-popularity" cellpadding="4" border="1" style="border-collapse:collapse"></table>',
'html.parser')
language_popularity_table.table.append(BeautifulSoup('''
<tr id="header">
<th class="hdr">Views</th>
<th class="hdr">Language Code</th>''',
'html.parser'))
if views is not None:
for i in range(0, max_count):
if i >= len(views):
break
item = views[i]
try:
language_popularity_table.table.append(BeautifulSoup(
'<tr id="popular-' + str(i) + '" class="module-job-id">'
+ '<td>' + str(item['views']) + '</td>'
+ '<td>' + item['lang_code'] + '</td>'
+ '</tr>',
'html.parser'))
except:
pass
body.append(language_popularity_table)
[docs] def get_jobs_counts_for_module(self, jobs, module_name):
self.jobs_warnings = 0
self.jobs_failures = 0
self.jobs_success = 0
self.jobs_total = 0
for job in jobs:
name = job.convert_module
if name == module_name:
self.jobs_total += 1
self.update_job_status(job)
[docs] def get_jobs_counts(self, jobs):
self.jobs_total = len(jobs)
self.jobs_warnings = 0
self.jobs_failures = 0
self.jobs_success = 0
for job in jobs:
self.update_job_status(job)
[docs] def update_job_status(self, job):
status = job.status
if status == "failed":
self.jobs_failures += 1
elif status == 'warnings':
self.jobs_warnings += 1
elif status != "success":
self.jobs_failures += 1
else:
self.jobs_success += 1
[docs] def get_job_failures(self, jobs, max_count):
failed_jobs = []
not_error = ['success', 'warnings']
for job in jobs:
status = job.status
if (status not in not_error):
failed_jobs.append(job)
failed_jobs = sorted(failed_jobs, key=lambda k: k.created_at, reverse=True)
top_failed_jobs = failed_jobs[:max_count]
return top_failed_jobs