"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2019, deadc0de6
handle the lower level of the config file
and provide the following dictionaries
to the upper layer:
* self.settings
* self.dotfiles
* self.profiles
* self.actions
* self.trans_r
* self.trans_w
* self.variables
Additionally a few methods are exported.
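
A minimal usage sketch (the config path and dotfile entry are hypothetical,
shown only to illustrate the exported interface):

    cfg = CfgYaml('/path/to/config.yaml', profile='home', debug=False)
    print(cfg.dotfiles)
    cfg.add_dotfile('f_vimrc', 'vimrc', '~/.vimrc', 'nolink')
    cfg.save()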
"""
# pylint: disable=C0302
import os
import glob
import io
from copy import deepcopy
from itertools import chain
from ruamel.yaml import YAML as yaml
import toml
# local imports
from dotdrop.version import __version__ as VERSION
from dotdrop.settings import Settings
from dotdrop.logger import Logger
from dotdrop.templategen import Templategen
from dotdrop.linktypes import LinkTypes
from dotdrop.utils import shellrun, uniq_list, userinput
from dotdrop.exceptions import YamlException, UndefinedException
class CfgYaml:
"""yaml config file parser"""
# global entries
key_settings = Settings.key_yaml
key_dotfiles = 'dotfiles'
key_profiles = 'profiles'
key_actions = 'actions'
old_key_trans_r = 'trans'
key_trans_r = 'trans_read'
key_trans_w = 'trans_write'
key_variables = 'variables'
key_dvariables = 'dynvariables'
key_uvariables = 'uservariables'
action_pre = 'pre'
action_post = 'post'
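    # pattern for the file where prompted uservariables are saved
    # ("{}" receives an optional "-<counter>" suffix to keep the path unique)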
save_uservariables_name = 'uservariables{}.yaml'
# profiles/dotfiles entries
key_dotfile_src = 'src'
key_dotfile_dst = 'dst'
key_dotfile_link = 'link'
key_dotfile_actions = 'actions'
key_dotfile_noempty = 'ignoreempty'
key_dotfile_template = 'template'
key_dotfile_chmod = 'chmod'
# chmod value
chmod_ignore = 'preserve'
# profile
key_profile_dotfiles = 'dotfiles'
key_profile_include = 'include'
key_profile_variables = 'variables'
key_profile_dvariables = 'dynvariables'
key_profile_actions = 'actions'
key_all = 'ALL'
# import entries
key_import_actions = 'import_actions'
key_import_configs = 'import_configs'
key_import_variables = 'import_variables'
key_import_profile_dfs = 'import'
key_import_sep = ':'
key_import_ignore_key = 'optional'
key_import_fatal_not_found = True
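    # an import path may end with ":optional" (see _parse_extended_import_path)
    # in which case a missing path only logs a warning instead of raising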
# settings
key_settings_dotpath = Settings.key_dotpath
key_settings_workdir = Settings.key_workdir
key_settings_link_dotfile_default = Settings.key_link_dotfile_default
key_settings_noempty = Settings.key_ignoreempty
key_settings_minversion = Settings.key_minversion
key_imp_link = Settings.key_link_on_import
key_settings_template = Settings.key_template_dotfile_default
# link values
lnk_nolink = LinkTypes.NOLINK.name.lower()
lnk_link = LinkTypes.LINK.name.lower()
lnk_children = LinkTypes.LINK_CHILDREN.name.lower()
lnk_absolute = LinkTypes.ABSOLUTE.name.lower()
lnk_relative = LinkTypes.RELATIVE.name.lower()
# checks
allowed_link_val = [lnk_nolink, lnk_link, lnk_children,
lnk_absolute, lnk_relative]
top_entries = [key_dotfiles, key_settings, key_profiles]
def __init__(self, path, profile=None, addprofiles=None,
reloading=False, debug=False, imported_configs=None):
"""
config parser
@path: config file path
        @profile: the selected profile name
@addprofiles: included profiles names (list)
@reloading: true when reloading
@imported_configs: paths of config files that have been imported so far
@debug: debug flag
"""
self._path = os.path.abspath(path)
self._profile = profile
self._reloading = reloading
self._debug = debug
self._log = Logger(debug=self._debug)
# config format
self._config_format = 'yaml'
# config needs to be written
self._dirty = False
        # indicates deprecated entries were found and updated
self._dirty_deprecated = False
# profile variables
self._profilevarskeys = []
# included profiles
self._inc_profiles = addprofiles or []
# imported configs
self.imported_configs = imported_configs or []
# init the dictionaries
self.settings = {}
self.dotfiles = {}
self.profiles = {}
self.actions = {}
self.trans_r = {}
self.trans_w = {}
self.variables = {}
if not os.path.exists(self._path):
err = f'invalid config path: \"{path}\"'
if self._debug:
self._dbg(err)
raise YamlException(err)
self._dbg('START of config parsing')
self._dbg(f'reloading: {reloading}')
self._dbg(f'profile: {profile}')
pfs = ','.join(self._inc_profiles)
self._dbg(f'included profiles: {pfs}')
self._yaml_dict = self._load_yaml(self._path)
# live patch deprecated entries
self._fix_deprecated(self._yaml_dict)
# validate content
self._validate(self._yaml_dict)
##################################################
# parse the config and variables
##################################################
# parse the "config" block
self.settings = self._parse_blk_settings(self._yaml_dict)
# base templater (when no vars/dvars exist)
self.variables = self._enrich_vars(self.variables, self._profile)
self._redefine_templater()
# variables and dynvariables need to be first merged
# before being templated in order to allow cyclic
# references between them
# parse the "variables" block
var = self._parse_blk_variables(self._yaml_dict)
self._add_variables(var, template=False)
# parse the "dynvariables" block
dvariables = self._parse_blk_dynvariables(self._yaml_dict)
self._add_variables(dvariables, template=False)
# now template variables and dynvariables from the same pool
self.variables = self._rec_resolve_variables(self.variables)
# and execute dvariables
        # since this is done after recursively resolving variables
        # and dynvariables, variables referencing
        # dynvariables will end up with the un-executed value
if dvariables.keys():
self._shell_exec_dvars(self.variables, keys=dvariables.keys())
# finally redefine the template
self._redefine_templater()
if self._debug:
title = 'current variables defined'
self._debug_dict(title, self.variables)
# parse the "profiles" block
self.profiles = self._parse_blk_profiles(self._yaml_dict)
# include the profile's variables/dynvariables last
        # as they overwrite existing ones
incpro, pvar, pdvar = self._get_profile_included_vars()
self._inc_profiles = uniq_list(self._inc_profiles + incpro)
self._add_variables(pvar, prio=True)
self._add_variables(pdvar, shell=True, prio=True)
self._profilevarskeys.extend(pvar.keys())
self._profilevarskeys.extend(pdvar.keys())
# template variables
self.variables = self._template_dict(self.variables)
if self._debug:
title = 'variables defined (after template dict)'
self._debug_dict(title, self.variables)
##################################################
# template the "include" entries
##################################################
self._template_include_entry()
if self._debug:
title = 'variables defined (after template include)'
self._debug_dict(title, self.variables)
##################################################
# template config entries
##################################################
entry = self.settings[self.key_settings_dotpath]
val = self._template_item(entry)
self.settings[self.key_settings_dotpath] = val
##################################################
# parse the other blocks
##################################################
# parse the "dotfiles" block
self.dotfiles = self._parse_blk_dotfiles(self._yaml_dict)
# parse the "actions" block
self.actions = self._parse_blk_actions(self._yaml_dict)
# parse the "trans_r" block
self.trans_r = self._parse_blk_trans_r(self._yaml_dict)
# parse the "trans_w" block
self.trans_w = self._parse_blk_trans_w(self._yaml_dict)
##################################################
# import elements
##################################################
# process imported variables (import_variables)
newvars = self._import_variables()
self._clear_profile_vars(newvars)
self._add_variables(newvars, prio=True)
# process imported actions (import_actions)
self._import_actions()
# process imported profile dotfiles (import)
self._import_profiles_dotfiles()
# process imported configs (import_configs)
self._import_configs()
# process profile include items (actions, dotfiles, ...)
self._resolve_profile_includes()
# add the current profile variables
_, pvar, pdvar = self._get_profile_included_vars()
self._add_variables(pvar, prio=False)
self._add_variables(pdvar, shell=True, prio=False)
self._profilevarskeys.extend(pvar.keys())
self._profilevarskeys.extend(pdvar.keys())
# resolve variables
self._clear_profile_vars(newvars)
self._add_variables(newvars)
# process profile ALL
self._resolve_profile_all()
# template dotfiles entries
self._template_dotfiles_entries()
# parse the "uservariables" block
uvariables = self._parse_blk_uservariables(self._yaml_dict,
self.variables)
self._add_variables(uvariables, template=False, prio=False)
# end of parsing
if self._debug:
self._dbg('########### final config ###########')
self._debug_entries()
self._dbg('END of config parsing')
########################################################
# public methods
########################################################
def _resolve_dotfile_link(self, link):
"""resolve dotfile link entry"""
newlink = self._template_item(link)
# check link value
if newlink not in self.allowed_link_val:
err = f'bad link value: {newlink}'
self._log.err(err)
self._log.err(f'allowed: {self.allowed_link_val}')
raise YamlException(f'config content error: {err}')
return newlink
def resolve_dotfile_src(self, src, templater=None):
"""
get abs src file from a relative path
in dotpath
"""
newsrc = ''
if src:
new = src
if templater:
new = templater.generate_string(src)
if new != src and self._debug:
msg = f'dotfile src: \"{src}\" -> \"{new}\"'
self._dbg(msg)
src = new
src = os.path.join(self.settings[self.key_settings_dotpath],
src)
newsrc = self._norm_path(src)
return newsrc
def resolve_dotfile_dst(self, dst, templater=None):
"""resolve dotfile dst path"""
newdst = ''
if dst:
new = dst
if templater:
new = templater.generate_string(dst)
if new != dst and self._debug:
msg = f'dotfile dst: \"{dst}\" -> \"{new}\"'
self._dbg(msg)
dst = new
newdst = self._norm_path(dst)
return newdst
def add_dotfile_to_profile(self, dotfile_key, profile_key):
"""
        add an existing dotfile key to a profile
        if the profile doesn't exist, it is created
        the check is done on self.profiles since it merges
        imported ones (include, etc.) but the insertion
        happens in the main yaml dict only
        return True if the dotfile was added
"""
# create the profile if it doesn't exist
self._new_profile(profile_key)
if profile_key not in self.profiles:
return False
profile = self.profiles[profile_key]
# ensure profile dotfiles list is not None in local object
if self.key_profile_dotfiles not in profile or \
profile[self.key_profile_dotfiles] is None:
profile[self.key_profile_dotfiles] = []
# ensure profile dotfiles list is not None in yaml dict
dict_pro = self._yaml_dict[self.key_profiles][profile_key]
if self.key_profile_dotfiles not in dict_pro or \
dict_pro[self.key_profile_dotfiles] is None:
dict_pro[self.key_profile_dotfiles] = []
# add to the profile
pdfs = profile[self.key_profile_dotfiles]
if self.key_all not in pdfs and \
dotfile_key not in pdfs:
# append dotfile
pro = self._yaml_dict[self.key_profiles][profile_key]
pro[self.key_profile_dotfiles].append(dotfile_key)
if self._debug:
msg = f'add \"{dotfile_key}\" to profile \"{profile_key}\"'
self._dbg(msg)
self._dirty = True
return self._dirty
def get_all_dotfile_keys(self):
"""return all existing dotfile keys"""
return self.dotfiles.keys()
def _update_dotfile_chmod(self, key, dotfile, chmod):
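        """update a dotfile's chmod entry, return True if it was changed"""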
old = None
if self.key_dotfile_chmod in dotfile:
old = dotfile[self.key_dotfile_chmod]
if old == self.chmod_ignore:
msg = (
'ignore chmod change since '
f'{self.chmod_ignore}'
)
self._dbg(msg)
return False
if old == chmod:
return False
if self._debug:
self._dbg(f'update dotfile: {key}')
self._dbg(f'old chmod value: {old}')
self._dbg(f'new chmod value: {chmod}')
dotfile = self._yaml_dict[self.key_dotfiles][key]
if not chmod:
del dotfile[self.key_dotfile_chmod]
else:
dotfile[self.key_dotfile_chmod] = str(format(chmod, 'o'))
return True
def update_dotfile(self, key, chmod):
"""
update an existing dotfile
return true if updated
"""
if key not in self.dotfiles.keys():
return False
dotfile = self._yaml_dict[self.key_dotfiles][key]
if not self._update_dotfile_chmod(key, dotfile, chmod):
return False
self._dirty = True
return True
def add_dotfile(self, key, src, dst, link, chmod=None,
trans_r_key=None, trans_w_key=None):
"""add a new dotfile"""
if key in self.dotfiles.keys():
return False
if self._debug:
self._dbg(f'adding new dotfile: {key}')
self._dbg(f'new dotfile src: {src}')
self._dbg(f'new dotfile dst: {dst}')
self._dbg(f'new dotfile link: {link}')
self._dbg(f'new dotfile chmod: {chmod}')
self._dbg(f'new dotfile trans_r: {trans_r_key}')
self._dbg(f'new dotfile trans_w: {trans_w_key}')
# create the dotfile dict
df_dict = {
self.key_dotfile_src: src,
self.key_dotfile_dst: dst,
}
# link
dfl = self.settings[self.key_settings_link_dotfile_default]
if str(link) != dfl:
df_dict[self.key_dotfile_link] = str(link)
# chmod
if chmod:
df_dict[self.key_dotfile_chmod] = str(format(chmod, 'o'))
# trans_r/trans_w
if trans_r_key:
df_dict[self.key_trans_r] = str(trans_r_key)
if trans_w_key:
df_dict[self.key_trans_w] = str(trans_w_key)
if self._debug:
self._dbg(f'dotfile dict: {df_dict}')
# add to global dict
self._yaml_dict[self.key_dotfiles][key] = df_dict
self._dirty = True
return True
def del_dotfile(self, key):
"""remove this dotfile from config"""
if key not in self._yaml_dict[self.key_dotfiles]:
self._log.err(f'key not in dotfiles: {key}')
return False
if self._debug:
self._dbg(f'remove dotfile: {key}')
del self._yaml_dict[self.key_dotfiles][key]
if self._debug:
dfs = self._yaml_dict[self.key_dotfiles]
self._dbg(f'new dotfiles: {dfs}')
self._dirty = True
return True
def del_dotfile_from_profile(self, df_key, pro_key):
"""remove this dotfile from that profile"""
if self._debug:
self._dbg(f'removing \"{df_key}\" from \"{pro_key}\"')
if df_key not in self.dotfiles.keys():
self._log.err(f'key not in dotfiles: {df_key}')
return False
if pro_key not in self.profiles.keys():
self._log.err(f'key not in profile: {pro_key}')
return False
# get the profile dictionary
profile = self._yaml_dict[self.key_profiles][pro_key]
if self.key_profile_dotfiles not in profile:
# profile does not contain any dotfiles
return True
if df_key not in profile[self.key_profile_dotfiles]:
return True
if self._debug:
dfs = profile[self.key_profile_dotfiles]
self._dbg(f'{pro_key} profile dotfiles: {dfs}')
self._dbg(f'remove {df_key} from profile {pro_key}')
profile[self.key_profile_dotfiles].remove(df_key)
if self._debug:
dfs = profile[self.key_profile_dotfiles]
self._dbg(f'{pro_key} profile dotfiles: {dfs}')
self._dirty = True
return True
def save(self):
"""save this instance and return True if saved"""
if not self._dirty:
return False
content = self._prepare_to_save(self._yaml_dict)
if self._dirty_deprecated:
# add minversion
settings = content[self.key_settings]
settings[self.key_settings_minversion] = VERSION
# save to file
if self._debug:
self._dbg(f'saving to {self._path}')
try:
with open(self._path, 'w', encoding='utf8') as file:
self._yaml_dump(content, file, fmt=self._config_format)
except Exception as exc:
self._log.err(exc)
err = f'error saving config: {self._path}'
raise YamlException(err) from exc
if self._dirty_deprecated:
warn = 'your config contained deprecated entries'
warn += ' and was updated'
self._log.warn(warn)
self._dirty = False
return True
def dump(self):
"""dump the config dictionary"""
output = io.StringIO()
content = self._prepare_to_save(self._yaml_dict.copy())
self._yaml_dump(content, output, fmt=self._config_format)
return output.getvalue()
########################################################
# block parsing
########################################################
def _parse_blk_settings(self, dic):
"""parse the "config" block"""
block = self._get_entry(dic, self.key_settings).copy()
# set defaults
settings = Settings(None).serialize().get(self.key_settings)
settings.update(block)
# resolve minimum version
if self.key_settings_minversion in settings:
minversion = settings[self.key_settings_minversion]
self._check_minversion(minversion)
# normalize paths
paths = self._norm_path(settings[self.key_settings_dotpath])
settings[self.key_settings_dotpath] = paths
paths = self._norm_path(settings[self.key_settings_workdir])
settings[self.key_settings_workdir] = paths
paths = [
self._norm_path(path)
for path in settings[Settings.key_filter_file]
]
settings[Settings.key_filter_file] = paths
paths = [
self._norm_path(path)
for path in settings[Settings.key_func_file]
]
settings[Settings.key_func_file] = paths
if self._debug:
self._debug_dict('settings block:', settings)
return settings
def _parse_blk_dotfiles(self, dic):
"""parse the "dotfiles" block"""
dotfiles = self._get_entry(dic, self.key_dotfiles).copy()
keys = dotfiles.keys()
if len(keys) != len(list(set(keys))):
            dups = [x for x in keys if list(keys).count(x) > 1]
err = f'duplicate dotfile keys found: {dups}'
raise YamlException(err)
dotfiles = self._norm_dotfiles(dotfiles)
if self._debug:
self._debug_dict('dotfiles block', dotfiles)
return dotfiles
def _parse_blk_profiles(self, dic):
"""parse the "profiles" block"""
profiles = self._get_entry(dic, self.key_profiles).copy()
profiles = self._norm_profiles(profiles)
if self._debug:
self._debug_dict('profiles block', profiles)
return profiles
def _parse_blk_actions(self, dic):
"""parse the "actions" block"""
actions = self._get_entry(dic, self.key_actions,
mandatory=False)
if actions:
actions = actions.copy()
actions = self._norm_actions(actions)
if self._debug:
self._debug_dict('actions block', actions)
return actions
def _parse_blk_trans_r(self, dic):
"""parse the "trans_r" block"""
key = self.key_trans_r
if self.old_key_trans_r in dic:
msg = '\"trans\" is deprecated, please use \"trans_read\"'
self._log.warn(msg)
dic[self.key_trans_r] = dic[self.old_key_trans_r]
del dic[self.old_key_trans_r]
trans_r = self._get_entry(dic, key, mandatory=False)
if trans_r:
trans_r = trans_r.copy()
if self._debug:
self._debug_dict('trans_r block', trans_r)
return trans_r
def _parse_blk_trans_w(self, dic):
"""parse the "trans_w" block"""
trans_w = self._get_entry(dic, self.key_trans_w,
mandatory=False)
if trans_w:
trans_w = trans_w.copy()
if self._debug:
self._debug_dict('trans_w block', trans_w)
return trans_w
def _parse_blk_variables(self, dic):
"""parse the "variables" block"""
variables = self._get_entry(dic,
self.key_variables,
mandatory=False)
if variables:
variables = variables.copy()
if self._debug:
self._debug_dict('variables block', variables)
return variables
def _parse_blk_dynvariables(self, dic):
"""parse the "dynvariables" block"""
dvariables = self._get_entry(dic,
self.key_dvariables,
mandatory=False)
if dvariables:
dvariables = dvariables.copy()
if self._debug:
self._debug_dict('dynvariables block', dvariables)
return dvariables
def _parse_blk_uservariables(self, dic, current):
"""parse the "uservariables" block"""
uvariables = self._get_entry(dic,
self.key_uvariables,
mandatory=False)
uvars = {}
if not self._reloading and uvariables:
try:
for name, prompt in uvariables.items():
if name in current:
# ignore if already defined
if self._debug:
self._dbg(f'ignore uservariables {name}')
continue
content = userinput(prompt, debug=self._debug)
uvars[name] = content
except KeyboardInterrupt as exc:
raise YamlException('interrupted') from exc
if uvars:
uvars = uvars.copy()
if self._debug:
self._debug_dict('uservariables block', uvars)
# save uservariables
if uvars:
try:
self._save_uservariables(uvars)
except YamlException:
# ignore
pass
return uvars
########################################################
# parsing helpers
########################################################
def _template_include_entry(self):
"""template all "include" entries"""
# import_actions
new = []
entries = self.settings.get(self.key_import_actions, [])
new = self._template_list(entries)
if new:
self.settings[self.key_import_actions] = new
# import_configs
entries = self.settings.get(self.key_import_configs, [])
new = self._template_list(entries)
if new:
self.settings[self.key_import_configs] = new
# import_variables
entries = self.settings.get(self.key_import_variables, [])
new = self._template_list(entries)
if new:
self.settings[self.key_import_variables] = new
# profile's import
for _, val in self.profiles.items():
entries = val.get(self.key_import_profile_dfs, [])
new = self._template_list(entries)
if new:
val[self.key_import_profile_dfs] = new
def _norm_actions(self, actions):
"""
        ensure each action is explicitly either pre or post
        resulting entries have the form {action_key: (pre|post, action)}
"""
if not actions:
return actions
new = {}
for k, val in actions.items():
if k in (self.action_pre, self.action_post):
for key, action in val.items():
new[key] = (k, action)
else:
new[k] = (self.action_post, val)
return new
def _norm_profiles(self, profiles):
"""normalize profiles entries"""
if not profiles:
return profiles
new = {}
# loop through each profile
for pro, entries in profiles.items():
if pro == self.key_all:
msg = f'\"{self.key_all}\" is a special profile name, '
msg += 'consider renaming to avoid any issue.'
self._log.warn(msg)
if not pro:
msg = 'empty profile name'
self._log.warn(msg)
continue
if not entries:
# no entries in profile dict
continue
# add "dotfiles:" entry if not present in local object
if self.key_profile_dotfiles not in entries or \
entries[self.key_profile_dotfiles] is None:
entries[self.key_profile_dotfiles] = []
# add "dotfiles:" entry if not present in yaml dict
dict_pro = self._yaml_dict[self.key_profiles][pro]
if self.key_profile_dotfiles not in dict_pro or \
dict_pro[self.key_profile_dotfiles] is None:
dict_pro[self.key_profile_dotfiles] = []
new[pro] = entries
return new
def _norm_dotfile_chmod(self, entry):
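        """validate a dotfile's chmod entry and normalize it to an octal int"""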
value = str(entry[self.key_dotfile_chmod])
if value == self.chmod_ignore:
# is preserve
return
if len(value) < 3:
# bad format
err = f'bad format for chmod: {value}'
self._log.err(err)
raise YamlException(f'config content error: {err}')
        # check it is a valid value
try:
int(value)
except Exception as exc:
err = f'bad format for chmod: {value}'
self._log.err(err)
err = f'config content error: {err}'
raise YamlException(err) from exc
        # validate each octal digit
for chmodv in list(value):
chmodint = int(chmodv)
if chmodint < 0 or chmodint > 7:
err = f'bad format for chmod: {value}'
self._log.err(err)
raise YamlException(
f'config content error: {err}'
)
# octal
entry[self.key_dotfile_chmod] = int(value, 8)
def _norm_dotfiles(self, dotfiles):
"""normalize and check dotfiles entries"""
if not dotfiles:
return dotfiles
new = {}
for k, val in dotfiles.items():
if self.key_dotfile_src not in val:
                # use the dotfile key as 'src' if not present
val[self.key_dotfile_src] = k
new[k] = val
else:
new[k] = val
if self.old_key_trans_r in val:
# fix deprecated trans key
msg = f'{k} \"trans\" is deprecated, please use \"trans_read\"'
self._log.warn(msg)
val[self.key_trans_r] = val[self.old_key_trans_r]
del val[self.old_key_trans_r]
new[k] = val
if self.key_dotfile_link not in val:
# apply link value if undefined
value = self.settings[self.key_settings_link_dotfile_default]
val[self.key_dotfile_link] = value
if self.key_dotfile_noempty not in val:
# apply noempty if undefined
value = self.settings.get(self.key_settings_noempty, False)
val[self.key_dotfile_noempty] = value
if self.key_dotfile_template not in val:
# apply template if undefined
value = self.settings.get(self.key_settings_template, True)
val[self.key_dotfile_template] = value
if self.key_dotfile_chmod in val:
# validate value of chmod if defined
self._norm_dotfile_chmod(val)
return new
def _add_variables(self, new, shell=False, template=True, prio=False):
"""
add new variables
@shell: execute the variable through the shell
@template: template the variable
@prio: new takes priority over existing variables
"""
if not new:
return
# merge
if prio:
self.variables = self._merge_dict(new, self.variables)
else:
            # discard new variables that are already defined
for k in list(new.keys()):
if k in self.variables.keys():
del new[k]
# merge
self.variables = self._merge_dict(self.variables, new)
# ensure enriched variables are relative to this config
self.variables = self._enrich_vars(self.variables, self._profile)
# re-create the templater
self._redefine_templater()
if template:
# rec resolve variables with new ones
self.variables = self._rec_resolve_variables(self.variables)
if shell and new:
# shell exec
self._shell_exec_dvars(self.variables, keys=new.keys())
# re-create the templater
self._redefine_templater()
def _enrich_vars(self, variables, profile):
"""return enriched variables"""
# add profile variable
if profile:
variables['profile'] = profile
# add some more variables
path = self.settings.get(self.key_settings_dotpath)
path = self._norm_path(path)
variables['_dotdrop_dotpath'] = path
variables['_dotdrop_cfgpath'] = self._norm_path(self._path)
path = self.settings.get(self.key_settings_workdir)
path = self._norm_path(path)
variables['_dotdrop_workdir'] = path
return variables
def _get_profile_included_item(self, keyitem):
"""recursively get included <keyitem> in profile"""
profiles = [self._profile] + self._inc_profiles
items = {}
for profile in profiles:
seen = [self._profile]
i = self.__get_profile_included_item(profile, keyitem, seen)
items = self._merge_dict(i, items)
return items
def __get_profile_included_item(self, profile, keyitem, seen):
"""recursively get included <keyitem> from profile"""
items = {}
if not profile or profile not in self.profiles.keys():
return items
# considered profile entry
pentry = self.profiles.get(profile)
# recursively get <keyitem> from inherited profile
for inherited_profile in pentry.get(self.key_profile_include, []):
if inherited_profile == profile or inherited_profile in seen:
raise YamlException('\"include\" loop')
seen.append(inherited_profile)
new = self.__get_profile_included_item(inherited_profile,
keyitem, seen)
if self._debug:
msg = f'included {keyitem} from {inherited_profile}: {new}'
self._dbg(msg)
items.update(new)
cur = pentry.get(keyitem, {})
return self._merge_dict(cur, items)
def _resolve_profile_all(self):
"""resolve some other parts of the config"""
# profile -> ALL
for k, val in self.profiles.items():
dfs = val.get(self.key_profile_dotfiles, None)
if not dfs:
continue
if self.key_all in dfs:
if self._debug:
self._dbg(f'add ALL to profile \"{k}\"')
val[self.key_profile_dotfiles] = self.dotfiles.keys()
def _resolve_profile_includes(self):
"""resolve elements included through other profiles"""
for k, _ in self.profiles.items():
self._rec_resolve_profile_include(k)
def _rec_resolve_profile_include(self, profile):
"""
        recursively resolve inclusion of other profiles':
* dotfiles
* actions
returns dotfiles, actions
"""
this_profile = self.profiles[profile]
# considered profile content
dotfiles = this_profile.get(self.key_profile_dotfiles, []) or []
actions = this_profile.get(self.key_profile_actions, []) or []
includes = this_profile.get(self.key_profile_include, []) or []
if not includes:
# nothing to include
return dotfiles, actions
if self._debug:
incs = ','.join(includes)
self._dbg(f'{profile} includes {incs}')
self._dbg(f'{profile} dotfiles before include: {dotfiles}')
self._dbg(f'{profile} actions before include: {actions}')
seen = []
for i in uniq_list(includes):
if self._debug:
self._dbg(f'resolving includes "{profile}" <- "{i}"')
# ensure no include loop occurs
if i in seen:
                raise YamlException('\"include\" loop')
seen.append(i)
            # ensure the included profile exists
if i not in self.profiles.keys():
self._log.warn(f'include unknown profile: {i}')
continue
# recursive resolve
if self._debug:
self._dbg(f'recursively resolving includes for profile "{i}"')
o_dfs, o_actions = self._rec_resolve_profile_include(i)
# merge dotfile keys
if self._debug:
msg = f'Merging dotfiles {profile}'
msg += f' <- {i}: {dotfiles} <- {o_dfs}'
self._dbg(msg)
dotfiles.extend(o_dfs)
this_profile[self.key_profile_dotfiles] = uniq_list(dotfiles)
# merge actions keys
if self._debug:
msg = f'Merging actions {profile} '
msg += f'<- {i}: {actions} <- {o_actions}'
self._dbg(msg)
actions.extend(o_actions)
this_profile[self.key_profile_actions] = uniq_list(actions)
dotfiles = this_profile.get(self.key_profile_dotfiles, [])
actions = this_profile.get(self.key_profile_actions, [])
if self._debug:
self._dbg(f'{profile} dotfiles after include: {dotfiles}')
self._dbg(f'{profile} actions after include: {actions}')
# since included items are resolved here
        # we can clear these includes
self.profiles[profile][self.key_profile_include] = []
return dotfiles, actions
########################################################
# handle imported entries
########################################################
def _import_variables(self):
"""import external variables from paths"""
paths = self.settings.get(self.key_import_variables, None)
if not paths:
return None
paths = self._resolve_paths(paths)
newvars = {}
for path in paths:
if self._debug:
self._dbg(f'import variables from {path}')
var = self._import_sub(path, self.key_variables,
mandatory=False)
if self._debug:
self._dbg(f'import dynvariables from {path}')
dvar = self._import_sub(path, self.key_dvariables,
mandatory=False)
merged = self._merge_dict(dvar, var)
merged = self._rec_resolve_variables(merged)
if dvar.keys():
self._shell_exec_dvars(merged, keys=dvar.keys())
self._clear_profile_vars(merged)
newvars = self._merge_dict(merged, newvars)
if self._debug:
self._debug_dict('imported variables', newvars)
return newvars
def _import_actions(self):
"""import external actions from paths"""
paths = self.settings.get(self.key_import_actions, None)
if not paths:
return
paths = self._resolve_paths(paths)
for path in paths:
if self._debug:
self._dbg(f'import actions from {path}')
new = self._import_sub(path, self.key_actions,
mandatory=False,
patch_func=self._norm_actions)
self.actions = self._merge_dict(new, self.actions)
def _import_profiles_dotfiles(self):
"""import profile dotfiles"""
for k, val in self.profiles.items():
imp = val.get(self.key_import_profile_dfs, None)
if not imp:
continue
if self._debug:
self._dbg(f'import dotfiles for profile {k}')
paths = self._resolve_paths(imp)
for path in paths:
current = val.get(self.key_dotfiles, [])
new = self._import_sub(path, self.key_dotfiles,
mandatory=False)
val[self.key_dotfiles] = new + current
def _import_config(self, path):
"""import config from path"""
if self._debug:
self._dbg(f'import config from {path}')
self._dbg(f'profile: {self._profile}')
self._dbg(f'included profiles: {self._inc_profiles}')
sub = CfgYaml(path, profile=self._profile,
addprofiles=self._inc_profiles,
debug=self._debug,
imported_configs=self.imported_configs)
        # settings from the imported config are ignored
        # except for filter_file and func_file
self.settings[Settings.key_func_file] += [
self._norm_path(func_file)
for func_file in sub.settings[Settings.key_func_file]
]
        self.settings[Settings.key_filter_file] += [
            self._norm_path(filter_file)
            for filter_file in sub.settings[Settings.key_filter_file]
        ]
# merge top entries
self.dotfiles = self._merge_dict(self.dotfiles, sub.dotfiles)
self.profiles = self._merge_dict(self.profiles, sub.profiles,
deep=True)
self.actions = self._merge_dict(self.actions, sub.actions)
self.trans_r = self._merge_dict(self.trans_r, sub.trans_r)
self.trans_w = self._merge_dict(self.trans_w, sub.trans_w)
self._clear_profile_vars(sub.variables)
self.imported_configs.append(path)
self.imported_configs += sub.imported_configs
if self._debug:
self._debug_dict('add import_configs var', sub.variables)
self._add_variables(sub.variables, prio=True)
def _import_configs(self):
"""import configs from external files"""
# settings -> import_configs
imp = self.settings.get(self.key_import_configs, None)
if not imp:
return
paths = self._resolve_paths(imp)
for path in paths:
if path in self.imported_configs:
err = f'{path} imported more than once in {self._path}'
raise YamlException(err)
self._import_config(path)
def _import_sub(self, path, key, mandatory=False, patch_func=None):
"""
import the block "key" from "path"
        patch_func is applied to the imported entry if provided
"""
if self._debug:
self._dbg(f'import \"{key}\" from \"{path}\"')
extdict = self._load_yaml(path)
new = self._get_entry(extdict, key, mandatory=mandatory)
if patch_func:
if self._debug:
self._dbg(f'calling patch: {patch_func}')
new = patch_func(new)
if not new and mandatory:
err = f'no \"{key}\" imported from \"{path}\"'
self._log.warn(err)
raise YamlException(err)
if self._debug:
self._dbg(f'imported \"{key}\": {new}')
return new
########################################################
# add/remove entries
########################################################
def _new_profile(self, key):
"""add a new profile if it doesn't exist"""
if key == self.key_all:
err = f'profile key \"{key}\" is reserved'
self._log.warn(err)
raise YamlException(err)
if not key:
err = 'empty profile key'
self._log.warn(err)
raise YamlException(err)
if key not in self.profiles.keys():
# update yaml_dict
self._yaml_dict[self.key_profiles][key] = {
self.key_profile_dotfiles: []
}
self.profiles[key] = {
self.key_profile_dotfiles: []
}
if self._debug:
self._dbg(f'adding new profile: {key}')
self._dirty = True
########################################################
# handle deprecated entries
########################################################
def _fix_deprecated(self, yamldict):
"""fix deprecated entries"""
if not yamldict:
return
self._fix_deprecated_link_by_default(yamldict)
self._fix_deprecated_dotfile_link(yamldict)
def _fix_deprecated_link_by_default(self, yamldict):
"""fix deprecated link_by_default"""
old_key = 'link_by_default'
newkey = self.key_imp_link
if self.key_settings not in yamldict:
return
if not yamldict[self.key_settings]:
return
config = yamldict[self.key_settings]
if old_key not in config:
return
if config[old_key]:
config[newkey] = self.lnk_link
else:
config[newkey] = self.lnk_nolink
del config[old_key]
self._log.warn('deprecated \"link_by_default\"')
self._dirty = True
self._dirty_deprecated = True
def _fix_deprecated_dotfile_link(self, yamldict):
"""fix deprecated link in dotfiles"""
old_key = 'link_children'
if self.key_dotfiles not in yamldict:
return
if not yamldict[self.key_dotfiles]:
return
for _, dotfile in yamldict[self.key_dotfiles].items():
if self.key_dotfile_link in dotfile and \
isinstance(dotfile[self.key_dotfile_link], bool):
# patch link: <bool>
cur = dotfile[self.key_dotfile_link]
new = self.lnk_nolink
if cur:
new = self.lnk_link
dotfile[self.key_dotfile_link] = new
self._dirty = True
self._dirty_deprecated = True
warn = 'deprecated \"link: <boolean>\"'
warn += f', updated to \"link: {new}\"'
self._log.warn(warn)
if self.key_dotfile_link in dotfile and \
dotfile[self.key_dotfile_link] == self.lnk_link:
# patch "link: link"
# to "link: absolute"
new = self.lnk_absolute
dotfile[self.key_dotfile_link] = new
self._dirty = True
self._dirty_deprecated = True
warn = 'deprecated \"link: link\"'
warn += f', updated to \"link: {new}\"'
self._log.warn(warn)
if old_key in dotfile and \
isinstance(dotfile[old_key], bool):
# patch link_children: <bool>
cur = dotfile[old_key]
new = self.lnk_nolink
if cur:
new = self.lnk_children
del dotfile[old_key]
dotfile[self.key_dotfile_link] = new
self._dirty = True
self._dirty_deprecated = True
warn = 'deprecated \"link_children\" value'
warn += f', updated to \"{new}\"'
self._log.warn(warn)
########################################################
# yaml utils
########################################################
def _prepare_to_save(self, content):
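        """clean the config dict and ensure base entries exist before saving"""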
content = self._clear_none(content)
# make sure we have the base entries
if self.key_settings not in content:
content[self.key_settings] = None
if self.key_dotfiles not in content:
content[self.key_dotfiles] = None
if self.key_profiles not in content:
content[self.key_profiles] = None
return content
def _load_yaml(self, path):
"""load a yaml file to a dict"""
content = {}
if self._debug:
self._dbg(f'----------dump:{path}----------')
cfg = '\n'
with open(path, 'r', encoding='utf8') as file:
for line in file:
cfg += line
self._dbg(cfg.rstrip())
self._dbg(f'----------end:{path}----------')
try:
content, fmt = self._yaml_load(path)
self._config_format = fmt
except Exception as exc:
self._log.err(exc)
err = f'config yaml error: {path}'
raise YamlException(err) from exc
if self._debug:
self._dbg(f'format: {self._config_format}')
return content
def _validate(self, yamldict):
"""validate entries"""
if not yamldict:
return
# check top entries
for entry in self.top_entries:
if entry not in yamldict:
err = f'no {entry} entry found'
self._log.err(err)
raise YamlException(f'config format error: {err}')
# check link_dotfile_default
if self.key_settings not in yamldict:
# no configs top entry
return
if not yamldict[self.key_settings]:
# configs empty
return
# check settings values
settings = yamldict[self.key_settings]
if self.key_settings_link_dotfile_default not in settings:
return
val = settings[self.key_settings_link_dotfile_default]
if val not in self.allowed_link_val:
err = f'bad link value: {val}'
self._log.err(err)
self._log.err(f'allowed: {self.allowed_link_val}')
raise YamlException(f'config content error: {err}')
@classmethod
def _yaml_load(cls, path):
"""load config file"""
is_toml = path.lower().endswith(".toml")
if is_toml:
return cls.__toml_load(path), 'toml'
return cls.__yaml_load(path), 'yaml'
@classmethod
def __yaml_load(cls, path):
"""load from yaml"""
with open(path, 'r', encoding='utf8') as file:
data = yaml()
data.typ = 'rt'
content = data.load(file)
return content
@classmethod
def __toml_load(cls, path):
"""load from toml"""
with open(path, 'r', encoding='utf8') as file:
data = file.read()
content = toml.loads(data)
        # handle non-existent dotfiles/profiles
        # since toml has no notion of null/none
if cls.key_dotfiles not in content:
content[cls.key_dotfiles] = None
if cls.key_profiles not in content:
content[cls.key_profiles] = None
return content
@classmethod
def _yaml_dump(cls, content, file, fmt='yaml'):
"""dump config file"""
if fmt == 'toml':
return cls.__toml_dump(content, file)
if fmt == 'yaml':
return cls.__yaml_dump(content, file)
raise YamlException("unsupported format")
@classmethod
def __yaml_dump(cls, content, file):
"""dump to yaml"""
data = yaml()
data.default_flow_style = False
data.indent = 2
data.typ = 'rt'
data.dump(content, file)
@classmethod
def __toml_dump(cls, content, file):
"""dump to toml"""
toml.dump(content, file)
########################################################
# templating
########################################################
def _redefine_templater(self):
"""create templater based on current variables"""
fufile = None
fifile = None
if Settings.key_func_file in self.settings:
fufile = self.settings[Settings.key_func_file]
if Settings.key_filter_file in self.settings:
fifile = self.settings[Settings.key_filter_file]
self._tmpl = Templategen(variables=self.variables,
func_file=fufile,
filter_file=fifile)
def _template_item(self, item, exc_if_fail=True):
"""
template an item using the templategen
        raises an exception if templating fails and exc_if_fail is set
"""
if not Templategen.string_is_template(item):
return item
try:
val = item
while Templategen.string_is_template(val):
val = self._tmpl.generate_string(val)
except UndefinedException as exc:
if exc_if_fail:
raise exc
return val
def _template_list(self, entries):
"""template a list of entries"""
new = []
if not entries:
return new
for entry in entries:
newe = self._template_item(entry)
if self._debug and entry != newe:
self._dbg(f'resolved: {entry} -> {newe}')
new.append(newe)
return new
def _template_dict(self, entries):
"""template a dictionary of entries"""
new = {}
if not entries:
return new
for k, val in entries.items():
newv = self._template_item(val)
if self._debug and val != newv:
self._dbg(f'resolved: {val} -> {newv}')
new[k] = newv
return new
def _template_dotfiles_entries(self):
"""template dotfiles entries"""
if self._debug:
self._dbg('templating dotfiles entries')
dotfiles = self.dotfiles.copy()
        # make sure no dotfile path is None
for dotfile in dotfiles.values():
src = dotfile[self.key_dotfile_src]
if src is None:
dotfile[self.key_dotfile_src] = ''
dst = dotfile[self.key_dotfile_dst]
if dst is None:
dotfile[self.key_dotfile_dst] = ''
# resolve links before taking subset of
# dotfiles to avoid issues in upper layer
for dotfile in dotfiles.values():
# link
if self.key_dotfile_link in dotfile:
# normalize the link value
link = dotfile[self.key_dotfile_link]
newlink = self._resolve_dotfile_link(link)
dotfile[self.key_dotfile_link] = newlink
# only keep dotfiles related to the selected profile
pdfs = []
pro = self.profiles.get(self._profile, [])
if pro:
pdfs = list(pro.get(self.key_profile_dotfiles, []))
# and any included profiles
for addpro in self._inc_profiles:
pro = self.profiles.get(addpro, [])
if not pro:
continue
pdfsalt = pro.get(self.key_profile_dotfiles, [])
pdfs.extend(pdfsalt)
pdfs = uniq_list(pdfs)
        # unless ALL is defined
        if self.key_all not in pdfs:
            # take only the subset of dotfiles used by the profile(s)
newdotfiles = {}
for k, val in dotfiles.items():
if k in pdfs:
newdotfiles[k] = val
dotfiles = newdotfiles
for dotfile in dotfiles.values():
# src
src = dotfile[self.key_dotfile_src]
newsrc = self.resolve_dotfile_src(src, templater=self._tmpl)
dotfile[self.key_dotfile_src] = newsrc
# dst
dst = dotfile[self.key_dotfile_dst]
newdst = self.resolve_dotfile_dst(dst, templater=self._tmpl)
dotfile[self.key_dotfile_dst] = newdst
def _rec_resolve_variables(self, variables):
"""recursive resolve variables"""
var = self._enrich_vars(variables, self._profile)
        # use a separate templategen to handle variables
# resolved outside the main config
func_files = self.settings[Settings.key_func_file]
filter_files = self.settings[Settings.key_filter_file]
templ = Templategen(variables=var,
func_file=func_files,
filter_file=filter_files)
newvars = variables.copy()
for k in variables.keys():
val = variables[k]
while Templategen.var_is_template(val):
val = templ.generate_string_or_dict(val)
if isinstance(val, dict):
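                # dict values are flattened into dotted keys ("<var>.<subkey>")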
for sub in val:
subkey = f'{k}.{sub}'
newvars[subkey] = val[sub]
else:
newvars[k] = val
templ.update_variables(newvars)
if newvars is self.variables:
self._redefine_templater()
return newvars
def _get_profile_included_vars(self):
"""
resolve profile included variables/dynvariables
returns inc_profiles, inc_var, inc_dvar
"""
for _, val in self.profiles.items():
if self.key_profile_include in val and \
val[self.key_profile_include]:
new = []
for entry in val[self.key_profile_include]:
new.append(self._tmpl.generate_string(entry))
val[self.key_profile_include] = new
# now get the included ones
pro_var = self._get_profile_included_item(self.key_profile_variables)
pro_dvar = self._get_profile_included_item(self.key_profile_dvariables)
# the included profiles
inc_profiles = []
if self._profile and self._profile in self.profiles:
pentry = self.profiles.get(self._profile)
inc_profiles = pentry.get(self.key_profile_include, [])
        # included dynvariables are executed later by the caller (shell=True)
return inc_profiles, pro_var, pro_dvar
########################################################
# helpers
########################################################
def _clear_profile_vars(self, dic):
"""
        remove profile variables from dic (in place) if found
        to avoid profile variables being overwritten
"""
if not dic:
return
for k in self._profilevarskeys:
dic.pop(k, None)
def _parse_extended_import_path(self, path_entry):
"""Parse an import path in a tuple (path, fatal_not_found)."""
if self._debug:
self._dbg(f'parsing path entry {path_entry}')
path, _, attribute = path_entry.rpartition(self.key_import_sep)
fatal_not_found = attribute != self.key_import_ignore_key
is_valid_attribute = attribute in ('', self.key_import_ignore_key)
if not is_valid_attribute:
# If attribute is not valid it can mean that:
# - path_entry doesn't contain the separator, and attribute is set
# to the whole path by str.rpartition
# - path_entry contains a separator, but it's in the file path, so
# attribute is set to whatever comes after the separator by
# str.rpartition
# In both cases, path_entry is the path we're looking for.
if self._debug:
msg = 'using attribute default values'
msg += f' for path {path_entry}'
self._dbg(msg)
path = path_entry
fatal_not_found = self.key_import_fatal_not_found
elif self._debug:
msg = f'path entry {path_entry} has fatal_not_found'
msg += f' flag set to {fatal_not_found}'
self._dbg(msg)
return path, fatal_not_found
def _handle_non_existing_path(self, path, fatal_not_found=True):
"""Raise an exception or log a warning to handle non-existing paths."""
error = f'bad path {path}'
if fatal_not_found:
raise YamlException(error)
self._log.warn(error)
def _check_path_existence(self, path, fatal_not_found=True):
"""Check if a path exists, raising if necessary."""
if os.path.exists(path):
if self._debug:
self._dbg(f'path {path} exists')
return path
self._handle_non_existing_path(path, fatal_not_found)
# Explicit return for readability. Anything evaluating to false is ok.
return None
def _process_path(self, path_entry):
"""
        This method processes a path entry. Namely it:
        - Normalizes the path.
        - Expands globs.
        - Checks for path existence, taking fatal_not_found into account.
This method always returns a list containing only absolute paths
existing on the filesystem. If the input is not a glob, the list
contains at most one element, otherwise it could hold more.
"""
path, fatal_not_found = self._parse_extended_import_path(path_entry)
path = self._norm_path(path)
paths = self._glob_path(path) if self._is_glob(path) else [path]
if not paths:
if self._debug:
self._dbg(f"glob path {path} didn't expand")
self._handle_non_existing_path(path, fatal_not_found)
return []
checked_paths = (self._check_path_existence(p, fatal_not_found)
for p in paths)
return [p for p in checked_paths if p]
def _resolve_paths(self, paths):
"""
        This function resolves a list of paths. This means normalizing,
        expanding globs and checking for existence, taking
        fatal_not_found flags into account.
"""
processed_paths = (self._process_path(p) for p in paths)
return list(chain.from_iterable(processed_paths))
@classmethod
def _merge_dict(cls, high, low, deep=False):
"""
        both dicts must have the same form/type
if deep is True, then merge recursively
"""
if not high:
high = {}
if not low:
low = {}
if not high and not low:
return {}
if not deep:
return {**low, **high}
final = high.copy()
for k, val in low.items():
if isinstance(val, dict):
# content is dict, recurse
if k not in final:
final[k] = {}
final[k] = cls._merge_dict(val, final[k], deep=True)
elif isinstance(val, list):
# content is list, merge
if k not in final:
final[k] = []
final[k] += val
else:
# don't know how to handle
err = 'unable to merge'
raise YamlException(err)
return final
@classmethod
def _get_entry(cls, dic, key, mandatory=True):
"""return copy of entry from yaml dictionary"""
if key not in dic:
if mandatory:
err = f'invalid config: no entry \"{key}\" found'
raise YamlException(err)
dic[key] = {}
return deepcopy(dic[key])
if mandatory and not dic[key]:
# ensure is not none
dic[key] = {}
return deepcopy(dic[key])
def _clear_none(self, dic):
"""recursively delete all none/empty values in a dictionary."""
new = {}
for k, val in dic.items():
if k == self.key_dotfile_src:
# allow empty dotfile src
new[k] = val
continue
if k == self.key_dotfile_dst:
# allow empty dotfile dst
new[k] = val
continue
newv = val
if isinstance(val, dict):
                # recursively traverse the dict
newv = self._clear_none(val)
if not newv:
# no empty dict
continue
if newv is None:
# no None value
continue
if isinstance(newv, list) and not newv:
# no empty list
continue
new[k] = newv
return new
@classmethod
def _is_glob(cls, path):
"""Quick test if path is a glob."""
return '*' in path or '?' in path
def _glob_path(self, path):
"""Expand a glob."""
if self._debug:
self._dbg(f'expanding glob {path}')
expanded_path = os.path.expanduser(path)
return glob.glob(expanded_path, recursive=True)
def _norm_path(self, path):
"""Resolve a path either absolute or relative to config path"""
if not path:
return path
path = os.path.expanduser(path)
if not os.path.isabs(path):
dirn = os.path.dirname(self._path)
ret = os.path.join(dirn, path)
if self._debug:
msg = f'normalizing relative to cfg: {path} -> {ret}'
self._dbg(msg)
path = ret
ret = os.path.normpath(path)
if self._debug and path != ret:
self._dbg(f'normalizing: {path} -> {ret}')
return ret
def _shell_exec_dvars(self, dic, keys=None):
"""shell execute dynvariables in-place"""
if not keys:
keys = dic.keys()
for k in keys:
val = dic[k]
ret, out = shellrun(val, debug=self._debug)
if not ret:
err = f'var \"{k}: {val}\" failed: {out}'
self._log.err(err)
raise YamlException(err)
if self._debug:
self._dbg(f'{k}: `{val}` -> {out}')
dic[k] = out
self._debug_dict('dynvars after', dic)
@classmethod
def _check_minversion(cls, minversion):
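        """ensure the running dotdrop version is not older than minversion"""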
if not minversion:
return
try:
cur = ([int(x) for x in VERSION.split('.')])
cfg = ([int(x) for x in minversion.split('.')])
except Exception as exc:
err = f'bad version: \"{VERSION}\" VS \"{minversion}\"'
raise YamlException(err) from exc
if cur < cfg:
err = 'current dotdrop version is too old for that config file.'
err += ' Please update.'
raise YamlException(err)
def _debug_entries(self):
"""debug print all interesting entries"""
if not self._debug:
return
self._dbg('Current entries')
self._debug_dict('entry settings', self.settings)
self._debug_dict('entry dotfiles', self.dotfiles)
self._debug_dict('entry profiles', self.profiles)
self._debug_dict('entry actions', self.actions)
self._debug_dict('entry trans_r', self.trans_r)
self._debug_dict('entry trans_w', self.trans_w)
self._debug_dict('entry variables', self.variables)
def _debug_dict(self, title, elems):
"""pretty print dict"""
if not self._debug:
return
self._dbg(f'{title}:')
if not elems:
return
for k, val in elems.items():
if isinstance(val, dict):
self._dbg(f' - \"{k}\"')
for subkey, sub in val.items():
self._dbg(f' * {subkey}: \"{sub}\"')
else:
self._dbg(f' - \"{k}\": {val}')
def _dbg(self, content):
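        """log a debug message prefixed with the config file name"""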
directory = os.path.basename(os.path.dirname(self._path))
pre = os.path.join(directory, os.path.basename(self._path))
self._log.dbg(f'[{pre}] {content}')
def _save_uservariables(self, uvars):
"""save uservariables to file"""
parent = os.path.dirname(self._path)
# find a unique path
path = None
cnt = 0
while True:
if cnt == 0:
name = self.save_uservariables_name.format('')
else:
name = self.save_uservariables_name.format(f'-{cnt}')
cnt += 1
path = os.path.join(parent, name)
if not os.path.exists(path):
break
# save the config
content = {'variables': uvars}
try:
if self._debug:
self._dbg(f'saving uservariables values to {path}')
with open(path, 'w', encoding='utf8') as file:
self._yaml_dump(content, file, fmt=self._config_format)
except Exception as exc:
# self._log.err(exc)
err = f'error saving uservariables to {path}'
self._log.err(err)
raise YamlException(err) from exc
self._log.log(f'uservariables values saved to {path}')