from __future__ import print_function, unicode_literals
import urllib
import copy
import os
import tempfile
import requests
import logging
import json
from datetime import datetime
from libraries.general_tools.file_utils import unzip, write_file, add_contents_to_zip, remove_tree
from libraries.general_tools.url_utils import download_file
from libraries.resource_container.ResourceContainer import RC, BIBLE_RESOURCE_TYPES
from libraries.client.preprocessors import do_preprocess
from libraries.aws_tools.s3_handler import S3Handler
from libraries.models.manifest import TxManifest
from libraries.aws_tools.dynamodb_handler import DynamoDBHandler
from libraries.models.job import TxJob
from libraries.aws_tools.lambda_handler import LambdaHandler
class ClientWebhook(object):
    """
    Client side of the repository push webhook.

    Given the webhook payload of a push, an instance downloads the pushed
    revision, preprocesses and uploads it, requests conversion and linting,
    and publishes the resulting build log (see process_webhook()).
    """

    # Table names used when the constructor is not given explicit ones.
    MANIFEST_TABLE_NAME = 'tx-manifest'
    JOB_TABLE_NAME = 'tx-job'
def __init__(self, commit_data=None, api_url=None, pre_convert_bucket=None, cdn_bucket=None,
gogs_url=None, gogs_user_token=None, manifest_table_name=None, job_table_name=None, prefix=''):
"""
:param dict commit_data:
:param string api_url:
:param string pre_convert_bucket:
:param string cdn_bucket:
:param string gogs_url:
:param string gogs_user_token:
"""
self.commit_data = commit_data
self.api_url = api_url
self.pre_convert_bucket = pre_convert_bucket
self.cdn_bucket = cdn_bucket
self.gogs_url = gogs_url
self.gogs_user_token = gogs_user_token
self.manifest_table_name = manifest_table_name
self.job_table_name = job_table_name
self.prefix = prefix
self.logger = logging.getLogger()
if self.pre_convert_bucket:
# we use us-west-2 for our s3 buckets
self.source_url_base = 'https://s3-us-west-2.amazonaws.com/{0}'.format(self.pre_convert_bucket)
else:
self.source_url_base = None
self.run_linter_function = '{0}tx_run_linter'.format(self.prefix)
self.cdn_handler = None
self.preconvert_handler = None
self.manifest_db_handler = None
self.job_db_handler = None
self.lambda_handler = None
if not self.manifest_table_name:
self.manifest_table_name = ClientWebhook.MANIFEST_TABLE_NAME
if not self.job_table_name:
self.job_table_name = ClientWebhook.JOB_TABLE_NAME
# move everything down one directory level for simple delete
self.intermediate_dir = 'tx-manager'
self.base_temp_dir = os.path.join(tempfile.gettempdir(), self.intermediate_dir)
self.setup_resources()
def setup_resources(self):
    """
    Create the database handlers for the configured tables and the Lambda handler.

    A database handler is only created when its table name is set.
    """
    manifest_table = self.manifest_table_name
    if manifest_table:
        self.manifest_db_handler = DynamoDBHandler(manifest_table)

    job_table = self.job_table_name
    if job_table:
        self.job_db_handler = DynamoDBHandler(job_table)

    self.lambda_handler = LambdaHandler()
