1
0
mirror of https://github.com/deadc0de6/dotdrop.git synced 2026-02-04 17:24:46 +00:00

Merge pull request #118 from davla/import-config

Import config feature
This commit is contained in:
deadc0de
2019-04-28 19:00:24 +02:00
committed by GitHub
7 changed files with 1081 additions and 98 deletions

View File

@@ -14,6 +14,7 @@ from dotdrop.logger import Logger
class Cmd:
eq_ignore = ('log',)
def __init__(self, key, action):
"""constructor
@@ -31,7 +32,17 @@ class Cmd:
return 'cmd({})'.format(self.__str__())
def __eq__(self, other):
return self.__dict__ == other.__dict__
self_dict = {
k: v
for k, v in self.__dict__.items()
if k not in self.eq_ignore
}
other_dict = {
k: v
for k, v in other.__dict__.items()
if k not in self.eq_ignore
}
return self_dict == other_dict
def __hash__(self):
    # combine the two identifying fields; presumably kept consistent with
    # __eq__, which compares all fields except those in eq_ignore
    return hash(self.key) ^ hash(self.action)

View File

@@ -5,9 +5,13 @@ Copyright (c) 2017, deadc0de6
yaml config file manager
"""
import yaml
import itertools
import os
import shlex
from functools import partial
from glob import iglob
import yaml
# local import
from dotdrop.dotfile import Dotfile
@@ -34,11 +38,15 @@ class Cfg:
key_imp_link = 'link_on_import'
key_dotfile_link = 'link_dotfile_default'
key_workdir = 'workdir'
# import keys
key_import_vars = 'import_variables'
key_import_actions = 'import_actions'
key_cmpignore = 'cmpignore'
key_upignore = 'upignore'
key_import_configs = 'import_configs'
# actions keys
key_actions = 'actions'
key_actions_pre = 'pre'
@@ -100,6 +108,7 @@ class Cfg:
raise ValueError('config file path undefined')
if not os.path.exists(cfgpath):
raise ValueError('config file does not exist: {}'.format(cfgpath))
# make sure to have an absolute path to config file
self.cfgpath = os.path.abspath(cfgpath)
self.debug = debug
@@ -148,6 +157,9 @@ class Cfg:
if not self._load_config(profile=profile):
raise ValueError('config is not valid')
def __eq__(self, other):
return self.cfgpath == other.cfgpath
def eval_dotfiles(self, profile, variables, debug=False):
"""resolve dotfiles src/dst/actions templating for this profile"""
t = Templategen(variables=variables)
@@ -220,34 +232,40 @@ class Cfg:
return False
# parse the profiles
self.lnk_profiles = self.content[self.key_profiles]
if self.lnk_profiles is None:
# ensures self.lnk_profiles is a dict
# ensures self.lnk_profiles is a dict
if not isinstance(self.content[self.key_profiles], dict):
self.content[self.key_profiles] = {}
self.lnk_profiles = self.content[self.key_profiles]
for k, v in self.lnk_profiles.items():
if not v:
continue
if self.key_profiles_dots in v and \
v[self.key_profiles_dots] is None:
# if has the dotfiles entry but is empty
# ensures it's an empty list
v[self.key_profiles_dots] = []
self.lnk_profiles = self.content[self.key_profiles]
for p in filter(bool, self.lnk_profiles.values()):
# Ensures that the dotfiles entry is an empty list when not given
# or none
p.setdefault(self.key_profiles_dots, [])
if p[self.key_profiles_dots] is None:
p[self.key_profiles_dots] = []
# make sure we have an absolute dotpath
self.curdotpath = self.lnk_settings[self.key_dotpath]
self.lnk_settings[self.key_dotpath] = \
self._abs_path(self.curdotpath)
self.lnk_settings[self.key_dotpath] = self._abs_path(self.curdotpath)
# make sure we have an absolute workdir
self.curworkdir = self.lnk_settings[self.key_workdir]
self.lnk_settings[self.key_workdir] = \
self._abs_path(self.curworkdir)
self.lnk_settings[self.key_workdir] = self._abs_path(self.curworkdir)
# load external variables/dynvariables
if self.key_import_vars in self.lnk_settings:
paths = self.lnk_settings[self.key_import_vars]
try:
paths = self.lnk_settings[self.key_import_vars] or []
self._load_ext_variables(paths, profile=profile)
except KeyError:
pass
# load global upignore
if self.key_upignore in self.lnk_settings:
self.upignores = self.lnk_settings[self.key_upignore] or []
# load global cmpignore
if self.key_cmpignore in self.lnk_settings:
self.cmpignores = self.lnk_settings[self.key_cmpignore] or []
# load global upignore
if self.key_upignore in self.lnk_settings:
@@ -258,42 +276,97 @@ class Cfg:
self.cmpignores = self.lnk_settings[self.key_cmpignore] or []
# parse external actions
if self.key_import_actions in self.lnk_settings:
for path in self.lnk_settings[self.key_import_actions]:
try:
ext_actions = self.lnk_settings[self.key_import_actions] or ()
for path in ext_actions:
path = self._abs_path(path)
if self.debug:
self.log.dbg('loading actions from {}'.format(path))
content = self._load_yaml(path)
if self.key_actions in content and \
content[self.key_actions] is not None:
self._load_actions(content[self.key_actions])
# If external actions are None, replaces them with empty dict
try:
external_actions = content[self.key_actions] or {}
self._load_actions(external_actions)
except KeyError:
pass
except KeyError:
pass
# parse external configs
try:
ext_configs = self.lnk_settings[self.key_import_configs] or ()
try:
iglob('./*', recursive=True)
find_glob = partial(iglob, recursive=True)
except TypeError:
from platform import python_version
msg = ('Recursive globbing is not available on Python {}: '
.format(python_version()))
if any('**' in config for config in ext_configs):
msg += "import_configs won't work"
self.log.err(msg)
return False
msg = 'upgrade to version >3.5 if you want to use this feature'
self.log.warn(msg)
find_glob = iglob
ext_configs = itertools.chain.from_iterable(
find_glob(self._abs_path(config))
for config in ext_configs
)
for config in ext_configs:
self._merge_cfg(config)
except KeyError:
pass
# parse local actions
if self.key_actions in self.content and \
self.content[self.key_actions] is not None:
self._load_actions(self.content[self.key_actions])
# If local actions are None, replaces them with empty dict
try:
local_actions = self.content[self.key_actions] or {}
self._load_actions(local_actions)
except KeyError:
pass
# parse read transformations
if self.key_trans_r in self.content and \
self.content[self.key_trans_r] is not None:
for k, v in self.content[self.key_trans_r].items():
self.trans_r[k] = Transform(k, v)
# If read transformations are None, replaces them with empty dict
try:
read_trans = self.content[self.key_trans_r] or {}
self.trans_r.update({
k: Transform(k, v)
for k, v
in read_trans.items()
})
except KeyError:
pass
# parse write transformations
if self.key_trans_w in self.content and \
self.content[self.key_trans_w] is not None:
for k, v in self.content[self.key_trans_w].items():
self.trans_w[k] = Transform(k, v)
# If write transformations are None, replaces them with empty dict
try:
read_trans = self.content[self.key_trans_w] or {}
self.trans_w.update({
k: Transform(k, v)
for k, v
in read_trans.items()
})
except KeyError:
pass
# parse the dotfiles
# and construct the dict of objects per dotfile key
if not self.content[self.key_dotfiles]:
# ensures the dotfiles entry is a dict
# parse the dotfiles and construct the dict of objects per dotfile key
# ensures the dotfiles entry is a dict
if not isinstance(self.content[self.key_dotfiles], dict):
self.content[self.key_dotfiles] = {}
for k in self.content[self.key_dotfiles].keys():
v = self.content[self.key_dotfiles][k]
src = os.path.normpath(v[self.key_dotfiles_src])
dotfiles = self.content[self.key_dotfiles]
noempty_default = self.lnk_settings[self.key_ignoreempty]
dotpath = self.content['config']['dotpath']
for k, v in dotfiles.items():
src = v[self.key_dotfiles_src]
if dotpath not in src:
src = os.path.join(dotpath, src)
src = os.path.normpath(self._abs_path(src))
dst = os.path.normpath(v[self.key_dotfiles_dst])
# Fail if both `link` and `link_children` present
@@ -306,7 +379,7 @@ class Cfg:
# fix it
v = self._fix_dotfile_link(k, v)
self.content[self.key_dotfiles][k] = v
dotfiles[k] = v
# get link type
link = self._get_def_link()
@@ -314,13 +387,10 @@ class Cfg:
link = self._string_to_linktype(v[self.key_dotfiles_link])
# get ignore empty
noempty = v[self.key_dotfiles_noempty] if \
self.key_dotfiles_noempty \
in v else self.lnk_settings[self.key_ignoreempty]
noempty = v.get(self.key_dotfiles_noempty, noempty_default)
# parse actions
itsactions = v[self.key_dotfiles_actions] if \
self.key_dotfiles_actions in v else []
itsactions = v.get(self.key_dotfiles_actions, [])
actions = self._parse_actions(itsactions, profile=profile)
if self.debug:
self.log.dbg('action for {}'.format(k))
@@ -329,8 +399,7 @@ class Cfg:
self.log.dbg('- {}: {}'.format(t, action))
# parse read transformation
itstrans_r = v[self.key_dotfiles_trans_r] if \
self.key_dotfiles_trans_r in v else None
itstrans_r = v.get(self.key_dotfiles_trans_r)
trans_r = None
if itstrans_r:
if type(itstrans_r) is list:
@@ -347,8 +416,7 @@ class Cfg:
return False
# parse write transformation
itstrans_w = v[self.key_dotfiles_trans_w] if \
self.key_dotfiles_trans_w in v else None
itstrans_w = v.get(self.key_dotfiles_trans_w)
trans_w = None
if itstrans_w:
if type(itstrans_w) is list:
@@ -373,13 +441,11 @@ class Cfg:
trans_w = None
# parse cmpignore pattern
cmpignores = v[self.key_dotfiles_cmpignore] if \
self.key_dotfiles_cmpignore in v else []
cmpignores = v.get(self.key_dotfiles_cmpignore, [])
cmpignores.extend(self.cmpignores)
# parse upignore pattern
upignores = v[self.key_dotfiles_upignore] if \
self.key_dotfiles_upignore in v else []
upignores = v.get(self.key_dotfiles_upignore, [])
upignores.extend(self.upignores)
# create new dotfile
@@ -390,16 +456,14 @@ class Cfg:
upignore=upignores)
# assign dotfiles to each profile
for k, v in self.lnk_profiles.items():
self.prodots[k] = []
if not v:
self.prodots = {k: [] for k in self.lnk_profiles.keys()}
for name, profile in self.lnk_profiles.items():
if not profile:
continue
if self.key_profiles_dots not in v:
# ensures is a list
v[self.key_profiles_dots] = []
if not v[self.key_profiles_dots]:
dots = profile[self.key_profiles_dots]
if not dots:
continue
dots = v[self.key_profiles_dots]
if self.key_all in dots:
# add all if key ALL is used
self.prodots[k] = list(self.dotfiles.values())
@@ -410,10 +474,11 @@ class Cfg:
msg = 'unknown dotfile \"{}\" for {}'.format(d, k)
self.log.err(msg)
continue
self.prodots[k].append(self.dotfiles[d])
self.prodots[name].append(self.dotfiles[d])
profile_names = self.lnk_profiles.keys()
# handle "import" (from file) for each profile
for k in self.lnk_profiles.keys():
for k in profile_names:
dots = self._get_imported_dotfiles_keys(k)
for d in dots:
if d not in self.dotfiles:
@@ -423,23 +488,124 @@ class Cfg:
self.prodots[k].append(self.dotfiles[d])
# handle "include" (from other profile) for each profile
for k in self.lnk_profiles.keys():
for k in profile_names:
ret, dots = self._get_included_dotfiles(k)
if not ret:
return False
self.prodots[k].extend(dots)
# remove duplicates if any
for k in self.lnk_profiles.keys():
self.prodots[k] = list(set(self.prodots[k]))
self.prodots = {k: list(set(v)) for k, v in self.prodots.items()}
# print dotfiles for each profile
if self.debug:
for k in self.lnk_profiles.keys():
df = ','.join([d.key for d in self.prodots[k]])
df = ','.join(d.key for d in self.prodots[k])
self.log.dbg('dotfiles for \"{}\": {}'.format(k, df))
return True
def _merge_dict(self, ext_config, warning_prefix, self_member,
                ext_member=None, traceback=False):
    """Merge a dictionary instance member from an external Cfg into self.

    Issues a warning for any key defined both in self and in the other
    Cfg; on conflict the value from self wins. It can add an own=False
    property to any dictionary value of the external instance member
    before merging.

    :param ext_config: the other Cfg to merge from
    :type ext_config: Cfg
    :param warning_prefix: the prefix to the warning messages
    :type warning_prefix: str
    :param self_member: the member of self which will be augmented by the
        external member, or the member name as a string
    :type self_member: dict or str
    :param ext_member: the member of ext_config which will be merged in
        self_member. When not given, self_member is assumed to be a
        string, and self_member and ext_member are supposed to have the
        same name.
    :type ext_member: dict or None
    :param traceback: whether to add own=False to ext_member dict values
        before merging in
    :type traceback: bool
    :return: the merged self_member (also updated in place)
    """
    if ext_member is None:
        # self_member is a member name shared by both instances
        member_name = self_member
        self_member = getattr(self, member_name)
        ext_member = getattr(ext_config, member_name)
    # dict views support set operations directly, no set() copies needed
    common_keys = (
        key
        for key in self_member.keys() & ext_member.keys()
        if not key.startswith('_')  # filtering out internal variables
    )
    warning_msg = ('%s {} defined both in %s and %s: {} in %s used'
                   % (warning_prefix, self.cfgpath, ext_config.cfgpath,
                      self.cfgpath))
    for key in common_keys:
        self.log.warn(warning_msg.format(key, key))
    if traceback:
        # Assumes v to be a dict. So far it's only used for profiles,
        # that are in fact dicts
        merged = {
            k: dict(v, own=False)
            for k, v in ext_member.items()
        }
    else:
        merged = ext_member.copy()
    # keys from self take precedence over the external ones
    merged.update(self_member)
    self_member.update(merged)
    return self_member
