Source code for converters.converter

from __future__ import print_function, unicode_literals
import os
import tempfile
from aws_tools.s3_handler import S3Handler
from general_tools.url_utils import download_file
from general_tools.file_utils import unzip, add_contents_to_zip, remove_tree, remove
from shutil import copy
from convert_logger import ConvertLogger


[docs]class Converter(object): EXCLUDED_FILES = ["license.md", "manifest.json", "package.json", "project.json", 'readme.md'] def __init__(self, source, resource, cdn_bucket=None, cdn_file=None, options=None): self.logger = ConvertLogger() self.options = {} self.source = source self.resource = resource self.cdn_bucket = cdn_bucket self.cdn_file = cdn_file self.options = options if not self.options: self.options = {} self.download_dir = tempfile.mkdtemp(prefix='download_') self.files_dir = tempfile.mkdtemp(prefix='files_') self.input_zip_file = None # If set, won't download the repo archive. Used for testing self.output_dir = tempfile.mkdtemp(prefix='output_') self.output_zip_file = tempfile.mktemp(prefix="{0}_".format(resource), suffix='.zip')
[docs] def close(self): # delete temp files remove_tree(self.download_dir) remove_tree(self.files_dir) remove_tree(self.output_dir) remove(self.output_zip_file)
[docs] def run(self): # Custom converters need to add a `convert_<resource>(self)` method for every resource it converts convert_method = getattr(self, "convert_{0}".format(self.resource), None) if convert_method and callable(convert_method): try: if not self.input_zip_file or not os.path.exists(self.input_zip_file): # No input zip file yet, so we need to download the archive self.download_archive() # unzip the input archive unzip(self.input_zip_file, self.files_dir) # convert method called convert_method() # zip the output dir to the output archive add_contents_to_zip(self.output_zip_file, self.output_dir) # upload the output archive either to cdn_bucket or to a file (no cdn_bucket) self.upload_archive() except Exception as e: self.logger.error('Conversion process ended abnormally') else: self.logger.error('Resource "{0}" not currently supported'.format(self.resource)) return { 'status': len(self.logger.logs['error']) == 0, 'info': self.logger.logs['info'], 'warnings': self.logger.logs['warning'], 'errors': self.logger.logs['error'] }
[docs] def download_archive(self): archive_url = self.source filename = self.source.rpartition('/')[2] self.input_zip_file = os.path.join(self.download_dir, filename) if not os.path.isfile(self.input_zip_file): try: download_file(archive_url, self.input_zip_file) finally: if not os.path.isfile(self.input_zip_file): raise Exception("Failed to download {0}".format(archive_url))
[docs] def upload_archive(self): if self.cdn_bucket: cdn_handler = S3Handler(self.cdn_bucket) cdn_handler.upload_file(self.output_zip_file, self.cdn_file) elif self.cdn_file and os.path.isdir(os.path.dirname(self.cdn_file)): copy(self.output_zip_file, self.cdn_file)
[docs] def get_files(self): files = [] for root, dirs, filenames in os.walk(self.files_dir): for filename in filenames: if filename.lower() not in Converter.EXCLUDED_FILES: filepath = os.path.join(root, filename) files.append(filepath) return files