Source code for converters.converter

from __future__ import print_function, unicode_literals
import os
import tempfile
from libraries.aws_tools.s3_handler import S3Handler
from libraries.general_tools.url_utils import download_file
from libraries.general_tools.file_utils import unzip, add_contents_to_zip, remove_tree, remove
from shutil import copy
import logging
from convert_logger import ConvertLogger
from abc import ABCMeta, abstractmethod


[docs]class Converter(object): __metaclass__ = ABCMeta EXCLUDED_FILES = ["license.md", "package.json", "project.json", 'readme.md'] def __init__(self, source, resource, cdn_bucket=None, cdn_file=None, options=None): """ :param string source: :param string resource: :param string cdn_bucket: :param string cdn_file: :param dict options: """ self.log = ConvertLogger() self.options = {} self.source = source self.resource = resource self.cdn_bucket = cdn_bucket self.cdn_file = cdn_file self.options = options self.logger = logging.getLogger('converter') self.logger.addHandler(logging.NullHandler()) if not self.options: self.options = {} self.download_dir = tempfile.mkdtemp(prefix='download_') self.files_dir = tempfile.mkdtemp(prefix='files_') self.input_zip_file = None # If set, won't download the repo archive. Used for testing self.output_dir = tempfile.mkdtemp(prefix='output_') self.output_zip_file = tempfile.mktemp(prefix="{0}_".format(resource), suffix='.zip')
[docs] def close(self): """delete temp files""" remove_tree(self.download_dir) remove_tree(self.files_dir) remove_tree(self.output_dir) remove(self.output_zip_file)
@abstractmethod
[docs] def convert(self): """ Dummy function for converters. Returns true if the resource could be converted :return bool: """ raise NotImplementedError()
[docs] def run(self): """ Call the converters """ success = False try: if not self.input_zip_file or not os.path.exists(self.input_zip_file): # No input zip file yet, so we need to download the archive self.download_archive() # unzip the input archive self.logger.debug("Unzipping {0} to {1}".format(self.input_zip_file, self.files_dir)) unzip(self.input_zip_file, self.files_dir) # convert method called self.logger.debug("Converting files...") if self.convert(): self.logger.debug("Was able to convert {0}".format(self.resource)) # zip the output dir to the output archive self.logger.debug("Adding files in {0} to {1}".format(self.output_dir, self.output_zip_file)) add_contents_to_zip(self.output_zip_file, self.output_dir) remove_tree(self.output_dir) # upload the output archive either to cdn_bucket or to a file (no cdn_bucket) self.logger.debug("Uploading archive to {0}/{1}".format(self.cdn_bucket, self.cdn_file)) self.upload_archive() remove(self.output_zip_file) self.logger.debug("Uploaded") success = True else: self.log.error('Resource {0} currently not supported.'.format(self.resource)) except Exception as e: self.log.error('Conversion process ended abnormally: {0}'.format(e.message)) result = { 'success': success and len(self.log.logs['error']) == 0, 'info': self.log.logs['info'], 'warnings': self.log.logs['warning'], 'errors': self.log.logs['error'] } self.logger.debug(result) return result
[docs] def download_archive(self): archive_url = self.source filename = self.source.rpartition('/')[2] self.input_zip_file = os.path.join(self.download_dir, filename) if not os.path.isfile(self.input_zip_file): try: download_file(archive_url, self.input_zip_file) finally: if not os.path.isfile(self.input_zip_file): raise Exception("Failed to download {0}".format(archive_url))
[docs] def upload_archive(self): if self.cdn_bucket: cdn_handler = S3Handler(self.cdn_bucket) cdn_handler.upload_file(self.output_zip_file, self.cdn_file) elif self.cdn_file and os.path.isdir(os.path.dirname(self.cdn_file)): copy(self.output_zip_file, self.cdn_file)