def _merge_cfg(self, config_path):
    """Merge an external config yaml file into self.

    :param config_path: path to the external config file
    :raises ValueError: when the external config cannot be loaded
    """
    # Parsing external config file
    try:
        ext_config = Cfg(config_path)
    except ValueError as err:
        # chain the original error so an invalid (but existing) config
        # is not silently reported as a missing file
        raise ValueError(
            'external config file not found: {}'.format(config_path)) from err
    # Merging in members from the external config file
    self._merge_dict(ext_config=ext_config, warning_prefix='Dotfile',
                     self_member='dotfiles')
    self._merge_dict(ext_config=ext_config, warning_prefix='Profile',
                     self_member='lnk_profiles', traceback=True)
    self._merge_dict(ext_config=ext_config, warning_prefix='Action',
                     self_member='actions')
    self._merge_dict(ext_config=ext_config,
                     warning_prefix='Transformation',
                     self_member='trans_r')
    self._merge_dict(ext_config=ext_config,
                     warning_prefix='Write transformation',
                     self_member='trans_w')
    self._merge_dict(ext_config=ext_config, warning_prefix='Profile',
                     self_member='prodots')
    # variables are merged in ext_*variables so as not to be added in
    # self.content. This needs an additional step to account for imported
    # variables sharing a key with the ones defined in self.content.
    variables = {
        k: v
        for k, v in ext_config._get_variables(None).items()
        if k not in self.content[self.key_variables]
    }
    dyn_variables = {
        k: v
        for k, v in ext_config._get_dynvariables(None).items()
        if k not in self.content[self.key_dynvariables]
    }
    self._merge_dict(ext_config=ext_config, warning_prefix='Variable',
                     self_member=self.ext_variables,
                     ext_member=variables)
    self._merge_dict(ext_config=ext_config,
                     warning_prefix='Dynamic variable',
                     self_member=self.ext_dynvariables,
                     ext_member=dyn_variables)
