# -*- coding: utf-8 -*-
"""
:summary: All data export, import is done through here.
:author: Francis.horsman@gmail.com
"""
import copy
import os
import pickle
from abc import abstractmethod, ABCMeta
import yaml
import msgpack
import six
from .defaults import DEFAULT_EXPORT_FORMAT, ExportFormat, OptionsKey
from .errors import UnknownExportCodec, ExporterConfigurationError, \
ImportFailure
from .helper import Helper
from .utils import iExporter, json, EventLevel, event_creator
_SUPPORTED_CODECS = {}
class CodecRegistry(type):
def __new__(mcs, name, bases, dct):
codec_format = dct.get('CODEC_FORMAT', None)
if not bases == (object,) and not codec_format:
raise ExporterConfigurationError(
'%s is not configured correctly, it must specify a'
'CODEC_FORMAT class attribute' % name)
impl = dct.get('CODEC_IMPL')
if impl is not None:
dct['impl'] = impl
cls = super(CodecRegistry, mcs).__new__(mcs, name, bases, dct)
if name != 'ExporterCodec':
_SUPPORTED_CODECS[codec_format] = cls
return cls
@six.add_metaclass(CodecRegistry)
@six.add_metaclass(ABCMeta)
class ExporterCodec(object):
@abstractmethod
def loads(self, d, **kwargs):
raise NotImplementedError
@abstractmethod
def dumps(self, d, **kwargs):
raise NotImplementedError
[docs]class JsonCodec(ExporterCodec):
"""
Codec designed to import and export using JSON.
"""
CODEC_IMPL = json
CODEC_FORMAT = ExportFormat.JSON
@staticmethod
def loads(d, **kwargs):
return JsonCodec.impl.loads(d, **kwargs)
@staticmethod
def dumps(d, **kwargs):
return JsonCodec.impl.dumps(d, **kwargs)
[docs]class PickleCodec(ExporterCodec):
"""
Codec designed to import and export using PICKLE.
"""
CODEC_IMPL = pickle
CODEC_FORMAT = ExportFormat.PICKLE
@staticmethod
def loads(d, **kwargs):
return PickleCodec.impl.loads(d, **kwargs)
@staticmethod
def dumps(d, **kwargs):
return PickleCodec.impl.dumps(d, **kwargs)
[docs]class YamlCodec(ExporterCodec):
"""
Codec designed to import and export using YAML.
"""
CODEC_IMPL = yaml
CODEC_FORMAT = ExportFormat.YAML
@staticmethod
def loads(d, **kwargs):
return YamlCodec.impl.load(d, **kwargs)
@staticmethod
def dumps(d, **kwargs):
return YamlCodec.impl.safe_dump(d, default_flow_style=False,
allow_unicode=True, **kwargs)
[docs]class MsgpackCodec(ExporterCodec):
"""
Codec designed to import and export using MSGPACK.
"""
CODEC_IMPL = msgpack
CODEC_FORMAT = ExportFormat.MSGPACK
@classmethod
def loads(cls, d, **kwargs):
return cls.impl.unpackb(d, **kwargs)
@classmethod
def dumps(cls, d, **kwargs):
return cls.impl.packb(d, **kwargs)
class CodecFactory(object):
def __init__(self):
self.supported_codecs = copy.deepcopy(_SUPPORTED_CODECS)
def __call__(self, name):
return self.get_codec(name)
def get_codec(self, name):
try:
return self.supported_codecs[name]
except KeyError:
raise UnknownExportCodec(name)
class Exporter(Helper):
NAME = 'exporter'
def __init__(self, core, **kwargs):
Helper.__init__(self, core, **kwargs)
self.codec_factory = CodecFactory()
def export(self, export_path=None):
export_path = export_path or self.profile.misc.generate_rcfile
if export_path is not None:
self._export(export_path)
return True
@event_creator(NAME, EventLevel.ERROR, EventLevel.OK,
OptionsKey.GENERATE_RCFILE)
def _export(self, path):
self.dump(path, self.profile)
def load(self, where, codec=None, guess=False):
return self.loads(open(where, 'r').read(), codec=codec, guess=guess)
def loads(self, what, codec=None, guess=False):
if not guess:
codec = codec if codec is not None else ExportFormat.JSON
# Now call the codec to render the data.
return self.codec_factory.get_codec(codec).loads(what)
else:
if codec is not None:
return self.codec_factory.get_codec(codec).loads(what)
# Now try all the codecs until we succeed:
for codec in self.codec_factory.supported_codecs:
try:
return self.codec_factory.get_codec(codec).loads(what)
except Exception as err:
pass
raise ImportFailure('No codec found to decode: %s' % what)
def dump(self, where, what, codec=None):
txt = self.dumps(what, codec=codec)
path = os.path.realpath(where)
msg = 'exporting rcfile data to: %s' % path
self.executor.file_write(where, txt, msg)
def dumps(self, what, codec=None):
codec = codec if codec is not None else DEFAULT_EXPORT_FORMAT
# Get the data if supported:
if isinstance(what, iExporter):
what = getattr(what, '_export_data')
# Now call the codec to render the data.
return self.codec_factory.get_codec(codec).dumps(what)
def __str__(self):
supported_codecs = ', '.join(
sorted(self.codec_factory.supported_codecs.keys()))
return 'Exporter codecs supported: %s' % supported_codecs
if __name__ == '__main__': # pragma no cover
pass