def process_webhook(self):
    """
    Process the push described by self.commit_data from download to build log.

    Steps: download and unzip the pushed commit, save its manifest to the
    manifest table, preprocess the files, zip and upload them to the
    pre-convert bucket, request one conversion job (or one per book when the
    preprocessor reports multiple jobs), request linting, and upload
    build_log.json / project.json to the CDN bucket.

    The temporary working directory is removed when processing ends, whether
    it succeeded or failed.

    :return dict: the build log; for a multi-book project the combined log,
        with 'multiple' set to True and the per-book logs under 'build_logs'
    :raises Exception: when the payload lists no commits, when the commit URL
        does not contain self.gogs_url, or when any job reported errors
    """
    try:
        os.makedirs(self.base_temp_dir)
    except OSError:
        # An already-existing directory is fine; anything else is a real failure.
        if not os.path.isdir(self.base_temp_dir):
            raise

    try:
        commit_id = self.commit_data['after']
        commits = self.commit_data['commits']
        if not commits:
            raise Exception('No commits found in the webhook payload.')

        # Use the commit matching the pushed head; when none matches, keep the
        # last listed commit (the behavior this method has always had).
        commit = commits[-1]
        for candidate in commits:
            if candidate['id'] == commit_id:
                commit = candidate
                break

        commit_id = commit_id[:10]  # Only use the short form
        commit_url = commit['url']
        commit_message = commit['message']

        if self.gogs_url not in commit_url:
            raise Exception('Repos can only belong to {0} to use this webhook client.'.format(self.gogs_url))

        repo_name = self.commit_data['repository']['name']
        repo_owner = self.commit_data['repository']['owner']['username']
        compare_url = self.commit_data['compare_url']

        if 'pusher' in self.commit_data:
            pusher = self.commit_data['pusher']
        else:
            pusher = {'username': commit['author']['username']}
        pusher_username = pusher['username']

        # Bucket handlers are created lazily so callers may inject their own beforehand.
        if not self.cdn_handler:
            self.cdn_handler = S3Handler(self.cdn_bucket)
        if not self.preconvert_handler:
            self.preconvert_handler = S3Handler(self.pre_convert_bucket)

        # 1) Download and unzip the repo files
        repo_dir = self.get_repo_files(commit_url, repo_name)

        # Get the resource container
        rc = RC(repo_dir, repo_name)

        # Save manifest to manifest table
        manifest_json = json.dumps(rc.as_dict())
        manifest_data = {
            'repo_name_lower': repo_name.lower(),
            'user_name_lower': repo_owner.lower(),
            'repo_name': repo_name,
            'user_name': repo_owner,
            'lang_code': rc.resource.language.identifier,
            'resource_id': rc.resource.identifier,
            'resource_type': rc.resource.type,
            'title': rc.resource.title,
            'last_updated': datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
            'manifest': manifest_json,
            'manifest_lower': manifest_json.lower(),
        }
        # First see if manifest already exists in DB and update it if it is
        # (repo_name_lower will not be empty after load)
        tx_manifest = TxManifest(db_handler=self.manifest_db_handler).load({
            'repo_name_lower': repo_name.lower(),
            'user_name_lower': repo_owner.lower(),
        })
        if tx_manifest.repo_name_lower:
            self.logger.debug('Updating manifest in manifest table: {0}'.format(manifest_data))
            tx_manifest.update(manifest_data)
        else:
            tx_manifest.populate(manifest_data)
            self.logger.debug('Inserting manifest into manifest table: {0}'.format(tx_manifest.get_db_data()))
            tx_manifest.insert()

        # 2) Preprocess the files
        output_dir = tempfile.mkdtemp(dir=self.base_temp_dir, prefix='output_')
        results, preprocessor = do_preprocess(rc, repo_dir, output_dir)

        # 3) Zip up the massaged files
        zip_filepath = tempfile.mktemp(dir=self.base_temp_dir, suffix='.zip')
        self.logger.debug('Zipping files from {0} to {1}...'.format(output_dir, zip_filepath))
        add_contents_to_zip(zip_filepath, output_dir)
        self.logger.debug('finished.')

        # 4) Upload zipped file to the S3 bucket
        file_key = self.upload_zip_file(commit_id, zip_filepath)

        if not preprocessor.isMultipleJobs():
            # Send job request to tx-manager
            identifier, job = self.send_job_request_to_tx_manager(commit_id, file_key, rc, repo_name, repo_owner)

            # Compile data for build_log.json
            build_log_json = self.create_build_log(commit_id, commit_message, commit_url, compare_url, job,
                                                   pusher_username, repo_name, repo_owner)

            # Upload build_log.json to S3:
            s3_commit_key = 'u/{0}'.format(identifier)
            self.clear_commit_directory_in_cdn(s3_commit_key)
            self.upload_build_log_to_s3(build_log_json, s3_commit_key)

            # Download the project.json file for this repo (create it if doesn't exist) and update it
            self.update_project_json(commit_id, job, repo_name, repo_owner)

            # Send lint request
            lint_results = self.send_lint_request_to_run_linter(job, rc, commit_url)
            job = TxJob(job.job_id, db_handler=self.job_db_handler)  # Load again in case changed elsewhere
            if 'success' in lint_results and lint_results['success']:
                job.warnings += lint_results['warnings']
                job.update('warnings')

            # Upload build_log.json to S3 again:
            build_log_json = self.create_build_log(commit_id, commit_message, commit_url, compare_url, job,
                                                   pusher_username, repo_name, repo_owner)
            self.upload_build_log_to_s3(build_log_json, s3_commit_key)

            if len(job.errors) > 0:
                raise Exception('; '.join(job.errors))
            return build_log_json

        # -------------------------
        # multiple book project
        # -------------------------
        books = preprocessor.getBookList()
        self.logger.debug('Splitting job into separate parts for books: ' + ','.join(books))
        build_logs = []
        jobs = []

        master_identifier = self.create_new_identifier(repo_owner, repo_name, commit_id)
        master_s3_commit_key = 'u/{0}'.format(master_identifier)
        self.clear_commit_directory_in_cdn(master_s3_commit_key)

        book_count = len(books)
        last_job_id = '0'
        for i, book in enumerate(books):
            part_id = '{0}_of_{1}'.format(i, book_count)
            self.logger.debug('Adding job for {0} part {1}'.format(book, part_id))

            # Send job request to tx-manager; the book is selected through the source URL query
            source_url = self.build_multipart_source(file_key, book)
            identifier, job = self.send_job_request_to_tx_manager(commit_id, source_url, rc, repo_name, repo_owner,
                                                                  count=book_count, part=i, book=book)
            jobs.append(job)
            last_job_id = job.job_id

            build_log_json = self.create_build_log(commit_id, commit_message, commit_url, compare_url, job,
                                                   pusher_username, repo_name, repo_owner)
            part = str(i)
            if len(book) > 0:
                build_log_json['book'] = book
                build_log_json['part'] = part

            # Upload this part's build_log.json to S3:
            self.upload_build_log_to_s3(build_log_json, master_s3_commit_key, part + "/")
            build_logs.append(build_log_json)

        # Download the project.json file for this repo (create it if doesn't exist) and update it
        self.update_project_json(commit_id, jobs[0], repo_name, repo_owner)

        # The combined log starts as a shallow copy of the last part's log.
        source_url = self.source_url_base + "/preconvert/" + commit_id + '.zip'
        build_logs_json = copy.copy(build_log_json)
        build_logs_json['multiple'] = True
        build_logs_json['build_logs'] = build_logs
        build_logs_json['job_id'] = last_job_id
        build_logs_json['source'] = source_url

        errors = []
        warnings = []
        for build_log in build_logs:
            errors += build_log['errors']
            warnings += build_log['warnings']
        build_logs_json['errors'] = errors
        build_logs_json['warnings'] = warnings

        # Upload build_log.json to S3:
        self.upload_build_log_to_s3(build_logs_json, master_s3_commit_key)

        # Send lint request
        job = TxJob(last_job_id, db_handler=self.job_db_handler)
        lint_results = self.send_lint_request_to_run_linter(job, rc, source_url)
        job = TxJob(last_job_id, db_handler=self.job_db_handler)  # Load again in case changed elsewhere
        # Same guarded check as the single-job path, so a reply without 'success' is not a KeyError.
        if 'success' in lint_results and lint_results['success']:
            job.warnings += lint_results['warnings']
            job.update('warnings')

            # Upload build_log.json to S3 again:
            build_logs_json['warnings'] += lint_results['warnings']
            self.upload_build_log_to_s3(build_logs_json, master_s3_commit_key)

        if len(errors) > 0:
            raise Exception('; '.join(errors))
        return build_logs_json
    finally:
        remove_tree(self.base_temp_dir)  # cleanup, also when processing failed