def _load_ext_variables(self, paths, profile=None):
"""load external variables"""
variables = {}
@@ -707,11 +873,22 @@ class Cfg:
v[self.key_dotfiles_link] = new
return v
@classmethod
def _filter_not_own(cls, content):
    """Return a copy of content without dict values flagged own=False.

    Dict values are explored recursively; anything that is not a dict
    is kept as-is. Since own=False is only used in profiles, which are
    in fact dicts, this is fine for now.
    """
    filtered = {}
    for key, value in content.items():
        if not isinstance(value, dict):
            # non-dict values are never tagged, keep them untouched
            filtered[key] = value
        elif value.get('own', True):
            # recurse to drop any nested foreign entries
            filtered[key] = cls._filter_not_own(value)
    return filtered
def _save(self, content, path):
"""writes the config to file"""
ret = False
with open(path, 'w') as f:
ret = yaml.safe_dump(content, f,
ret = yaml.safe_dump(self._filter_not_own(content), f,
default_flow_style=False,
indent=2)
if ret:
@@ -782,9 +959,12 @@ class Cfg:
def _dotfile_exists(self, dotfile):
"""return True and the existing dotfile key
if it already exists, False and a new unique key otherwise"""
dsts = [(k, d.dst) for k, d in self.dotfiles.items()]
if dotfile.dst in [x[1] for x in dsts]:
return True, [x[0] for x in dsts if x[1] == dotfile.dst][0]
try:
return True, next(key
for key, d in self.dotfiles.items()
if d.dst == dotfile.dst)
except StopIteration:
pass
# return key for this new dotfile
path = os.path.expanduser(dotfile.dst)
keys = self.dotfiles.keys()

View File

@@ -34,6 +34,8 @@ PYTHONPATH=dotdrop ${nosebin} -s --with-coverage --cover-package=dotdrop
#PYTHONPATH=dotdrop python3 -m pytest tests
## execute bash script tests
for scr in tests-ng/*.sh; do
${scr}
done
[ "$1" = '--python-only' ] || {
for scr in tests-ng/*.sh; do
${scr}
done
}

View File

@@ -5,10 +5,13 @@ helpers for the unittests
"""
import os
import random
import shutil
import string
import random
import tempfile
from unittest import TestCase
import yaml
from dotdrop.options import Options, ENV_NODEBUG
from dotdrop.linktypes import LinkTypes
@@ -17,6 +20,30 @@ from dotdrop.utils import strip_home
TMPSUFFIX = '-dotdrop-tests'
class SubsetTestCase(TestCase):
    """TestCase with an extra assertion checking dict inclusion."""

    def assertIsSubset(self, sub, sup):
        """Assert that every entry of sub is also present in sup.

        Strings are compared for equality, dicts are checked
        recursively, other iterables are checked item-wise for
        membership, and anything else is compared for equality.
        """
        for key, expected in sub.items():
            self.assertIn(key, sup)
            actual = sup[key]
            if isinstance(expected, str):
                self.assertEqual(expected, actual)
            elif isinstance(expected, dict):
                self.assertIsSubset(expected, actual)
            else:
                try:
                    iter(expected)
                    self.assertTrue(all(
                        item in actual for item in expected
                    ))
                except TypeError:
                    # not iterable: fall back to plain equality
                    self.assertEqual(expected, actual)
def clean(path):
"""Delete file or directory"""
if not os.path.exists(path):
@@ -121,7 +148,6 @@ def load_options(confpath, profile):
o = Options(args=args)
o.profile = profile
o.dry = False
o.profile = profile
o.safe = True
o.install_diff = True
o.import_link = LinkTypes.NOLINK
@@ -149,8 +175,15 @@ def get_dotfile_from_yaml(dic, path):
return [d for d in dotfiles.values() if d['src'] == src][0]
def yaml_dashed_list(items, indent=0):
    """Render items as a YAML list, one dashed entry per line."""
    pad = ' ' * indent
    lines = ['{}- {}'.format(pad, entry) for entry in items]
    return '\n'.join(lines) + '\n'
def create_fake_config(directory, configname='config.yaml',
dotpath='dotfiles', backup=True, create=True):
dotpath='dotfiles', backup=True, create=True,
import_configs=(), import_actions=(),
import_variables=()):
"""Create a fake config file"""
path = os.path.join(directory, configname)
workdir = os.path.join(directory, 'workdir')
@@ -160,7 +193,73 @@ def create_fake_config(directory, configname='config.yaml',
f.write(' create: {}\n'.format(str(create)))
f.write(' dotpath: {}\n'.format(dotpath))
f.write(' workdir: {}\n'.format(workdir))
if import_actions:
f.write(' import_actions:\n')
f.write(yaml_dashed_list(import_actions, 4))
if import_configs:
f.write(' import_configs:\n')
f.write(yaml_dashed_list(import_configs, 4))
if import_variables:
f.write(' import_variables:\n')
f.write(yaml_dashed_list(import_variables, 4))
f.write('dotfiles:\n')
f.write('profiles:\n')
f.write('actions:\n')
return path
def create_yaml_keyval(pairs, parent_dir=None, top_key=None):
    """Dump pairs to a fresh yaml file and return its path.

    When top_key is given, pairs are nested under it. The file is
    created in parent_dir, or in a new temporary directory when no
    parent_dir is provided.
    """
    content = {top_key: pairs} if top_key else pairs
    directory = parent_dir if parent_dir else get_tempdir()
    handle, path = tempfile.mkstemp(dir=directory, suffix='.yaml', text=True)
    with os.fdopen(handle, 'w') as stream:
        yaml.safe_dump(content, stream)
    return path
def populate_fake_config(config, dotfiles=None, profiles=None, actions=None,
                         trans=None, trans_write=None, variables=None,
                         dynvariables=None):
    """Adds some juicy content to config files.

    config is either an already-parsed config dict, which is modified in
    place, or a path to a yaml config file, which is rewritten on disk.
    All other arguments default to empty dicts.
    """
    # avoid mutable default arguments: the previous {} defaults were
    # stored into the config, so a caller mutating e.g.
    # config['dotfiles'] would corrupt the shared default for all
    # subsequent calls
    entries = {
        'dotfiles': dotfiles if dotfiles is not None else {},
        'profiles': profiles if profiles is not None else {},
        'actions': actions if actions is not None else {},
        'trans': trans if trans is not None else {},
        'trans_write': trans_write if trans_write is not None else {},
        'variables': variables if variables is not None else {},
        'dynvariables': dynvariables if dynvariables is not None else {},
    }
    is_path = isinstance(config, str)
    if is_path:
        config_path = config
        with open(config_path) as config_file:
            config = yaml.safe_load(config_file)
    config.update(entries)
    if is_path:
        with open(config_path, 'w') as config_file:
            yaml.safe_dump(config, config_file, default_flow_style=False,
                           indent=2)
def file_in_yaml(yaml_file, path, link=False):
    """Return whether path is in the given yaml file as a dotfile.

    yaml_file is either a path to a config file or an already-parsed
    config dict. When link is True, the dotfile must also have its
    link flag set.
    """
    if isinstance(yaml_file, str):
        with open(yaml_file) as stream:
            conf = yaml.safe_load(stream)
    else:
        conf = yaml_file
    stripped = get_path_strip_version(path)
    entries = conf['dotfiles'].values()
    found_src = any(entry['src'] == stripped for entry in entries)
    found_dst = any(os.path.expanduser(entry['dst']) == path
                    for entry in entries)
    if link:
        # note: raises if the dotfile is missing entirely, as the lookup
        # happens regardless of found_src/found_dst
        has_link = get_dotfile_from_yaml(conf, path)['link']
        return found_src and found_dst and has_link
    return found_src and found_dst

View File

@@ -13,17 +13,19 @@ import yaml
from dotdrop.config import Cfg
from dotdrop.options import Options
from dotdrop.linktypes import LinkTypes
from tests.helpers import get_tempdir, clean, \
create_fake_config, _fake_args
from tests.helpers import (SubsetTestCase, _fake_args, clean,
create_fake_config, create_yaml_keyval, get_tempdir,
populate_fake_config)
class TestConfig(unittest.TestCase):
class TestConfig(SubsetTestCase):
CONFIG_BACKUP = False
CONFIG_CREATE = True
CONFIG_DOTPATH = 'dotfiles'
TMPSUFFIX = '.dotdrop'
CONFIG_NAME = 'config.yaml'
CONFIG_NAME_2 = 'config-2.yaml'
def test_config(self):
"""Test the config class"""
@@ -199,6 +201,394 @@ profiles:
conf = Cfg(confpath)
self.assertTrue(conf is not None)
def test_import_configs_merge(self):
    """Test import_configs when all config keys merge.

    Keys suffixed with _ed belong to the imported config, keys
    suffixed with _ing to the importing one; no key overlaps, so the
    importing config must end up containing both sets.
    """
    tmp = get_tempdir()
    self.assertTrue(os.path.exists(tmp))
    self.addCleanup(clean, tmp)
    # external variable files for both configs
    vars_ed = {
        'variables': {
            'a_var_ed': '33',
        },
        'dynvariables': {
            'a_dynvar_ed': 'echo 33',
        },
    }
    vars_ing = {
        'variables': {
            'a_var_ing': 'dd',
        },
        'dynvariables': {
            'a_dynvar_ing': 'echo dd',
        },
    }
    vars_ed_file = create_yaml_keyval(vars_ed, tmp)
    vars_ing_file = create_yaml_keyval(vars_ing, tmp)
    # external action files for both configs
    actions_ed = {
        'pre': {
            'a_pre_action_ed': 'echo pre 22',
        },
        'post': {
            'a_post_action_ed': 'echo post 22',
        },
        'a_action_ed': 'echo 22',
    }
    actions_ing = {
        'pre': {
            'a_pre_action_ing': 'echo pre aa',
        },
        'post': {
            'a_post_action_ing': 'echo post aa',
        },
        'a_action_ing': 'echo aa',
    }
    actions_ed_file = create_yaml_keyval(actions_ed, tmp)
    actions_ing_file = create_yaml_keyval(actions_ing, tmp)
    # full content of the imported config
    imported = {
        'config': {
            'dotpath': 'importing',
            'import_variables': [vars_ed_file],
            'import_actions': [actions_ed_file],
        },
        'dotfiles': {
            'f_vimrc': {'dst': '~/.vimrc', 'src': 'vimrc'},
        },
        'profiles': {
            'host1': {
                'dotfiles': ['f_vimrc'],
            },
        },
        'actions': {
            'pre': {
                'a_pre_log_ed': 'echo pre 2',
            },
            'post': {
                'a_post_log_ed': 'echo post 2',
            },
            'a_log_ed': 'echo 2',
        },
        'trans': {
            't_log_ed': 'echo 3',
        },
        'trans_write': {
            'tw_log_ed': 'echo 4',
        },
        'variables': {
            'v_log_ed': '42',
        },
        'dynvariables': {
            'dv_log_ed': 'echo 5',
        },
    }
    # full content of the importing config
    importing = {
        'config': {
            'dotpath': 'importing',
            'import_variables': [vars_ing_file],
            'import_actions': [actions_ing_file],
        },
        'dotfiles': {
            'f_xinitrc': {'dst': '~/.xinitrc', 'src': 'xinitrc'},
        },
        'profiles': {
            'host2': {
                'dotfiles': ['f_xinitrc'],
                'include': ['host1'],
            },
        },
        'actions': {
            'pre': {
                'a_pre_log_ing': 'echo pre a',
            },
            'post': {
                'a_post_log_ing': 'echo post a',
            },
            'a_log_ing': 'echo a',
        },
        'trans': {
            't_log_ing': 'echo b',
        },
        'trans_write': {
            'tw_log_ing': 'echo c',
        },
        'variables': {
            'v_log_ing': 'd',
        },
        'dynvariables': {
            'dv_log_ing': 'echo e',
        },
    }
    # create the imported base config file
    imported_path = create_fake_config(tmp,
                                       configname=self.CONFIG_NAME_2,
                                       **imported['config'])
    # create the importing base config file
    importing_path = create_fake_config(tmp,
                                        configname=self.CONFIG_NAME,
                                        import_configs=('config-*.yaml',),
                                        **importing['config'])
    # edit the imported config
    populate_fake_config(imported_path, **{
        k: v
        for k, v in imported.items()
        if k != 'config'
    })
    # edit the importing config
    populate_fake_config(importing_path, **{
        k: v
        for k, v in importing.items()
        if k != 'config'
    })
    # do the tests
    importing_cfg = Cfg(importing_path)
    imported_cfg = Cfg(imported_path)
    self.assertIsNotNone(importing_cfg)
    self.assertIsNotNone(imported_cfg)
    # test profiles
    self.assertIsSubset(imported_cfg.lnk_profiles,
                        importing_cfg.lnk_profiles)
    # test dotfiles
    self.assertIsSubset(imported_cfg.dotfiles, importing_cfg.dotfiles)
    # test actions
    self.assertIsSubset(imported_cfg.actions['pre'],
                        importing_cfg.actions['pre'])
    self.assertIsSubset(imported_cfg.actions['post'],
                        importing_cfg.actions['post'])
    # test transactions
    self.assertIsSubset(imported_cfg.trans_r, importing_cfg.trans_r)
    self.assertIsSubset(imported_cfg.trans_w, importing_cfg.trans_w)
    # test variables; internal _-prefixed variables are not merged
    imported_vars = {
        k: v
        for k, v in imported_cfg.get_variables(None).items()
        if not k.startswith('_')
    }
    importing_vars = {
        k: v
        for k, v in importing_cfg.get_variables(None).items()
        if not k.startswith('_')
    }
    self.assertIsSubset(imported_vars, importing_vars)
    # test prodots
    self.assertIsSubset(imported_cfg.prodots, importing_cfg.prodots)
def test_import_configs_override(self):
    """Test import_configs when some config keys overlap.

    Both configs define the same keys with different values; the
    importing config is expected to win every conflict.
    """
    tmp = get_tempdir()
    self.assertTrue(os.path.exists(tmp))
    self.addCleanup(clean, tmp)
    # external variable files, same keys on both sides
    vars_ed = {
        'variables': {
            'a_var': '33',
        },
        'dynvariables': {
            'a_dynvar': 'echo 33',
        },
    }
    vars_ing = {
        'variables': {
            'a_var': 'dd',
        },
        'dynvariables': {
            'a_dynvar': 'echo dd',
        },
    }
    vars_ed_file = create_yaml_keyval(vars_ed, tmp)
    vars_ing_file = create_yaml_keyval(vars_ing, tmp)
    # external action files, same keys on both sides
    actions_ed = {
        'pre': {
            'a_pre_action': 'echo pre 22',
        },
        'post': {
            'a_post_action': 'echo post 22',
        },
        'a_action': 'echo 22',
    }
    actions_ing = {
        'pre': {
            'a_pre_action': 'echo pre aa',
        },
        'post': {
            'a_post_action': 'echo post aa',
        },
        'a_action': 'echo aa',
    }
    actions_ed_file = create_yaml_keyval(actions_ed, tmp)
    actions_ing_file = create_yaml_keyval(actions_ing, tmp)
    # full content of the imported config
    imported = {
        'config': {
            'dotpath': 'imported',
            'backup': False,
            'import_variables': [vars_ed_file],
            'import_actions': [actions_ed_file],
        },
        'dotfiles': {
            'f_vimrc': {'dst': '~/.vimrc', 'src': 'vimrc'},
            'f_xinitrc': {'dst': '~/.xinitrc', 'src': 'xinitrc',
                          'link': 'link'},
        },
        'profiles': {
            'host1': {
                'dotfiles': ['f_vimrc'],
            },
            'host2': {
                'dotfiles': ['f_xinitrc'],
            },
        },
        'actions': {
            'pre': {
                'a_pre_log': 'echo pre 2',
            },
            'post': {
                'a_post_log': 'echo post 2',
            },
            'a_log': 'echo 2',
        },
        'trans': {
            't_log': 'echo 3',
        },
        'trans_write': {
            'tw_log': 'echo 4',
        },
        'variables': {
            'v_log': '42',
        },
        'dynvariables': {
            'dv_log': 'echo 5',
        },
    }
    # full content of the importing config
    importing = {
        'config': {
            'dotpath': 'importing',
            'backup': True,
            'import_variables': [vars_ing_file],
            'import_actions': [actions_ing_file],
        },
        'dotfiles': {
            'f_xinitrc': {'dst': '~/.xinitrc', 'src': 'xinitrc'},
        },
        'profiles': {
            'host2': {
                'dotfiles': ['f_xinitrc'],
                'include': ['host1'],
            },
        },
        'actions': {
            'pre': {
                'a_pre_log': 'echo pre a',
            },
            'post': {
                'a_post_log': 'echo post a',
            },
            'a_log': 'echo a',
        },
        'trans': {
            't_log': 'echo b',
        },
        'trans_write': {
            'tw_log': 'echo c',
        },
        'variables': {
            'v_log': 'd',
        },
        'dynvariables': {
            'dv_log': 'echo e',
        },
    }
    # create the imported base config file
    imported_path = create_fake_config(tmp,
                                       configname=self.CONFIG_NAME_2,
                                       **imported['config'])
    # create the importing base config file
    importing_path = create_fake_config(tmp,
                                        configname=self.CONFIG_NAME,
                                        import_configs=(imported_path,),
                                        **importing['config'])
    # edit the imported config
    populate_fake_config(imported_path, **{
        k: v
        for k, v in imported.items()
        if k != 'config'
    })
    # edit the importing config
    populate_fake_config(importing_path, **{
        k: v
        for k, v in importing.items()
        if k != 'config'
    })
    # do the tests
    importing_cfg = Cfg(importing_path)
    imported_cfg = Cfg(imported_path)
    self.assertIsNotNone(importing_cfg)
    self.assertIsNotNone(imported_cfg)
    # test profiles
    self.assertIsSubset(imported_cfg.lnk_profiles,
                        importing_cfg.lnk_profiles)
    # test dotfiles: only the non-overlapping dotfile is taken as-is
    self.assertEqual(importing_cfg.dotfiles['f_vimrc'],
                     imported_cfg.dotfiles['f_vimrc'])
    self.assertNotEqual(importing_cfg.dotfiles['f_xinitrc'],
                        imported_cfg.dotfiles['f_xinitrc'])
    # test actions: every shared key must keep the importing value
    self.assertFalse(any(
        (imported_cfg.actions['pre'][key]
         == importing_cfg.actions['pre'][key])
        for key in imported_cfg.actions['pre']
    ))
    self.assertFalse(any(
        (imported_cfg.actions['post'][key]
         == importing_cfg.actions['post'][key])
        for key in imported_cfg.actions['post']
    ))
    # test transactions
    self.assertFalse(any(
        imported_cfg.trans_r[key] == importing_cfg.trans_r[key]
        for key in imported_cfg.trans_r
    ))
    self.assertFalse(any(
        imported_cfg.trans_w[key] == importing_cfg.trans_w[key]
        for key in imported_cfg.trans_w
    ))
    # test variables
    imported_vars = imported_cfg.get_variables(None)
    self.assertFalse(any(
        imported_vars[k] == v
        for k, v in importing_cfg.get_variables(None).items()
        if not k.startswith('_')
    ))
    # test prodots
    self.assertEqual(imported_cfg.prodots['host1'],
                     importing_cfg.prodots['host1'])
    self.assertNotEqual(imported_cfg.prodots['host2'],
                        importing_cfg.prodots['host2'])
    self.assertTrue(set(imported_cfg.prodots['host1'])
                    < set(importing_cfg.prodots['host2']))
def main():
    """Run the module's test suite."""
    unittest.main()

View File

@@ -15,10 +15,10 @@ from dotdrop.dotdrop import cmd_list_files
from dotdrop.dotdrop import cmd_update
from dotdrop.linktypes import LinkTypes
from tests.helpers import get_path_strip_version, edit_content, \
load_options, create_random_file, \
clean, get_string, get_dotfile_from_yaml, \
get_tempdir, create_fake_config, create_dir
from tests.helpers import (clean, create_dir, create_fake_config,
create_random_file, edit_content, file_in_yaml,
get_path_strip_version, get_string, get_tempdir,
load_options, populate_fake_config)
class TestImport(unittest.TestCase):
@@ -39,18 +39,13 @@ class TestImport(unittest.TestCase):
def assert_file(self, path, o, profile):
"""Make sure path has been inserted in conf for profile"""
strip = get_path_strip_version(path)
self.assertTrue(strip in [x.src for x in o.dotfiles])
dsts = [os.path.expanduser(x.dst) for x in o.dotfiles]
self.assertTrue(any(x.src.endswith(strip) for x in o.dotfiles))
dsts = (os.path.expanduser(x.dst) for x in o.dotfiles)
self.assertTrue(path in dsts)
def assert_in_yaml(self, path, dic, link=False):
"""Make sure "path" is in the "dic" representing the yaml file"""
strip = get_path_strip_version(path)
self.assertTrue(strip in [x['src'] for x in dic['dotfiles'].values()])
dsts = [os.path.expanduser(x['dst']) for x in dic['dotfiles'].values()]
if link:
self.assertTrue(get_dotfile_from_yaml(dic, path)['link'])
self.assertTrue(path in dsts)
self.assertTrue(file_in_yaml(dic, path, link))
def test_import(self):
"""Test the import function"""
@@ -203,6 +198,210 @@ class TestImport(unittest.TestCase):
c2 = open(indt1, 'r').read()
self.assertTrue(editcontent == c2)
def test_ext_config_yaml_not_mix(self):
    """Test whether the import_configs mixes yaml files upon importing.

    Two configs are linked via import_configs; dotfiles are imported
    for each one, and both files are re-read to check that every entry
    was saved to its own yaml file only (_ed keys belong to the
    imported config, _ing keys to the importing one).
    """
    # dotfiles on filesystem
    src = get_tempdir()
    self.assertTrue(os.path.exists(src))
    self.addCleanup(clean, src)
    # create some random dotfiles
    dotfiles = []
    for _ in range(3):
        dotfile, _ = create_random_file(src)
        dotfiles.append(dotfile)
        self.addCleanup(clean, dotfile)
    self.assertTrue(all(map(os.path.exists, dotfiles)))
    # create dotdrop home
    dotdrop_home = get_tempdir()
    self.assertTrue(os.path.exists(dotdrop_home))
    self.addCleanup(clean, dotdrop_home)
    # content of the imported config
    imported = {
        'config': {
            'dotpath': 'imported',
        },
        'dotfiles': {},
        'profiles': {
            'host1': {
                'dotfiles': [],
            },
        },
        'actions': {
            'pre': {
                'a_pre_log_ed': 'echo pre 2',
            },
            'post': {
                'a_post_log_ed': 'echo post 2',
            },
            'a_log_ed': 'echo 2',
        },
        'trans': {
            't_log_ed': 'echo 3',
        },
        'trans_write': {
            'tw_log_ed': 'echo 4',
        },
        'variables': {
            'v_log_ed': '42',
        },
        'dynvariables': {
            'dv_log_ed': 'echo 5',
        },
    }
    # content of the importing config
    importing = {
        'config': {
            'dotpath': 'importing',
        },
        'dotfiles': {},
        'profiles': {
            'host2': {
                'dotfiles': [],
                'include': ['host1'],
            },
        },
        'actions': {
            'pre': {
                'a_pre_log_ing': 'echo pre a',
            },
            'post': {
                'a_post_log_ing': 'echo post a',
            },
            'a_log_ing': 'echo a',
        },
        'trans': {
            't_log_ing': 'echo b',
        },
        'trans_write': {
            'tw_log_ing': 'echo c',
        },
        'variables': {
            'v_log_ing': 'd',
        },
        'dynvariables': {
            'dv_log_ing': 'echo e',
        },
    }
    dotfiles_ing, dotfiles_ed = dotfiles[:-1], dotfiles[-1:]
    # create the imported base config file
    imported_path = create_fake_config(dotdrop_home,
                                       configname='config-2.yaml',
                                       **imported['config'])
    # create the importing base config file
    importing_path = create_fake_config(dotdrop_home,
                                        configname='config.yaml',
                                        import_configs=('config-*.yaml',),
                                        **importing['config'])
    # edit the imported config
    populate_fake_config(imported_path, **{
        k: v
        for k, v in imported.items()
        if k != 'config'
    })
    # edit the importing config
    populate_fake_config(importing_path, **{
        k: v
        for k, v in importing.items()
        if k != 'config'
    })
    # import the dotfiles
    o = load_options(imported_path, 'host1')
    o.import_path = dotfiles_ed
    cmd_importer(o)
    o = load_options(importing_path, 'host2')
    o.import_path = dotfiles_ing
    cmd_importer(o)
    # reload the config
    o = load_options(importing_path, 'host2')
    # test imported config
    y = self.load_yaml(imported_path)
    # testing dotfiles
    self.assertTrue(all(file_in_yaml(y, df) for df in dotfiles_ed))
    self.assertFalse(any(file_in_yaml(y, df) for df in dotfiles_ing))
    # testing profiles
    profiles = y['profiles'].keys()
    self.assertTrue('host1' in profiles)
    self.assertFalse('host2' in profiles)
    # testing actions: flatten pre/post/plain actions into one dict
    actions = y['actions']['pre']
    actions.update(y['actions']['post'])
    actions.update({
        k: v
        for k, v in y['actions'].items()
        if k not in ('pre', 'post')
    })
    actions = actions.keys()
    self.assertTrue(all(a.endswith('ed') for a in actions))
    self.assertFalse(any(a.endswith('ing') for a in actions))
    # testing transformations
    transformations = y['trans'].keys()
    self.assertTrue(all(t.endswith('ed') for t in transformations))
    self.assertFalse(any(t.endswith('ing') for t in transformations))
    transformations = y['trans_write'].keys()
    self.assertTrue(all(t.endswith('ed') for t in transformations))
    self.assertFalse(any(t.endswith('ing') for t in transformations))
    # testing variables
    variables = y['variables'].keys()
    self.assertTrue(all(v.endswith('ed') for v in variables))
    self.assertFalse(any(v.endswith('ing') for v in variables))
    dyn_variables = y['dynvariables'].keys()
    self.assertTrue(all(dv.endswith('ed') for dv in dyn_variables))
    self.assertFalse(any(dv.endswith('ing') for dv in dyn_variables))
    # test importing config
    y = self.load_yaml(importing_path)
    # testing dotfiles
    self.assertTrue(all(file_in_yaml(y, df) for df in dotfiles_ing))
    self.assertFalse(any(file_in_yaml(y, df) for df in dotfiles_ed))
    # testing profiles
    profiles = y['profiles'].keys()
    self.assertTrue('host2' in profiles)
    self.assertFalse('host1' in profiles)
    # testing actions: flatten pre/post/plain actions into one dict
    actions = y['actions']['pre']
    actions.update(y['actions']['post'])
    actions.update({
        k: v
        for k, v in y['actions'].items()
        if k not in ('pre', 'post')
    })
    actions = actions.keys()
    self.assertTrue(all(action.endswith('ing') for action in actions))
    self.assertFalse(any(action.endswith('ed') for action in actions))
    # testing transformations
    transformations = y['trans'].keys()
    self.assertTrue(all(t.endswith('ing') for t in transformations))
    self.assertFalse(any(t.endswith('ed') for t in transformations))
    transformations = y['trans_write'].keys()
    self.assertTrue(all(t.endswith('ing') for t in transformations))
    self.assertFalse(any(t.endswith('ed') for t in transformations))
    # testing variables
    variables = y['variables'].keys()
    self.assertTrue(all(v.endswith('ing') for v in variables))
    self.assertFalse(any(v.endswith('ed') for v in variables))
    dyn_variables = y['dynvariables'].keys()
    self.assertTrue(all(dv.endswith('ing') for dv in dyn_variables))
    self.assertFalse(any(dv.endswith('ed') for dv in dyn_variables))
def main():
    """Entry point: delegate to unittest's test runner for this module."""
    unittest.main()

View File

@@ -10,8 +10,9 @@ from unittest.mock import MagicMock, patch
import filecmp
from dotdrop.config import Cfg
from tests.helpers import create_dir, get_string, get_tempdir, clean, \
create_random_file, load_options
from tests.helpers import (clean, create_dir, create_fake_config,
create_random_file, get_string, get_tempdir,
load_options, populate_fake_config)
from dotdrop.dotfile import Dotfile
from dotdrop.installer import Installer
from dotdrop.action import Action
@@ -231,6 +232,107 @@ exec bspwm
tempcontent = open(dst10, 'r').read().rstrip()
self.assertTrue(tempcontent == header())
def test_install_import_configs(self):
    """Test the install function with imported configs.

    Builds two config files on disk — an "imported" one and an
    "importing" one that pulls it in via the ``import_configs`` glob —
    then runs the install command for a profile that includes a profile
    defined only in the imported config, and checks that the dotfiles
    from BOTH configs end up installed.
    """
    # dotpath location: temp dir holding one sub-dotpath per config
    tmp = get_tempdir()
    self.assertTrue(os.path.exists(tmp))
    self.addCleanup(clean, tmp)
    # one dotpath directory for each of the two configs
    os.mkdir(os.path.join(tmp, 'importing'))
    os.mkdir(os.path.join(tmp, 'imported'))
    # where dotfiles will be installed
    dst = get_tempdir()
    self.assertTrue(os.path.exists(dst))
    self.addCleanup(clean, dst)
    # creating random dotfiles
    # NOTE(review): create_random_file's first return value is used both
    # as a file name and as a path component below — assumed to be the
    # bare file name; TODO confirm against tests.helpers.
    imported_dotfile, _ = create_random_file(os.path.join(tmp, 'imported'))
    imported_dotfile = {
        'dst': os.path.join(dst, imported_dotfile),
        'key': 'f_{}'.format(imported_dotfile),
        'name': imported_dotfile,
        # src is relative here: resolved against the 'imported' dotpath
        'src': os.path.join(tmp, 'imported', imported_dotfile),
    }
    importing_dotfile, _ = \
        create_random_file(os.path.join(tmp, 'importing'))
    importing_dotfile = {
        'dst': os.path.join(dst, importing_dotfile),
        'key': 'f_{}'.format(importing_dotfile),
        'name': importing_dotfile,
        # NOTE(review): the file was created under 'importing' but this
        # src points into 'imported' — looks inconsistent; verify this
        # is intentional (the test may rely on dotpath resolution).
        'src': os.path.join(tmp, 'imported', importing_dotfile),
    }
    # config content for the config that gets imported:
    # defines profile 'host1' with one dotfile
    imported = {
        'config': {
            'dotpath': 'imported',
        },
        'dotfiles': {
            imported_dotfile['key']: {
                'dst': imported_dotfile['dst'],
                'src': imported_dotfile['name'],
            },
        },
        'profiles': {
            'host1': {
                'dotfiles': [imported_dotfile['key']],
            },
        },
    }
    # config content for the importing config:
    # profile 'host2' includes 'host1' from the imported config
    importing = {
        'config': {
            'dotpath': 'importing',
        },
        'dotfiles': {
            importing_dotfile['key']: {
                'dst': importing_dotfile['dst'],
                'src': importing_dotfile['src'],
            },
        },
        'profiles': {
            'host2': {
                'dotfiles': [importing_dotfile['key']],
                'include': ['host1'],
            },
        },
    }
    # create the imported base config file
    imported_path = create_fake_config(tmp,
                                       configname='config-2.yaml',
                                       **imported['config'])
    # create the importing base config file; the glob pattern in
    # import_configs must match 'config-2.yaml' created above
    importing_path = create_fake_config(tmp,
                                        configname='config.yaml',
                                        import_configs=('config-*.yaml',),
                                        **importing['config'])
    # edit the imported config: add everything but the 'config' section,
    # which create_fake_config already wrote
    populate_fake_config(imported_path, **{
        k: v
        for k, v in imported.items()
        if k != 'config'
    })
    # edit the importing config
    populate_fake_config(importing_path, **{
        k: v
        for k, v in importing.items()
        if k != 'config'
    })
    # install them for profile 'host2' (which includes 'host1')
    o = load_options(importing_path, 'host2')
    o.safe = False
    o.install_showdiff = True
    o.variables = {}
    cmd_install(o)
    # now compare the generated files: both the importing config's
    # dotfile and the imported config's dotfile must be installed
    self.assertTrue(os.path.exists(importing_dotfile['dst']))
    self.assertTrue(os.path.exists(imported_dotfile['dst']))
def test_link_children(self):
"""test the link children"""
# create source dir