""" author: deadc0de6 (https://github.com/deadc0de6) Copyright (c) 2019, deadc0de6 handle lower level of the config file will provide the following dictionaries to the upper layer: * self.settings * self.dotfiles * self.profiles * self.actions * self.trans_r * self.trans_w * self.variables Additionally a few methods are exported. """ # pylint: disable=C0302 import os import glob import io from copy import deepcopy from itertools import chain from ruamel.yaml import YAML as yaml import toml # local imports from dotdrop.version import __version__ as VERSION from dotdrop.settings import Settings from dotdrop.logger import Logger from dotdrop.templategen import Templategen from dotdrop.linktypes import LinkTypes from dotdrop.utils import shellrun, uniq_list, userinput from dotdrop.exceptions import YamlException, UndefinedException class CfgYaml: """yaml config file parser""" # global entries key_settings = Settings.key_yaml key_dotfiles = 'dotfiles' key_profiles = 'profiles' key_actions = 'actions' old_key_trans_r = 'trans' key_trans_r = 'trans_read' key_trans_w = 'trans_write' key_variables = 'variables' key_dvariables = 'dynvariables' key_uvariables = 'uservariables' action_pre = 'pre' action_post = 'post' save_uservariables_name = 'uservariables{}.yaml' # profiles/dotfiles entries key_dotfile_src = 'src' key_dotfile_dst = 'dst' key_dotfile_link = 'link' key_dotfile_actions = 'actions' key_dotfile_noempty = 'ignoreempty' key_dotfile_template = 'template' key_dotfile_chmod = 'chmod' # chmod value chmod_ignore = 'preserve' # profile key_profile_dotfiles = 'dotfiles' key_profile_include = 'include' key_profile_variables = 'variables' key_profile_dvariables = 'dynvariables' key_profile_actions = 'actions' key_all = 'ALL' # import entries key_import_actions = 'import_actions' key_import_configs = 'import_configs' key_import_variables = 'import_variables' key_import_profile_dfs = 'import' key_import_sep = ':' key_import_ignore_key = 'optional' key_import_fatal_not_found = True # settings key_settings_dotpath = Settings.key_dotpath key_settings_workdir = Settings.key_workdir key_settings_link_dotfile_default = Settings.key_link_dotfile_default key_settings_noempty = Settings.key_ignoreempty key_settings_minversion = Settings.key_minversion key_imp_link = Settings.key_link_on_import key_settings_template = Settings.key_template_dotfile_default # link values lnk_nolink = LinkTypes.NOLINK.name.lower() lnk_link = LinkTypes.LINK.name.lower() lnk_children = LinkTypes.LINK_CHILDREN.name.lower() lnk_absolute = LinkTypes.ABSOLUTE.name.lower() lnk_relative = LinkTypes.RELATIVE.name.lower() # checks allowed_link_val = [lnk_nolink, lnk_link, lnk_children, lnk_absolute, lnk_relative] top_entries = [key_dotfiles, key_settings, key_profiles] def __init__(self, path, profile=None, addprofiles=None, reloading=False, debug=False, imported_configs=None): """ config parser @path: config file path @profile: the selected profile names @addprofiles: included profiles names (list) @reloading: true when reloading @imported_configs: paths of config files that have been imported so far @debug: debug flag """ self._path = os.path.abspath(path) self._profile = profile self._reloading = reloading self._debug = debug self._log = Logger(debug=self._debug) # config format self._config_format = 'yaml' # config needs to be written self._dirty = False # indicates the config has been updated self._dirty_deprecated = False # profile variables self._profilevarskeys = [] # included profiles self._inc_profiles = addprofiles or [] # imported configs self.imported_configs = imported_configs or [] # init the dictionaries self.settings = {} self.dotfiles = {} self.profiles = {} self.actions = {} self.trans_r = {} self.trans_w = {} self.variables = {} if not os.path.exists(self._path): err = f'invalid config path: \"{path}\"' if self._debug: self._dbg(err) raise YamlException(err) self._dbg('START of config parsing') self._dbg(f'reloading: {reloading}') self._dbg(f'profile: {profile}') pfs = ','.join(self._inc_profiles) self._dbg(f'included profiles: {pfs}') self._yaml_dict = self._load_yaml(self._path) # live patch deprecated entries self._fix_deprecated(self._yaml_dict) # validate content self._validate(self._yaml_dict) ################################################## # parse the config and variables ################################################## # parse the "config" block self.settings = self._parse_blk_settings(self._yaml_dict) # base templater (when no vars/dvars exist) self.variables = self._enrich_vars(self.variables, self._profile) self._redefine_templater() # variables and dynvariables need to be first merged # before being templated in order to allow cyclic # references between them # parse the "variables" block var = self._parse_blk_variables(self._yaml_dict) self._add_variables(var, template=False) # parse the "dynvariables" block dvariables = self._parse_blk_dynvariables(self._yaml_dict) self._add_variables(dvariables, template=False) # now template variables and dynvariables from the same pool self._rec_resolve_variables(self.variables) # and execute dvariables # since this is done after recursively resolving variables # and dynvariables this means that variables referencing # dynvariables will result with the not executed value if dvariables.keys(): self._shell_exec_dvars(self.variables, keys=dvariables.keys()) # finally redefine the template self._redefine_templater() if self._debug: title = 'current variables defined' self._debug_dict(title, self.variables) # parse the "profiles" block self.profiles = self._parse_blk_profiles(self._yaml_dict) # include the profile's variables/dynvariables last # as it overwrites existing ones incpro, pvar, pdvar = self._get_profile_included_vars() self._inc_profiles = uniq_list(self._inc_profiles + incpro) self._add_variables(pvar, prio=True) self._add_variables(pdvar, shell=True, prio=True) self._profilevarskeys.extend(pvar.keys()) self._profilevarskeys.extend(pdvar.keys()) # template variables self.variables = self._template_dict(self.variables) if self._debug: title = 'variables defined (after template dict)' self._debug_dict(title, self.variables) ################################################## # template the "include" entries ################################################## self._template_include_entry() if self._debug: title = 'variables defined (after template include)' self._debug_dict(title, self.variables) ################################################## # template config entries ################################################## entry = self.settings[self.key_settings_dotpath] val = self._template_item(entry) self.settings[self.key_settings_dotpath] = val ################################################## # parse the other blocks ################################################## # parse the "dotfiles" block self.dotfiles = self._parse_blk_dotfiles(self._yaml_dict) # parse the "actions" block self.actions = self._parse_blk_actions(self._yaml_dict) # parse the "trans_r" block self.trans_r = self._parse_blk_trans_r(self._yaml_dict) # parse the "trans_w" block self.trans_w = self._parse_blk_trans_w(self._yaml_dict) ################################################## # import elements ################################################## # process imported variables (import_variables) newvars = self._import_variables() self._clear_profile_vars(newvars) self._add_variables(newvars) # process imported actions (import_actions) self._import_actions() # process imported profile dotfiles (import) self._import_profiles_dotfiles() # process imported configs (import_configs) self._import_configs() # process profile include self._resolve_profile_includes() # add the current profile variables _, pvar, pdvar = self._get_profile_included_vars() self._add_variables(pvar, prio=False) self._add_variables(pdvar, shell=True, prio=False) self._profilevarskeys.extend(pvar.keys()) self._profilevarskeys.extend(pdvar.keys()) # resolve variables self._clear_profile_vars(newvars) self._add_variables(newvars) # process profile ALL self._resolve_profile_all() # template dotfiles entries self._template_dotfiles_entries() # parse the "uservariables" block uvariables = self._parse_blk_uservariables(self._yaml_dict, self.variables) self._add_variables(uvariables, template=False, prio=False) # end of parsing if self._debug: self._dbg('########### final config ###########') self._debug_entries() self._dbg('END of config parsing') ######################################################## # public methods ######################################################## def _resolve_dotfile_link(self, link): """resolve dotfile link entry""" newlink = self._template_item(link) # check link value if newlink not in self.allowed_link_val: err = f'bad link value: {newlink}' self._log.err(err) self._log.err(f'allowed: {self.allowed_link_val}') raise YamlException(f'config content error: {err}') return newlink def resolve_dotfile_src(self, src, templater=None): """ get abs src file from a relative path in dotpath """ newsrc = '' if src: new = src if templater: new = templater.generate_string(src) if new != src and self._debug: msg = f'dotfile src: \"{src}\" -> \"{new}\"' self._dbg(msg) src = new src = os.path.join(self.settings[self.key_settings_dotpath], src) newsrc = self._norm_path(src) return newsrc def resolve_dotfile_dst(self, dst, templater=None): """resolve dotfile dst path""" newdst = '' if dst: new = dst if templater: new = templater.generate_string(dst) if new != dst and self._debug: msg = f'dotfile dst: \"{dst}\" -> \"{new}\"' self._dbg(msg) dst = new newdst = self._norm_path(dst) return newdst def add_dotfile_to_profile(self, dotfile_key, profile_key): """ add an existing dotfile key to a profile_key if profile_key doesn't exist, the profile is created we test using profiles variable since it merges imported ones (include, etc) but insert in main yaml only return True if was added """ # create the profile if it doesn't exist self._new_profile(profile_key) if profile_key not in self.profiles: return False profile = self.profiles[profile_key] # ensure profile dotfiles list is not None if self.key_profile_dotfiles not in profile or \ profile[self.key_profile_dotfiles] is None: profile[self.key_profile_dotfiles] = [] self._yaml_dict[self.key_profiles][profile_key] = [] # add to the profile pdfs = profile[self.key_profile_dotfiles] if self.key_all not in pdfs and \ dotfile_key not in pdfs: # append dotfile pro = self._yaml_dict[self.key_profiles][profile_key] pro[self.key_profile_dotfiles].append(dotfile_key) if self._debug: msg = f'add \"{dotfile_key}\" to profile \"{profile_key}\"' self._dbg(msg) self._dirty = True return self._dirty def get_all_dotfile_keys(self): """return all existing dotfile keys""" return self.dotfiles.keys() def _update_dotfile_chmod(self, key, dotfile, chmod): old = None if self.key_dotfile_chmod in dotfile: old = dotfile[self.key_dotfile_chmod] if old == self.chmod_ignore: msg = ( 'ignore chmod change since ' f'{self.chmod_ignore}' ) self._dbg(msg) return False if old == chmod: return False if self._debug: self._dbg(f'update dotfile: {key}') self._dbg(f'old chmod value: {old}') self._dbg(f'new chmod value: {chmod}') dotfile = self._yaml_dict[self.key_dotfiles][key] if not chmod: del dotfile[self.key_dotfile_chmod] else: dotfile[self.key_dotfile_chmod] = str(format(chmod, 'o')) return True def update_dotfile(self, key, chmod): """ update an existing dotfile return true if updated """ if key not in self.dotfiles.keys(): return False dotfile = self._yaml_dict[self.key_dotfiles][key] if not self._update_dotfile_chmod(key, dotfile, chmod): return False self._dirty = True return True def add_dotfile(self, key, src, dst, link, chmod=None, trans_r_key=None, trans_w_key=None): """add a new dotfile""" if key in self.dotfiles.keys(): return False if self._debug: self._dbg(f'adding new dotfile: {key}') self._dbg(f'new dotfile src: {src}') self._dbg(f'new dotfile dst: {dst}') self._dbg(f'new dotfile link: {link}') self._dbg(f'new dotfile chmod: {chmod}') self._dbg(f'new dotfile trans_r: {trans_r_key}') self._dbg(f'new dotfile trans_w: {trans_w_key}') # create the dotfile dict df_dict = { self.key_dotfile_src: src, self.key_dotfile_dst: dst, } # link dfl = self.settings[self.key_settings_link_dotfile_default] if str(link) != dfl: df_dict[self.key_dotfile_link] = str(link) # chmod if chmod: df_dict[self.key_dotfile_chmod] = str(format(chmod, 'o')) # trans_r/trans_w if trans_r_key: df_dict[self.key_trans_r] = str(trans_r_key) if trans_w_key: df_dict[self.key_trans_w] = str(trans_w_key) if self._debug: self._dbg(f'dotfile dict: {df_dict}') # add to global dict self._yaml_dict[self.key_dotfiles][key] = df_dict self._dirty = True return True def del_dotfile(self, key): """remove this dotfile from config""" if key not in self._yaml_dict[self.key_dotfiles]: self._log.err(f'key not in dotfiles: {key}') return False if self._debug: self._dbg(f'remove dotfile: {key}') del self._yaml_dict[self.key_dotfiles][key] if self._debug: dfs = self._yaml_dict[self.key_dotfiles] self._dbg(f'new dotfiles: {dfs}') self._dirty = True return True def del_dotfile_from_profile(self, df_key, pro_key): """remove this dotfile from that profile""" if self._debug: self._dbg(f'removing \"{df_key}\" from \"{pro_key}\"') if df_key not in self.dotfiles.keys(): self._log.err(f'key not in dotfiles: {df_key}') return False if pro_key not in self.profiles.keys(): self._log.err(f'key not in profile: {pro_key}') return False # get the profile dictionary profile = self._yaml_dict[self.key_profiles][pro_key] if self.key_profile_dotfiles not in profile: # profile does not contain any dotfiles return True if df_key not in profile[self.key_profile_dotfiles]: return True if self._debug: dfs = profile[self.key_profile_dotfiles] self._dbg(f'{pro_key} profile dotfiles: {dfs}') self._dbg(f'remove {df_key} from profile {pro_key}') profile[self.key_profile_dotfiles].remove(df_key) if self._debug: dfs = profile[self.key_profile_dotfiles] self._dbg(f'{pro_key} profile dotfiles: {dfs}') self._dirty = True return True def save(self): """save this instance and return True if saved""" if not self._dirty: return False content = self._prepare_to_save(self._yaml_dict) if self._dirty_deprecated: # add minversion settings = content[self.key_settings] settings[self.key_settings_minversion] = VERSION # save to file if self._debug: self._dbg(f'saving to {self._path}') try: with open(self._path, 'w', encoding='utf8') as file: self._yaml_dump(content, file, fmt=self._config_format) except Exception as exc: self._log.err(exc) err = f'error saving config: {self._path}' raise YamlException(err) from exc if self._dirty_deprecated: warn = 'your config contained deprecated entries' warn += ' and was updated' self._log.warn(warn) self._dirty = False return True def dump(self): """dump the config dictionary""" output = io.StringIO() content = self._prepare_to_save(self._yaml_dict.copy()) self._yaml_dump(content, output, fmt=self._config_format) return output.getvalue() ######################################################## # block parsing ######################################################## def _parse_blk_settings(self, dic): """parse the "config" block""" block = self._get_entry(dic, self.key_settings).copy() # set defaults settings = Settings(None).serialize().get(self.key_settings) settings.update(block) # resolve minimum version if self.key_settings_minversion in settings: minversion = settings[self.key_settings_minversion] self._check_minversion(minversion) # normalize paths paths = self._norm_path(settings[self.key_settings_dotpath]) settings[self.key_settings_dotpath] = paths paths = self._norm_path(settings[self.key_settings_workdir]) settings[self.key_settings_workdir] = paths paths = [ self._norm_path(path) for path in settings[Settings.key_filter_file] ] settings[Settings.key_filter_file] = paths paths = [ self._norm_path(path) for path in settings[Settings.key_func_file] ] settings[Settings.key_func_file] = paths if self._debug: self._debug_dict('settings block:', settings) return settings def _parse_blk_dotfiles(self, dic): """parse the "dotfiles" block""" dotfiles = self._get_entry(dic, self.key_dotfiles).copy() keys = dotfiles.keys() if len(keys) != len(list(set(keys))): dups = [x for x in keys if x not in list(set(keys))] err = f'duplicate dotfile keys found: {dups}' raise YamlException(err) dotfiles = self._norm_dotfiles(dotfiles) if self._debug: self._debug_dict('dotfiles block', dotfiles) return dotfiles def _parse_blk_profiles(self, dic): """parse the "profiles" block""" profiles = self._get_entry(dic, self.key_profiles).copy() profiles = self._norm_profiles(profiles) if self._debug: self._debug_dict('profiles block', profiles) return profiles def _parse_blk_actions(self, dic): """parse the "actions" block""" actions = self._get_entry(dic, self.key_actions, mandatory=False) if actions: actions = actions.copy() actions = self._norm_actions(actions) if self._debug: self._debug_dict('actions block', actions) return actions def _parse_blk_trans_r(self, dic): """parse the "trans_r" block""" key = self.key_trans_r if self.old_key_trans_r in dic: msg = '\"trans\" is deprecated, please use \"trans_read\"' self._log.warn(msg) dic[self.key_trans_r] = dic[self.old_key_trans_r] del dic[self.old_key_trans_r] trans_r = self._get_entry(dic, key, mandatory=False) if trans_r: trans_r = trans_r.copy() if self._debug: self._debug_dict('trans_r block', trans_r) return trans_r def _parse_blk_trans_w(self, dic): """parse the "trans_w" block""" trans_w = self._get_entry(dic, self.key_trans_w, mandatory=False) if trans_w: trans_w = trans_w.copy() if self._debug: self._debug_dict('trans_w block', trans_w) return trans_w def _parse_blk_variables(self, dic): """parse the "variables" block""" variables = self._get_entry(dic, self.key_variables, mandatory=False) if variables: variables = variables.copy() if self._debug: self._debug_dict('variables block', variables) return variables def _parse_blk_dynvariables(self, dic): """parse the "dynvariables" block""" dvariables = self._get_entry(dic, self.key_dvariables, mandatory=False) if dvariables: dvariables = dvariables.copy() if self._debug: self._debug_dict('dynvariables block', dvariables) return dvariables def _parse_blk_uservariables(self, dic, current): """parse the "uservariables" block""" uvariables = self._get_entry(dic, self.key_uvariables, mandatory=False) uvars = {} if not self._reloading and uvariables: try: for name, prompt in uvariables.items(): if name in current: # ignore if already defined if self._debug: self._dbg(f'ignore uservariables {name}') continue content = userinput(prompt, debug=self._debug) uvars[name] = content except KeyboardInterrupt as exc: raise YamlException('interrupted') from exc if uvars: uvars = uvars.copy() if self._debug: self._debug_dict('uservariables block', uvars) # save uservariables if uvars: try: self._save_uservariables(uvars) except YamlException: # ignore pass return uvars ######################################################## # parsing helpers ######################################################## def _template_include_entry(self): """template all "include" entries""" # import_actions new = [] entries = self.settings.get(self.key_import_actions, []) new = self._template_list(entries) if new: self.settings[self.key_import_actions] = new # import_configs entries = self.settings.get(self.key_import_configs, []) new = self._template_list(entries) if new: self.settings[self.key_import_configs] = new # import_variables entries = self.settings.get(self.key_import_variables, []) new = self._template_list(entries) if new: self.settings[self.key_import_variables] = new # profile's import for _, val in self.profiles.items(): entries = val.get(self.key_import_profile_dfs, []) new = self._template_list(entries) if new: val[self.key_import_profile_dfs] = new def _norm_actions(self, actions): """ ensure each action is either pre or post explicitely action entry of the form {action_key: (pre|post, action)} """ if not actions: return actions new = {} for k, val in actions.items(): if k in (self.action_pre, self.action_post): for key, action in val.items(): new[key] = (k, action) else: new[k] = (self.action_post, val) return new def _norm_profiles(self, profiles): """normalize profiles entries""" if not profiles: return profiles new = {} for k, val in profiles.items(): if k == self.key_all: msg = f'\"{self.key_all}\" is a special profile name, ' msg += 'consider renaming to avoid any issue.' self._log.warn(msg) if not k: msg = 'empty profile name' self._log.warn(msg) continue if not val: # no dotfiles continue # add dotfiles entry if not present if self.key_profile_dotfiles not in val: val[self.key_profile_dotfiles] = [] new[k] = val return new def _norm_dotfile_chmod(self, entry): value = str(entry[self.key_dotfile_chmod]) if value == self.chmod_ignore: # is preserve return if len(value) < 3: # bad format err = f'bad format for chmod: {value}' self._log.err(err) raise YamlException(f'config content error: {err}') # check is valid value try: int(value) except Exception as exc: err = f'bad format for chmod: {value}' self._log.err(err) err = f'config content error: {err}' raise YamlException(err) from exc # normalize chmod value for chmodv in list(value): chmodint = int(chmodv) if chmodint < 0 or chmodint > 7: err = f'bad format for chmod: {value}' self._log.err(err) raise YamlException( f'config content error: {err}' ) # octal entry[self.key_dotfile_chmod] = int(value, 8) def _norm_dotfiles(self, dotfiles): """normalize and check dotfiles entries""" if not dotfiles: return dotfiles new = {} for k, val in dotfiles.items(): if self.key_dotfile_src not in val: # add 'src' as key' if not present val[self.key_dotfile_src] = k new[k] = val else: new[k] = val if self.old_key_trans_r in val: # fix deprecated trans key msg = f'{k} \"trans\" is deprecated, please use \"trans_read\"' self._log.warn(msg) val[self.key_trans_r] = val[self.old_key_trans_r] del val[self.old_key_trans_r] new[k] = val if self.key_dotfile_link not in val: # apply link value if undefined value = self.settings[self.key_settings_link_dotfile_default] val[self.key_dotfile_link] = value if self.key_dotfile_noempty not in val: # apply noempty if undefined value = self.settings.get(self.key_settings_noempty, False) val[self.key_dotfile_noempty] = value if self.key_dotfile_template not in val: # apply template if undefined value = self.settings.get(self.key_settings_template, True) val[self.key_dotfile_template] = value if self.key_dotfile_chmod in val: # validate value of chmod if defined self._norm_dotfile_chmod(val) return new def _add_variables(self, new, shell=False, template=True, prio=False): """ add new variables @shell: execute the variable through the shell @template: template the variable @prio: new takes priority over existing variables """ if not new: return # merge if prio: self.variables = self._merge_dict(new, self.variables) else: # clear existing variable for k in list(new.keys()): if k in self.variables.keys(): del new[k] # merge self.variables = self._merge_dict(self.variables, new) # ensure enriched variables are relative to this config self.variables = self._enrich_vars(self.variables, self._profile) # re-create the templater self._redefine_templater() if template: # rec resolve variables with new ones self._rec_resolve_variables(self.variables) if shell and new: # shell exec self._shell_exec_dvars(self.variables, keys=new.keys()) # re-create the templater self._redefine_templater() def _enrich_vars(self, variables, profile): """return enriched variables""" # add profile variable if profile: variables['profile'] = profile # add some more variables path = self.settings.get(self.key_settings_dotpath) path = self._norm_path(path) variables['_dotdrop_dotpath'] = path variables['_dotdrop_cfgpath'] = self._norm_path(self._path) path = self.settings.get(self.key_settings_workdir) path = self._norm_path(path) variables['_dotdrop_workdir'] = path return variables def _get_profile_included_item(self, keyitem): """recursively get included in profile""" profiles = [self._profile] + self._inc_profiles items = {} for profile in profiles: seen = [self._profile] i = self.__get_profile_included_item(profile, keyitem, seen) items = self._merge_dict(i, items) return items def __get_profile_included_item(self, profile, keyitem, seen): """recursively get included from profile""" items = {} if not profile or profile not in self.profiles.keys(): return items # considered profile entry pentry = self.profiles.get(profile) # recursively get from inherited profile for inherited_profile in pentry.get(self.key_profile_include, []): if inherited_profile == profile or inherited_profile in seen: raise YamlException('\"include\" loop') seen.append(inherited_profile) new = self.__get_profile_included_item(inherited_profile, keyitem, seen) if self._debug: msg = f'included {keyitem} from {inherited_profile}: {new}' self._dbg(msg) items.update(new) cur = pentry.get(keyitem, {}) return self._merge_dict(cur, items) def _resolve_profile_all(self): """resolve some other parts of the config""" # profile -> ALL for k, val in self.profiles.items(): dfs = val.get(self.key_profile_dotfiles, None) if not dfs: continue if self.key_all in dfs: if self._debug: self._dbg(f'add ALL to profile \"{k}\"') val[self.key_profile_dotfiles] = self.dotfiles.keys() def _resolve_profile_includes(self): """resolve profile(s) including other profiles""" for k, _ in self.profiles.items(): self._rec_resolve_profile_include(k) def _rec_resolve_profile_include(self, profile): """ recursively resolve include of other profiles's: * dotfiles * actions returns dotfiles, actions """ this_profile = self.profiles[profile] # considered profile content dotfiles = this_profile.get(self.key_profile_dotfiles, []) or [] actions = this_profile.get(self.key_profile_actions, []) or [] includes = this_profile.get(self.key_profile_include, []) or [] if not includes: # nothing to include return dotfiles, actions if self._debug: incs = ','.join(includes) self._dbg(f'{profile} includes {incs}') self._dbg(f'{profile} dotfiles before include: {dotfiles}') self._dbg(f'{profile} actions before include: {actions}') seen = [] for i in uniq_list(includes): if self._debug: self._dbg(f'resolving includes "{profile}" <- "{i}"') # ensure no include loop occurs if i in seen: raise YamlException('\"include loop\"') seen.append(i) # included profile even exists if i not in self.profiles.keys(): self._log.warn(f'include unknown profile: {i}') continue # recursive resolve if self._debug: self._dbg(f'recursively resolving includes for profile "{i}"') o_dfs, o_actions = self._rec_resolve_profile_include(i) # merge dotfile keys if self._debug: msg = f'Merging dotfiles {profile}' msg += f' <- {i}: {dotfiles} <- {o_dfs}' self._dbg(msg) dotfiles.extend(o_dfs) this_profile[self.key_profile_dotfiles] = uniq_list(dotfiles) # merge actions keys if self._debug: msg = f'Merging actions {profile} ' msg += f'<- {i}: {actions} <- {o_actions}' self._dbg(msg) actions.extend(o_actions) this_profile[self.key_profile_actions] = uniq_list(actions) dotfiles = this_profile.get(self.key_profile_dotfiles, []) actions = this_profile.get(self.key_profile_actions, []) if self._debug: self._dbg(f'{profile} dotfiles after include: {dotfiles}') self._dbg(f'{profile} actions after include: {actions}') # since included items are resolved here # we can clear these include self.profiles[profile][self.key_profile_include] = [] return dotfiles, actions ######################################################## # handle imported entries ######################################################## def _import_variables(self): """import external variables from paths""" paths = self.settings.get(self.key_import_variables, None) if not paths: return None paths = self._resolve_paths(paths) newvars = {} for path in paths: if self._debug: self._dbg(f'import variables from {path}') var = self._import_sub(path, self.key_variables, mandatory=False) if self._debug: self._dbg(f'import dynvariables from {path}') dvar = self._import_sub(path, self.key_dvariables, mandatory=False) merged = self._merge_dict(dvar, var) self._rec_resolve_variables(merged) if dvar.keys(): self._shell_exec_dvars(merged, keys=dvar.keys()) self._clear_profile_vars(merged) newvars = self._merge_dict(newvars, merged) if self._debug: self._debug_dict('imported variables', newvars) return newvars def _import_actions(self): """import external actions from paths""" paths = self.settings.get(self.key_import_actions, None) if not paths: return paths = self._resolve_paths(paths) for path in paths: if self._debug: self._dbg(f'import actions from {path}') new = self._import_sub(path, self.key_actions, mandatory=False, patch_func=self._norm_actions) self.actions = self._merge_dict(new, self.actions) def _import_profiles_dotfiles(self): """import profile dotfiles""" for k, val in self.profiles.items(): imp = val.get(self.key_import_profile_dfs, None) if not imp: continue if self._debug: self._dbg(f'import dotfiles for profile {k}') paths = self._resolve_paths(imp) for path in paths: current = val.get(self.key_dotfiles, []) new = self._import_sub(path, self.key_dotfiles, mandatory=False) val[self.key_dotfiles] = new + current def _import_config(self, path): """import config from path""" if self._debug: self._dbg(f'import config from {path}') self._dbg(f'profile: {self._profile}') self._dbg(f'included profiles: {self._inc_profiles}') sub = CfgYaml(path, profile=self._profile, addprofiles=self._inc_profiles, debug=self._debug, imported_configs=self.imported_configs) # settings are ignored from external file # except for filter_file and func_file self.settings[Settings.key_func_file] += [ self._norm_path(func_file) for func_file in sub.settings[Settings.key_func_file] ] self.settings[Settings.key_filter_file] += [ self._norm_path(func_file) for func_file in sub.settings[Settings.key_filter_file] ] # merge top entries self.dotfiles = self._merge_dict(self.dotfiles, sub.dotfiles) self.profiles = self._merge_dict(self.profiles, sub.profiles, deep=True) self.actions = self._merge_dict(self.actions, sub.actions) self.trans_r = self._merge_dict(self.trans_r, sub.trans_r) self.trans_w = self._merge_dict(self.trans_w, sub.trans_w) self._clear_profile_vars(sub.variables) self.imported_configs.append(path) self.imported_configs += sub.imported_configs if self._debug: self._debug_dict('add import_configs var', sub.variables) self._add_variables(sub.variables, prio=True) def _import_configs(self): """import configs from external files""" # settings -> import_configs imp = self.settings.get(self.key_import_configs, None) if not imp: return paths = self._resolve_paths(imp) for path in paths: if path in self.imported_configs: err = f'{path} imported more than once in {self._path}' raise YamlException(err) self._import_config(path) def _import_sub(self, path, key, mandatory=False, patch_func=None): """ import the block "key" from "path" patch_func is applied to each element if defined """ if self._debug: self._dbg(f'import \"{key}\" from \"{path}\"') extdict = self._load_yaml(path) new = self._get_entry(extdict, key, mandatory=mandatory) if patch_func: if self._debug: self._dbg(f'calling patch: {patch_func}') new = patch_func(new) if not new and mandatory: err = f'no \"{key}\" imported from \"{path}\"' self._log.warn(err) raise YamlException(err) if self._debug: self._dbg(f'imported \"{key}\": {new}') return new ######################################################## # add/remove entries ######################################################## def _new_profile(self, key): """add a new profile if it doesn't exist""" if key == self.key_all: err = f'profile key \"{key}\" is reserved' self._log.warn(err) raise YamlException(err) if not key: err = 'empty profile key' self._log.warn(err) raise YamlException(err) if key not in self.profiles.keys(): # update yaml_dict self._yaml_dict[self.key_profiles][key] = { self.key_profile_dotfiles: [] } self.profiles[key] = { self.key_profile_dotfiles: [] } if self._debug: self._dbg(f'adding new profile: {key}') self._dirty = True ######################################################## # handle deprecated entries ######################################################## def _fix_deprecated(self, yamldict): """fix deprecated entries""" if not yamldict: return self._fix_deprecated_link_by_default(yamldict) self._fix_deprecated_dotfile_link(yamldict) def _fix_deprecated_link_by_default(self, yamldict): """fix deprecated link_by_default""" old_key = 'link_by_default' newkey = self.key_imp_link if self.key_settings not in yamldict: return if not yamldict[self.key_settings]: return config = yamldict[self.key_settings] if old_key not in config: return if config[old_key]: config[newkey] = self.lnk_link else: config[newkey] = self.lnk_nolink del config[old_key] self._log.warn('deprecated \"link_by_default\"') self._dirty = True self._dirty_deprecated = True def _fix_deprecated_dotfile_link(self, yamldict): """fix deprecated link in dotfiles""" old_key = 'link_children' if self.key_dotfiles not in yamldict: return if not yamldict[self.key_dotfiles]: return for _, dotfile in yamldict[self.key_dotfiles].items(): if self.key_dotfile_link in dotfile and \ isinstance(dotfile[self.key_dotfile_link], bool): # patch link: cur = dotfile[self.key_dotfile_link] new = self.lnk_nolink if cur: new = self.lnk_link dotfile[self.key_dotfile_link] = new self._dirty = True self._dirty_deprecated = True warn = 'deprecated \"link: \"' warn += f', updated to \"link: {new}\"' self._log.warn(warn) if self.key_dotfile_link in dotfile and \ dotfile[self.key_dotfile_link] == self.lnk_link: # patch "link: link" # to "link: absolute" new = self.lnk_absolute dotfile[self.key_dotfile_link] = new self._dirty = True self._dirty_deprecated = True warn = 'deprecated \"link: link\"' warn += f', updated to \"link: {new}\"' self._log.warn(warn) if old_key in dotfile and \ isinstance(dotfile[old_key], bool): # patch link_children: cur = dotfile[old_key] new = self.lnk_nolink if cur: new = self.lnk_children del dotfile[old_key] dotfile[self.key_dotfile_link] = new self._dirty = True self._dirty_deprecated = True warn = 'deprecated \"link_children\" value' warn += f', updated to \"{new}\"' self._log.warn(warn) ######################################################## # yaml utils ######################################################## def _prepare_to_save(self, content): content = self._clear_none(content) # make sure we have the base entries if self.key_settings not in content: content[self.key_settings] = None if self.key_dotfiles not in content: content[self.key_dotfiles] = None if self.key_profiles not in content: content[self.key_profiles] = None return content def _load_yaml(self, path): """load a yaml file to a dict""" content = {} if self._debug: self._dbg(f'----------dump:{path}----------') cfg = '\n' with open(path, 'r', encoding='utf8') as file: for line in file: cfg += line self._dbg(cfg.rstrip()) self._dbg(f'----------end:{path}----------') try: content, fmt = self._yaml_load(path) self._config_format = fmt except Exception as exc: self._log.err(exc) err = f'config yaml error: {path}' raise YamlException(err) from exc if self._debug: self._dbg(f'format: {self._config_format}') return content def _validate(self, yamldict): """validate entries""" if not yamldict: return # check top entries for entry in self.top_entries: if entry not in yamldict: err = f'no {entry} entry found' self._log.err(err) raise YamlException(f'config format error: {err}') # check link_dotfile_default if self.key_settings not in yamldict: # no configs top entry return if not yamldict[self.key_settings]: # configs empty return # check settings values settings = yamldict[self.key_settings] if self.key_settings_link_dotfile_default not in settings: return val = settings[self.key_settings_link_dotfile_default] if val not in self.allowed_link_val: err = f'bad link value: {val}' self._log.err(err) self._log.err(f'allowed: {self.allowed_link_val}') raise YamlException(f'config content error: {err}') @classmethod def _yaml_load(cls, path): """load config file""" is_toml = path.lower().endswith(".toml") if is_toml: return cls.__toml_load(path), 'toml' return cls.__yaml_load(path), 'yaml' @classmethod def __yaml_load(cls, path): """load from yaml""" with open(path, 'r', encoding='utf8') as file: data = yaml() data.typ = 'rt' content = data.load(file) return content @classmethod def __toml_load(cls, path): """load from toml""" with open(path, 'r', encoding='utf8') as file: data = file.read() content = toml.loads(data) # handle inexistent dotfiles/profiles # since toml doesn't have a nul/nil/null/none if cls.key_dotfiles not in content: content[cls.key_dotfiles] = None if cls.key_profiles not in content: content[cls.key_profiles] = None return content @classmethod def _yaml_dump(cls, content, file, fmt='yaml'): """dump config file""" if fmt == 'toml': return cls.__toml_dump(content, file) if fmt == 'yaml': return cls.__yaml_dump(content, file) raise YamlException("unsupported format") @classmethod def __yaml_dump(cls, content, file): """dump to yaml""" data = yaml() data.default_flow_style = False data.indent = 2 data.typ = 'rt' data.dump(content, file) @classmethod def __toml_dump(cls, content, file): """dump to toml""" toml.dump(content, file) ######################################################## # templating ######################################################## def _redefine_templater(self): """create templater based on current variables""" fufile = None fifile = None if Settings.key_func_file in self.settings: fufile = self.settings[Settings.key_func_file] if Settings.key_filter_file in self.settings: fifile = self.settings[Settings.key_filter_file] self._tmpl = Templategen(variables=self.variables, func_file=fufile, filter_file=fifile) def _template_item(self, item, exc_if_fail=True): """ template an item using the templategen will raise an exception if template failed and exc_if_fail """ if not Templategen.var_is_template(item): return item try: val = item while Templategen.var_is_template(val): val = self._tmpl.generate_string(val) except UndefinedException as exc: if exc_if_fail: raise exc return val def _template_list(self, entries): """template a list of entries""" new = [] if not entries: return new for entry in entries: newe = self._template_item(entry) if self._debug and entry != newe: self._dbg(f'resolved: {entry} -> {newe}') new.append(newe) return new def _template_dict(self, entries): """template a dictionary of entries""" new = {} if not entries: return new for k, val in entries.items(): newv = self._template_item(val) if self._debug and val != newv: self._dbg(f'resolved: {val} -> {newv}') new[k] = newv return new def _template_dotfiles_entries(self): """template dotfiles entries""" if self._debug: self._dbg('templating dotfiles entries') dotfiles = self.dotfiles.copy() # make sure no dotfiles path is None for dotfile in dotfiles.values(): src = dotfile[self.key_dotfile_src] if src is None: dotfile[self.key_dotfile_src] = '' dst = dotfile[self.key_dotfile_dst] if dst is None: dotfile[self.key_dotfile_dst] = '' # resolve links before taking subset of # dotfiles to avoid issues in upper layer for dotfile in dotfiles.values(): # link if self.key_dotfile_link in dotfile: # normalize the link value link = dotfile[self.key_dotfile_link] newlink = self._resolve_dotfile_link(link) dotfile[self.key_dotfile_link] = newlink # only keep dotfiles related to the selected profile pdfs = [] pro = self.profiles.get(self._profile, []) if pro: pdfs = list(pro.get(self.key_profile_dotfiles, [])) # and any included profiles for addpro in self._inc_profiles: pro = self.profiles.get(addpro, []) if not pro: continue pdfsalt = pro.get(self.key_profile_dotfiles, []) pdfs.extend(pdfsalt) pdfs = uniq_list(pdfs) # if ALL is defined if self.key_all not in pdfs: # take a subset of the dotfiles newdotfiles = {} for k, val in dotfiles.items(): if k in pdfs: newdotfiles[k] = val dotfiles = newdotfiles for dotfile in dotfiles.values(): # src src = dotfile[self.key_dotfile_src] newsrc = self.resolve_dotfile_src(src, templater=self._tmpl) dotfile[self.key_dotfile_src] = newsrc # dst dst = dotfile[self.key_dotfile_dst] newdst = self.resolve_dotfile_dst(dst, templater=self._tmpl) dotfile[self.key_dotfile_dst] = newdst def _rec_resolve_variables(self, variables): """recursive resolve variables""" var = self._enrich_vars(variables, self._profile) # use a separated templategen to handle variables # resolved outside the main config func_files = self.settings[Settings.key_func_file] filter_files = self.settings[Settings.key_filter_file] templ = Templategen(variables=var, func_file=func_files, filter_file=filter_files) for k in variables.keys(): val = variables[k] while Templategen.var_is_template(val): val = templ.generate_string(val) variables[k] = val templ.update_variables(variables) if variables is self.variables: self._redefine_templater() def _get_profile_included_vars(self): """resolve profile included variables/dynvariables""" for _, val in self.profiles.items(): if self.key_profile_include in val and \ val[self.key_profile_include]: new = [] for entry in val[self.key_profile_include]: new.append(self._tmpl.generate_string(entry)) val[self.key_profile_include] = new # now get the included ones pro_var = self._get_profile_included_item(self.key_profile_variables) pro_dvar = self._get_profile_included_item(self.key_profile_dvariables) # the included profiles inc_profiles = [] if self._profile and self._profile in self.profiles: pentry = self.profiles.get(self._profile) inc_profiles = pentry.get(self.key_profile_include, []) # exec incl dynvariables return inc_profiles, pro_var, pro_dvar ######################################################## # helpers ######################################################## def _clear_profile_vars(self, dic): """ remove profile variables from dic if found inplace to avoid profile variables being overwriten """ if not dic: return for k in self._profilevarskeys: dic.pop(k, None) def _parse_extended_import_path(self, path_entry): """Parse an import path in a tuple (path, fatal_not_found).""" if self._debug: self._dbg(f'parsing path entry {path_entry}') path, _, attribute = path_entry.rpartition(self.key_import_sep) fatal_not_found = attribute != self.key_import_ignore_key is_valid_attribute = attribute in ('', self.key_import_ignore_key) if not is_valid_attribute: # If attribute is not valid it can mean that: # - path_entry doesn't contain the separator, and attribute is set # to the whole path by str.rpartition # - path_entry contains a separator, but it's in the file path, so # attribute is set to whatever comes after the separator by # str.rpartition # In both cases, path_entry is the path we're looking for. if self._debug: msg = 'using attribute default values' msg += f' for path {path_entry}' self._dbg(msg) path = path_entry fatal_not_found = self.key_import_fatal_not_found elif self._debug: msg = f'path entry {path_entry} has fatal_not_found' msg += f' flag set to {fatal_not_found}' self._dbg(msg) return path, fatal_not_found def _handle_non_existing_path(self, path, fatal_not_found=True): """Raise an exception or log a warning to handle non-existing paths.""" error = f'bad path {path}' if fatal_not_found: raise YamlException(error) self._log.warn(error) def _check_path_existence(self, path, fatal_not_found=True): """Check if a path exists, raising if necessary.""" if os.path.exists(path): if self._debug: self._dbg(f'path {path} exists') return path self._handle_non_existing_path(path, fatal_not_found) # Explicit return for readability. Anything evaluating to false is ok. return None def _process_path(self, path_entry): """ This method processed a path entry. Namely it: - Normalizes the path. - Expands globs. - Checks for path existence, taking in account fatal_not_found. This method always returns a list containing only absolute paths existing on the filesystem. If the input is not a glob, the list contains at most one element, otherwise it could hold more. """ path, fatal_not_found = self._parse_extended_import_path(path_entry) path = self._norm_path(path) paths = self._glob_path(path) if self._is_glob(path) else [path] if not paths: if self._debug: self._dbg(f"glob path {path} didn't expand") self._handle_non_existing_path(path, fatal_not_found) return [] checked_paths = (self._check_path_existence(p, fatal_not_found) for p in paths) return [p for p in checked_paths if p] def _resolve_paths(self, paths): """ This function resolves a list of paths. This means normalizing, expanding globs and checking for existence, taking in account fatal_not_found flags. """ processed_paths = (self._process_path(p) for p in paths) return list(chain.from_iterable(processed_paths)) @classmethod def _merge_dict(cls, high, low, deep=False): """ both dict must be the same form/type if deep is True, then merge recursively """ if not high: high = {} if not low: low = {} if not high and not low: return {} if not deep: return {**low, **high} final = high.copy() for k, val in low.items(): if isinstance(val, dict): # content is dict, recurse if k not in final: final[k] = {} final[k] = cls._merge_dict(val, final[k], deep=True) elif isinstance(val, list): # content is list, merge if k not in final: final[k] = [] final[k] += val else: # don't know how to handle err = 'unable to merge' raise YamlException(err) return final @classmethod def _get_entry(cls, dic, key, mandatory=True): """return copy of entry from yaml dictionary""" if key not in dic: if mandatory: err = f'invalid config: no entry \"{key}\" found' raise YamlException(err) dic[key] = {} return deepcopy(dic[key]) if mandatory and not dic[key]: # ensure is not none dic[key] = {} return deepcopy(dic[key]) def _clear_none(self, dic): """recursively delete all none/empty values in a dictionary.""" new = {} for k, val in dic.items(): if k == self.key_dotfile_src: # allow empty dotfile src new[k] = val continue if k == self.key_dotfile_dst: # allow empty dotfile dst new[k] = val continue newv = val if isinstance(val, dict): # recursive travers dict newv = self._clear_none(val) if not newv: # no empty dict continue if newv is None: # no None value continue if isinstance(newv, list) and not newv: # no empty list continue new[k] = newv return new @classmethod def _is_glob(cls, path): """Quick test if path is a glob.""" return '*' in path or '?' in path def _glob_path(self, path): """Expand a glob.""" if self._debug: self._dbg(f'expanding glob {path}') expanded_path = os.path.expanduser(path) return glob.glob(expanded_path, recursive=True) def _norm_path(self, path): """Resolve a path either absolute or relative to config path""" if not path: return path path = os.path.expanduser(path) if not os.path.isabs(path): dirn = os.path.dirname(self._path) ret = os.path.join(dirn, path) if self._debug: msg = f'normalizing relative to cfg: {path} -> {ret}' self._dbg(msg) path = ret ret = os.path.normpath(path) if self._debug and path != ret: self._dbg(f'normalizing: {path} -> {ret}') return ret def _shell_exec_dvars(self, dic, keys=None): """shell execute dynvariables in-place""" if not keys: keys = dic.keys() for k in keys: val = dic[k] ret, out = shellrun(val, debug=self._debug) if not ret: err = f'var \"{k}: {val}\" failed: {out}' self._log.err(err) raise YamlException(err) if self._debug: self._dbg(f'{k}: `{val}` -> {out}') dic[k] = out @classmethod def _check_minversion(cls, minversion): if not minversion: return try: cur = ([int(x) for x in VERSION.split('.')]) cfg = ([int(x) for x in minversion.split('.')]) except Exception as exc: err = f'bad version: \"{VERSION}\" VS \"{minversion}\"' raise YamlException(err) from exc if cur < cfg: err = 'current dotdrop version is too old for that config file.' err += ' Please update.' raise YamlException(err) def _debug_entries(self): """debug print all interesting entries""" if not self._debug: return self._dbg('Current entries') self._debug_dict('entry settings', self.settings) self._debug_dict('entry dotfiles', self.dotfiles) self._debug_dict('entry profiles', self.profiles) self._debug_dict('entry actions', self.actions) self._debug_dict('entry trans_r', self.trans_r) self._debug_dict('entry trans_w', self.trans_w) self._debug_dict('entry variables', self.variables) def _debug_dict(self, title, elems): """pretty print dict""" if not self._debug: return self._dbg(f'{title}:') if not elems: return for k, val in elems.items(): if isinstance(val, dict): self._dbg(f' - \"{k}\"') for subkey, sub in val.items(): self._dbg(f' * {subkey}: \"{sub}\"') else: self._dbg(f' - \"{k}\": {val}') def _dbg(self, content): directory = os.path.basename(os.path.dirname(self._path)) pre = os.path.join(directory, os.path.basename(self._path)) self._log.dbg(f'[{pre}] {content}') def _save_uservariables(self, uvars): """save uservariables to file""" parent = os.path.dirname(self._path) # find a unique path path = None cnt = 0 while True: if cnt == 0: name = self.save_uservariables_name.format('') else: name = self.save_uservariables_name.format(f'-{cnt}') cnt += 1 path = os.path.join(parent, name) if not os.path.exists(path): break # save the config content = {'variables': uvars} try: if self._debug: self._dbg(f'saving uservariables values to {path}') with open(path, 'w', encoding='utf8') as file: self._yaml_dump(content, file, fmt=self._config_format) except Exception as exc: # self._log.err(exc) err = f'error saving uservariables to {path}' self._log.err(err) raise YamlException(err) from exc self._log.log(f'uservariables values saved to {path}')