def build_multipart_source(self, file_key, book):
    """
    Build the source reference for converting a single book of a multi-book project.

    :param string file_key: key of the uploaded zip file
    :param string book: name of the book to convert
    :return string: file_key followed by a '?convert_only=<book>' query, URL-encoded
    """
    # urlencode lives in a different module on Python 2 and Python 3.
    try:
        from urllib import urlencode  # Python 2
    except ImportError:
        from urllib.parse import urlencode  # Python 3

    params = urlencode({'convert_only': book})
    return '{0}?{1}'.format(file_key, params)
def clear_commit_directory_in_cdn(self, s3_commit_key):
    """
    Delete every object in the CDN bucket whose key starts with s3_commit_key.

    :param string s3_commit_key: key prefix of the project revision to clear
    """
    stale_objects = self.cdn_handler.get_objects(prefix=s3_commit_key)
    for stale in stale_objects:
        self.logger.debug('Removing file: ' + stale.key)
        self.cdn_handler.delete_file(stale.key)
def upload_build_log_to_s3(self, build_log_json, s3_commit_key, part=''):
    """
    Write the build log to a temporary file and upload it to the CDN bucket.

    :param dict build_log_json: build log contents
    :param string s3_commit_key: key prefix of the project revision
    :param string part: optional sub-path (ending in '/') placed before the file name
    """
    local_path = os.path.join(self.base_temp_dir, 'build_log.json')
    write_file(local_path, build_log_json)

    destination_key = '{0}/{1}build_log.json'.format(s3_commit_key, part)
    self.logger.debug('Saving build log to ' + destination_key)
    self.cdn_handler.upload_file(local_path, destination_key)
