# -*- coding: utf-8 -*-
"""
:summary: All command execution for git, python and shell goes in here.
:author: Francis.horsman@gmail.com
"""
import sh
from sh import git, python
from .defaults import OPTIONS_YES
from .errors import InteractiveAbort, InteractiveDeny
from .helper import Helper
from .task import TaskState
from .utils import get_env_str, new_env, EventLevel, event_creator, \
EventData
def __dummy_tasks():
    """Placeholder object used when no real task container is available.

    The function body does nothing; the function object exists only to
    carry a ``current_task`` attribute.
    """
    return None


# With ``current_task`` set to None, code that reads ``tasks.current_task``
# sees "no task running".
__dummy_tasks.current_task = None

# Alias without the leading double underscore: names starting with two
# underscores are name-mangled when referenced inside a class body.
_DUMMY_TASKS = __dummy_tasks
class UserInputEvent(EventData):
    """
    Event payload describing one interactive prompt and the user's answer.

    :param action: Outcome of the prompt (this module uses 'proceed',
        'abort' and 'deny').
    :param command: Name of the command that triggered the event.
    :param message: The prompt text shown to the user.
    :param response: The raw answer returned by the input handler.
    """
    KEYS = ['action', 'command', 'message', 'response']

    def __init__(self, action=None, command=None, message=None,
                 response=None):
        payload = {
            'action': action,
            'command': command,
            'message': message,
            'response': response,
        }
        EventData.__init__(self, payload)
