Source code for freedom.core

# -*- coding: utf-8 -*-
"""
:summary: The Release process work-flow management.

:author: francis.horsman@gmail.com
"""

import collections
from datetime import datetime as dt
import itertools
import sys

from six import u

from .actions import Actions
from .builder import Builder
from .errors import InteractiveAbort, Exit
from .events import Events
from .execute import Executor
from .export import Exporter
from .handlers import Handlers
from .history import History
from .monitor import Monitor
from .profiles import NewProfile
from .publish import Publisher
from .task import Task
from .utils import get_env_str
from .vcs import Git


[docs]class releaser(object): """ The releaser work-flow. """ def __init__(self, logger=None, handlers=None, **kwargs): self.exporter = Exporter(self) self.profile = self._compile_profile(kwargs) self.monitor = Monitor(self) self.logger = logger or self.monitor.get_logger('releaser') self.logger.info(self._dump_args()) self.logger.info( 'working directory: %s' % self.profile.vcs.git_repo) self.logger.info('primary env: %s' % get_env_str()) self.handlers = Handlers(self, handlers) try: # Create important helpers: for helper in [Events(), Executor]: setattr(self, getattr(helper, 'NAME'), helper(self)) if self.exporter.export() is True: self.logger.warn('generated rcfile and exiting') raise Exit() self.task = Task(self) if not self._check_task_status(): return # Create secondary helpers: for helper in [Git, History, Builder, Publisher, Actions]: setattr(self, getattr(helper, 'NAME'), helper(self)) self.logger.debug(str(self.exporter)) self.logger.info('resultant env: %s' % get_env_str()) except InteractiveAbort as err: self._handle_interactive_abort(err) raise err finally: self.logger.debug('User loggers: %s' % getattr(self, Monitor.NAME).get_logger_names()) def _dump_args(self): return '\n'.join( itertools.chain(['Args:'], [(u(self.profile.dump))])) def _compile_profile(self, kwargs): return NewProfile.build(self, **kwargs) def _check_task_status(self): if self.task.finished: self.logger.warn( 'No actions to perform, Finished release processing.') if self.profile.monitor.verbose: sys.stdout.write('Nothing to do\n') return False return True def _handle_interactive_abort(self, err): err.releaser = self self.logger.fatal( 'FATAL error: Interactive session aborted the release process.') err.additional = 'See the log file: %s' % \ self.monitor.get_logger_filename() def execute(self): return self() def __call__(self): time_start = dt.utcnow() self.logger.info('Release commencing @ %s' % time_start.isoformat()) try: self.logger.warn('Starting release processing') try: collections.deque(self.task.run(), maxlen=0) finally: self.logger.warn('Finished release processing.') except InteractiveAbort as err: self._handle_interactive_abort(err) raise err finally: time_end = dt.utcnow() duration = time_end - time_start try: duration = '%s seconds' % duration.total_seconds() except: duration = str(duration) self.logger.info('Release finished @ %s, duration: %s' % ( dt.utcnow().isoformat(), duration))
if __name__ == '__main__': # pragma no cover pass

Related Topics