def create_build_log(self, commit_id, commit_message, commit_url, compare_url, job, pusher_username, repo_name,
                     repo_owner):
    """
    Combine the job's stored data with the details of the commit that triggered it.

    :param job: object whose get_db_data() returns the job data as a dict
    :return dict: the job data with the repo, commit and pusher fields added
    """
    log_data = job.get_db_data()
    log_data.update({
        'repo_name': repo_name,
        'repo_owner': repo_owner,
        'commit_id': commit_id,
        'committed_by': pusher_username,
        'commit_url': commit_url,
        'compare_url': compare_url,
        'commit_message': commit_message,
    })
    return log_data
def update_project_json(self, commit_id, job, repo_name, repo_owner):
    """
    Record this commit in the repo's project.json on the CDN bucket.

    The file is fetched, any earlier entry for the same commit id is dropped,
    the new entry is appended last, and the file is uploaded again.

    :param string commit_id: id of the commit being recorded
    :param job: job whose created_at, status and success are stored with the commit
    """
    project_json_key = 'u/{0}/{1}/project.json'.format(repo_owner, repo_name)
    # NOTE(review): presumably get_json yields an empty dict when the file is missing - confirm
    project_json = self.cdn_handler.get_json(project_json_key)

    project_json['user'] = repo_owner
    project_json['repo'] = repo_name
    project_json['repo_url'] = 'https://git.door43.org/{0}/{1}'.format(repo_owner, repo_name)

    entry = {
        'id': commit_id,
        'created_at': job.created_at,
        'status': job.status,
        'success': job.success,
        'started_at': None,
        'ended_at': None
    }

    previous = project_json['commits'] if 'commits' in project_json else []
    kept = [old for old in previous if old['id'] != commit_id]
    kept.append(entry)
    project_json['commits'] = kept

    local_path = os.path.join(self.base_temp_dir, 'project.json')
    write_file(local_path, project_json)
    self.cdn_handler.upload_file(local_path, project_json_key)
def upload_zip_file(self, commit_id, zip_filepath):
    """
    Upload the zipped, preprocessed files to the pre-convert bucket.

    A failed upload is logged rather than raised, so the key is returned either way.

    :param string commit_id: commit id used to name the uploaded file
    :param string zip_filepath: local path of the zip file
    :return string: the key the file was uploaded under
    """
    destination_key = 'preconvert/{0}.zip'.format(commit_id)
    log = self.logger
    log.debug('Uploading {0} to {1}/{2}...'.format(zip_filepath, self.pre_convert_bucket, destination_key))
    try:
        self.preconvert_handler.upload_file(zip_filepath, destination_key)
    except Exception as upload_error:
        log.error('Failed to upload zipped repo up to server')
        log.exception(upload_error)
    finally:
        log.debug('finished.')
    return destination_key