class Executor(Helper):
    """
    The executor controls centralized access to all external commands.

    Every command first passes through :meth:`_proceed`, which asks the
    user for confirmation when ``profile.misc.interactive`` is set. When
    ``profile.misc.dry_run`` is set the command is only logged, never run.

    NOTE(review): ``self.profile``, ``self.logger``, ``self.events``,
    ``self.core`` and ``self.task`` are not set in this class; presumably
    they are provided by ``Helper`` - confirm there.
    """
    NAME = 'executor'

    def __init__(self, core, logger=None, **kwargs):
        """
        :param core: The owning core object; must have a truthy ``monitor``.
        :param logger: Optional logger forwarded to ``Helper.__init__``.
        :param kwargs: Forwarded unchanged to ``Helper.__init__``.
        """
        assert core.monitor
        Helper.__init__(self, core, logger=logger, **kwargs)

    def get_git_env(self, d=None, home=None):
        """
        Build the environment used for Git commands.

        :param d: Optional dict of variables; it is updated in place with
            ``HOME``. A new dict is created when omitted.
        :param home: Value for ``HOME``; defaults to ``profile.vcs.home``.
        :return: ``new_env(d)`` when ``profile.build.git_propagate_env`` is
            truthy, otherwise ``d`` itself.
        """
        d = {} if d is None else d
        home = home if home is not None else self.profile.vcs.home
        d.update(dict(HOME=home))
        return new_env(d) if self.profile.build.git_propagate_env else d

    def get_python_env(self, d=None, home=None):
        """
        Build the environment used for Python commands.

        :param d: Optional dict of variables; it is updated in place with
            ``HOME``. A new dict is created when omitted.
        :param home: Value for ``HOME``; defaults to ``profile.build.home``.
        :return: ``new_env(d)`` when ``profile.build.python_propagate_env``
            is truthy, otherwise ``d`` itself.
        """
        d = {} if d is None else d
        home = home if home is not None else self.profile.build.home
        d.update(dict(HOME=home))
        return new_env(d) if self.profile.build.python_propagate_env else d

    def _get_user_input(self, msg):
        """
        Default input handler: prompt on the console and return the
        stripped answer.

        :param msg: Prompt text shown to the user.
        :return: The user's answer with surrounding whitespace removed.
        """
        try:
            reader = raw_input  # Python 2 name
        except NameError:
            # Python 3 renamed raw_input() to input().
            reader = input
        return reader(msg).strip()

    def _proceed(self, msg, fatal=True, **kwargs):
        """
        Wait until we are allowed to proceed (if interactive release).

        Does nothing unless ``profile.misc.interactive`` is truthy. An empty
        answer, or one whose first character is in ``OPTIONS_YES``, counts
        as approval. While waiting, the current task (if any) is flagged as
        ``RUNNING_WAITING_ON_INPUT`` and is put back to ``RUNNING``
        afterwards, whatever the outcome.

        :param msg: Description of the command awaiting approval.
        :param fatal: True - raise InteractiveAbort when the user refuses,
                      False - raise InteractiveDeny when the user refuses,
                      else - never raise; a refusal then returns None,
                      exactly like an approval.
        :param kwargs: Accepted for call compatibility; unused here.
        :raises InteractiveAbort: user refused and ``fatal is True``.
        :raises InteractiveDeny: user refused and ``fatal is False``.
        """
        if not self.profile.misc.interactive:
            return

        tasks = self.task or _DUMMY_TASKS
        self.logger.warn('Interactive request to proceed to user: %s' % msg)

        # Prompt the user:
        attention = 'Execute the following command Y/n:'
        t = ' '.join([self.profile.misc.interactive_prompt, attention, msg])

        current_task = tasks.current_task
        if current_task:
            tasks.current_task.state = TaskState.RUNNING_WAITING_ON_INPUT
        try:
            # A handler registered on the core takes precedence over the
            # console prompt.
            handler = self.core.handlers.user_input or self._get_user_input
            option = handler(t)
            if not option or option[0] in OPTIONS_YES:
                self.events.executor(level=EventLevel.OK,
                                     **UserInputEvent(action='proceed',
                                                      command='user_input',
                                                      message=t,
                                                      response=option))
                return
            elif option:
                if fatal is True:
                    self.logger.warn(
                        'Interactive user requests an abort '
                        'with message: %s' % option)
                    self.events.executor(level=EventLevel.FATAL,
                                         **UserInputEvent(
                                             action='abort',
                                             command='user_input',
                                             message=t,
                                             response=option))
                    raise InteractiveAbort(msg, response=option)
                elif fatal is False:
                    self.logger.warn(
                        'Interactive user denies command execution '
                        'with message: %s' % option)
                    self.events.executor(level=EventLevel.OK,
                                         **UserInputEvent(
                                             action='deny',
                                             command='user_input',
                                             message=t,
                                             response=option))
                    raise InteractiveDeny(msg, response=option)
        finally:
            if current_task:
                tasks.current_task.state = TaskState.RUNNING

    def git(self, *args, **kwargs):
        """
        Run a Git command.

        :param args: Args passed directly to Git; ``args[0]`` is the Git
            sub-command.
        :param kwargs: 'cwd' = working directory (sh ``_cwd``); defaults to
            ``profile.vcs.git_repo``.
        :param kwargs: 'env' = environment (sh ``_env``); defaults to
            :meth:`get_git_env`.
        :return: The result from executing the Git command, or None in a
            dry run (assuming ``event_creator`` passes the wrapped
            function's return value through - confirm in utils).
        :raises InteractiveAbort: the interactive user refused the command.
        """
        cwd = kwargs.get('cwd', None)
        cwd = self.profile.vcs.git_repo if cwd is None else cwd
        env = kwargs.get('env', None)
        env = env if env is not None else self.get_git_env()
        total_env = get_env_str(env)
        my_kwargs = dict(_cwd=cwd, _env=env)
        self.logger.debug('running Git with cwd: %s' % cwd)
        self.logger.debug('running Git with env: %s' % total_env)

        # Ask for confirmation before anything is executed.
        args_str = ' '.join(args)
        self._proceed('git %s' % args_str)

        command = args[0]
        command_args = list(args[1:])

        @event_creator(Executor.NAME, EventLevel.ERROR, EventLevel.OK, 'git',
                       dict(cwd=cwd, env=total_env, command=command,
                            args=command_args))
        def _inner(self):
            if self.profile.misc.dry_run:
                self.logger.info(
                    'simulated: git %s (%s)' % (command, command_args))
            else:
                return git(*args, **my_kwargs)

        return _inner(self)

    def python(self, *args, **kwargs):
        """
        Run a Python command.

        :param args: Args passed directly to Python; ``args[0]`` is reported
            as the filename in the emitted event.
        :param kwargs: 'cwd' = working directory (sh ``_cwd``); defaults to
            ``profile.vcs.git_repo``.
        :param kwargs: 'env' = environment (sh ``_env``); defaults to
            :meth:`get_python_env`.
        :return: The result from executing the Python command, or None in a
            dry run (assuming ``event_creator`` passes the wrapped
            function's return value through - confirm in utils).
        :raises InteractiveAbort: the interactive user refused the command.
        """
        cwd = kwargs.get('cwd', None)
        cwd = self.profile.vcs.git_repo if cwd is None else cwd
        env = kwargs.get('env', None)
        env = env if env is not None else self.get_python_env()
        total_env = get_env_str(env)
        my_kwargs = dict(_cwd=cwd, _env=env)
        self.logger.debug('running Python with cwd: %s' % cwd)
        self.logger.debug('running Python with env: %s' % total_env)

        # Ask for confirmation before anything is executed.
        args_str = ' '.join(args)
        self._proceed('python %s' % args_str)

        filename = args[0]
        command_args = list(args[1:])

        @event_creator(Executor.NAME, EventLevel.ERROR, EventLevel.OK,
                       'python',
                       dict(cwd=cwd, env=total_env, filename=filename,
                            args=command_args))
        def _inner(self):
            if self.profile.misc.dry_run:
                self.logger.info(
                    'simulated: python %s (%s)' % (filename, command_args))
            else:
                return python(*args, **my_kwargs)

        return _inner(self)

    def shell(self, *args, **kwargs):
        """
        Run a Shell command.

        :param args: ``args[0]`` is the program to run, the remaining args
            are passed to it.
        :param kwargs: 'cwd' = working directory (sh ``_cwd``); defaults to
            ``profile.vcs.git_repo``.
        :param kwargs: 'env' = environment (sh ``_env``); no default is
            substituted, so None is passed through when omitted.
        :return: The result from executing the Shell command, or None in a
            dry run (assuming ``event_creator`` passes the wrapped
            function's return value through - confirm in utils).
        :raises InteractiveAbort: the interactive user refused the command.
        """
        cwd = kwargs.get('cwd', None)
        cwd = self.profile.vcs.git_repo if cwd is None else cwd
        env = kwargs.get('env', None)
        total_env = get_env_str(env)
        my_kwargs = dict(_cwd=cwd, _env=env)
        self.logger.debug('running Shell with cwd: %s' % cwd)
        self.logger.debug('running Shell with env: %s' % total_env)

        # Ask for confirmation before anything is executed.
        args_str = ' '.join(args)
        self._proceed('shell %s' % args_str)

        command = args[0]
        command_args = list(args[1:])

        @event_creator(Executor.NAME, EventLevel.ERROR, EventLevel.OK,
                       'shell',
                       dict(cwd=cwd, env=total_env, command=command,
                            args=command_args))
        def _inner(self):
            if self.profile.misc.dry_run:
                self.logger.info(
                    'simulated: %s (%s)' % (command, command_args))
            else:
                # sh.Command looks a program up by name or path; the sh
                # module object itself cannot be called with a program name.
                return sh.Command(command)(*command_args, **my_kwargs)

        return _inner(self)

    def file_write(self, filename, payload, msg, **kwargs):
        """
        Write ``payload`` to ``filename`` after (interactive) confirmation.

        :param filename: Path of the file to (over)write.
        :param payload: Text written to the file.
        :param msg: Description shown to the user by :meth:`_proceed`.
        :param kwargs: Forwarded to :meth:`_proceed` (e.g. ``fatal``).
        :raises InteractiveAbort: user refused and ``fatal`` is True
            (the default).
        :raises InteractiveDeny: user refused and ``fatal`` is False.
        """
        self._proceed(msg, **kwargs)
        if self.profile.misc.dry_run:
            self.logger.info('simulated: write to %s' % filename)
        else:
            # Context manager guarantees the handle is flushed and closed.
            with open(filename, 'w') as handle:
                handle.write(payload)
if __name__ == '__main__':  # pragma no cover
    # Nothing to do when the module is executed directly.
    pass