def get_repo_files(self, commit_url, repo_name):
    """
    Download the repo at the given commit into a fresh temporary directory.

    :param string commit_url: URL of the commit to download
    :param string repo_name: name of the repo
    :return string: the sub-directory named after the lower-cased repo name when
        it exists after unzipping, otherwise the temporary directory itself
    """
    download_dir = tempfile.mkdtemp(dir=self.base_temp_dir, prefix='{0}_'.format(repo_name))
    self.download_repo(commit_url, download_dir)

    nested_dir = os.path.join(download_dir, repo_name.lower())
    return nested_dir if os.path.isdir(nested_dir) else download_dir
def send_job_request_to_tx_manager(self, commit_id, file_key, rc, repo_name, repo_owner,
                                   count=0, part=0, book=None, warnings=None):
    """
    Build the conversion job request for an uploaded file and send it to tX Manager.

    :param string commit_id: commit id the job belongs to
    :param string file_key: key of the uploaded zip, appended to the source URL base
    :param rc: resource container providing the resource identifier and file extension
    :param int count: number of parts of a multi-part job (0 for a single job)
    :param int part: index of this part
    :param string book: book converted by this part
    :param warnings: value passed along under the "warning" key of the request
    :return: tuple of (identifier, job) from add_payload_to_tx_converter()
    """
    source_url = self.source_url_base + "/" + file_key
    callback_url = self.api_url + '/client/callback'
    job_endpoint = self.api_url + '/tx/job'
    identifier = self.create_new_identifier(repo_owner, repo_name, commit_id, count, part, book)

    request_body = dict([
        ("identifier", identifier),
        ("gogs_user_token", self.gogs_user_token),
        ("resource_type", rc.resource.identifier),
        ("input_format", rc.resource.file_ext),
        ("output_format", "html"),
        ("source", source_url),
        ("callback", callback_url),
        ("warning", warnings),
    ])
    return self.add_payload_to_tx_converter(callback_url, identifier, request_body, rc, source_url, job_endpoint)
def create_new_identifier(self, repo_owner, repo_name, commit_id, count=0, part=0, book=None):
    """
    Build the identifier that ties a job request to its repo and commit.

    :return string: 'owner/repo/commit' for a single job, or
        'owner/repo/commit/count/part/book' when count is non-zero (multi-part job)
    """
    segments = [repo_owner, repo_name, commit_id]
    if count:
        # part of a multipart job: also say which part of how many, and for which book
        segments.extend([count, part, book])
    return '/'.join('{0}'.format(segment) for segment in segments)
def add_payload_to_tx_converter(self, callback_url, identifier, payload, rc, source_url, tx_manager_job_url):
    """
    Post a job request to tX Manager and describe the outcome as a job object.

    A placeholder job is always built first, so a build log can be produced
    even when tX Manager rejects the request or answers with something
    unexpected; in those cases the job is marked as failed.

    :param string callback_url: callback URL recorded on the job
    :param string identifier: identifier of the job request
    :param dict payload: request body; logged with the user token masked
    :param rc: resource container providing the resource identifier and file extension
    :param string source_url: source URL recorded on the job
    :param string tx_manager_job_url: URL the request is posted to
    :return: tuple of (identifier, job)
    """
    headers = {"content-type": "application/json"}

    self.logger.debug('Making request to tX-Manager URL {0} with payload:'.format(tx_manager_job_url))
    # remove token from printout, so it will not show in integration testing logs on Travis, etc.
    log_payload = payload.copy()
    log_payload["gogs_user_token"] = "DUMMY"
    self.logger.debug(log_payload)

    response = requests.post(tx_manager_job_url, json=payload, headers=headers)
    self.logger.debug('finished.')

    # Fake job in case tx-manager returns an error, can still build the build_log.json
    job = TxJob({
        'identifier': identifier,
        'resource_type': rc.resource.identifier,
        'input_format': rc.resource.file_ext,
        'output_format': 'html',
        'source': source_url,
        'callback': callback_url,
        'message': 'Conversion started...',
        'status': 'requested',
        'success': None,
        'created_at': datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
        'log': [],
        'warnings': [],
        'errors': []
    }, db_handler=self.job_db_handler)

    def mark_failed():
        job.status = 'failed'
        job.success = False
        job.message = 'Failed to convert'

    if response.status_code != requests.codes.ok:
        mark_failed()
        if response.text:
            try:
                json_data = json.loads(response.text)
                if 'errorMessage' in json_data:
                    error = json_data['errorMessage']
                    if error.startswith('Bad Request: '):
                        error = error[len('Bad Request: '):]
                    job.errors.append(error)
            except (ValueError, TypeError, AttributeError):
                # The body is not JSON or not shaped as expected; the job is
                # already marked as failed, there is just no message to add.
                self.logger.debug('Could not read an error message from the tX Manager response.')
    else:
        try:
            json_data = json.loads(response.text)
        except (ValueError, TypeError):
            # A success status with an unreadable body carries no job info.
            json_data = None

        if not isinstance(json_data, dict) or 'job' not in json_data:
            mark_failed()
            job.errors.append('tX Manager did not return any info about the job request.')
        else:
            job.populate(json_data['job'])

    return identifier, job
def send_lint_request_to_run_linter(self, job, rc, commit_url):
    """
    Build the lint request for a job and send it to the linter.

    :param job: job to lint; its job_id, resource_type and source are read
    :param rc: resource container, sent along as a dict
    :param string commit_url: URL of the commit, used to derive the archive URL
        for resources that are linted from the original repo files
    :return dict: the result of send_payload_to_run_linter()
    """
    payload = {
        'data': {
            'job_id': job.job_id,
            'commit_data': self.commit_data,
            'rc': rc.as_dict(),
        },
        'vars': {
            'prefix': self.prefix,
        }
    }
    if job.resource_type in BIBLE_RESOURCE_TYPES or job.resource_type == 'obs':
        # Need to give the massaged source since it maybe was in chunks originally
        payload['data']['source_url'] = job.source
    else:
        # Only swap the last '/commit/' path segment: replacing every "commit"
        # would corrupt URLs whose owner or repo name contains that word.
        head, separator, tail = commit_url.rpartition('/commit/')
        if separator:
            archive_url = head + '/archive/' + tail
        else:
            archive_url = commit_url.replace('commit', 'archive')
        payload['data']['source_url'] = archive_url + '.zip'
    return self.send_payload_to_run_linter(payload)
def send_payload_to_run_linter(self, payload):
    """
    Invoke the linter function with the given payload and return its decoded reply.

    :param dict payload: lint request
    :return dict: the JSON-decoded 'Payload' of the response, or an
        unsuccessful result with no warnings when the response has none
    """
    self.logger.debug('Making request linter lambda with payload:')
    self.logger.debug(payload)
    response = self.lambda_handler.invoke(function_name=self.run_linter_function, payload=payload)
    self.logger.debug('finished.')

    if 'Payload' not in response:
        return {'success': False, 'warnings': []}
    raw_reply = response['Payload'].read()
    return json.loads(raw_reply)
def download_repo(self, commit_url, repo_dir):
    """
    Downloads and unzips a git repository from Github or git.door43.org

    The archive URL is derived from the commit URL by turning its last
    '/commit/' path segment into '/archive/' and appending '.zip'.

    :param str|unicode commit_url: The URL of the commit to download
    :param str|unicode repo_dir: The directory where the downloaded file should be unzipped
    :return: None
    """
    # Only swap the last '/commit/' segment: replacing every "commit" would
    # corrupt URLs whose owner or repo name contains that word.
    head, separator, tail = commit_url.rpartition('/commit/')
    if separator:
        repo_zip_url = head + '/archive/' + tail + '.zip'
    else:
        repo_zip_url = commit_url.replace('commit', 'archive') + '.zip'

    # URLs always use '/', whatever the local path separator is.
    repo_zip_file = os.path.join(self.base_temp_dir, repo_zip_url.rpartition('/')[2])

    try:
        try:
            self.logger.debug('Downloading {0}...'.format(repo_zip_url))

            # if the file already exists, remove it, we want a fresh copy
            if os.path.isfile(repo_zip_file):
                os.remove(repo_zip_file)

            download_file(repo_zip_url, repo_zip_file)
        finally:
            self.logger.debug('finished.')

        try:
            self.logger.debug('Unzipping {0}...'.format(repo_zip_file))
            unzip(repo_zip_file, repo_dir)
        finally:
            self.logger.debug('finished.')
    finally:
        # clean up the downloaded zip file, also when download or unzip failed
        if os.path.isfile(repo_zip_file):
            os.remove(repo_zip_file)