mirror of
https://github.com/deadc0de6/dotdrop.git
synced 2026-02-04 19:09:44 +00:00
448 lines
17 KiB
Python
448 lines
17 KiB
Python
"""
|
|
author: deadc0de6 (https://github.com/deadc0de6)
|
|
Copyright (c) 2017, deadc0de6
|
|
|
|
handle the update of dotfiles
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import filecmp
|
|
|
|
# local imports
|
|
from dotdrop.logger import Logger
|
|
from dotdrop.templategen import Templategen
|
|
from dotdrop.utils import ignores_to_absolute, removepath, \
|
|
get_unique_tmp_name, write_to_tmpfile, must_ignore, \
|
|
mirror_file_rights, get_file_perm, copytree_with_ign
|
|
from dotdrop.exceptions import UndefinedException
|
|
|
|
|
|
TILD = '~'
|
|
|
|
|
|
class Updater:
|
|
"""dotfiles updater"""
|
|
|
|
def __init__(self, dotpath, variables, conf,
|
|
profile_key, dry=False, safe=True,
|
|
debug=False, ignore=None, showpatch=False,
|
|
ignore_missing_in_dotdrop=False):
|
|
"""constructor
|
|
@dotpath: path where dotfiles are stored
|
|
@variables: dictionary of variables for the templates
|
|
@conf: configuration manager
|
|
@profile_key: the profile key
|
|
@dry: simulate
|
|
@safe: ask for overwrite if True
|
|
@debug: enable debug
|
|
@ignore: pattern to ignore when updating
|
|
@showpatch: show patch if dotfile to update is a template
|
|
"""
|
|
self.dotpath = dotpath
|
|
self.variables = variables
|
|
self.conf = conf
|
|
self.profile_key = profile_key
|
|
self.dry = dry
|
|
self.safe = safe
|
|
self.debug = debug
|
|
self.ignore = ignore or []
|
|
self.showpatch = showpatch
|
|
self.ignore_missing_in_dotdrop = ignore_missing_in_dotdrop
|
|
self.templater = Templategen(variables=self.variables,
|
|
base=self.dotpath,
|
|
debug=self.debug)
|
|
# save template vars
|
|
self.tvars = self.templater.add_tmp_vars()
|
|
self.log = Logger(debug=self.debug)
|
|
|
|
def update_path(self, path):
|
|
"""update the dotfile installed on path"""
|
|
path = os.path.expanduser(path)
|
|
if not os.path.lexists(path):
|
|
self.log.err(f'\"{path}\" does not exist!')
|
|
return False
|
|
dotfiles = self.conf.get_dotfile_by_dst(path,
|
|
profile_key=self.profile_key)
|
|
if not dotfiles:
|
|
return False
|
|
for dotfile in dotfiles:
|
|
if not dotfile:
|
|
err = f'invalid dotfile for update: {dotfile.key}'
|
|
self.log.err(err)
|
|
return False
|
|
|
|
msg = f'updating {dotfile} from path \"{path}\"'
|
|
self.log.dbg(msg)
|
|
if not self._update(path, dotfile):
|
|
return False
|
|
return True
|
|
|
|
def update_key(self, key):
|
|
"""update the dotfile referenced by key"""
|
|
dotfile = self.conf.get_dotfile(key, profile_key=self.profile_key)
|
|
if not dotfile:
|
|
self.log.dbg(f'no such dotfile: \"{key}\"')
|
|
msg = f'invalid dotfile for update: {key}'
|
|
self.log.err(msg)
|
|
return False
|
|
self.log.dbg(f'updating {dotfile} from key \"{key}\"')
|
|
path = self.conf.path_to_dotfile_dst(dotfile.dst)
|
|
return self._update(path, dotfile)
|
|
|
|
def _update(self, path, dotfile):
|
|
"""update dotfile from file pointed by path"""
|
|
ret = False
|
|
new_path = None
|
|
ignores = list(set(self.ignore + dotfile.upignore))
|
|
prefixes = [dotfile.dst, dotfile.src]
|
|
ignores = ignores_to_absolute(ignores, prefixes,
|
|
debug=self.debug)
|
|
self.log.dbg(f'ignore pattern(s) for {path}: {ignores}')
|
|
|
|
deployed_path = os.path.expanduser(path)
|
|
local_path = os.path.join(self.dotpath, dotfile.src)
|
|
local_path = os.path.expanduser(local_path)
|
|
|
|
if not os.path.exists(deployed_path):
|
|
msg = f'\"{deployed_path}\" does not exist'
|
|
self.log.err(msg)
|
|
return False
|
|
|
|
if not os.path.exists(local_path):
|
|
msg = f'\"{local_path}\" does not exist, import it first'
|
|
self.log.err(msg)
|
|
return False
|
|
|
|
ignore_missing_in_dotdrop = self.ignore_missing_in_dotdrop or \
|
|
dotfile.ignore_missing_in_dotdrop
|
|
|
|
if ignore_missing_in_dotdrop and not os.path.exists(local_path):
|
|
self.log.sub(f'\"{dotfile.key}\" ignored')
|
|
return True
|
|
|
|
# apply write transformation if any
|
|
new_path = self._apply_trans_w(deployed_path, dotfile)
|
|
if not new_path:
|
|
return False
|
|
|
|
# save current rights
|
|
deployed_mode = get_file_perm(deployed_path)
|
|
local_mode = get_file_perm(local_path)
|
|
|
|
# handle the pointed file
|
|
if os.path.isdir(new_path):
|
|
ret = self._handle_dir(new_path, local_path,
|
|
dotfile, ignores)
|
|
else:
|
|
ret = self._handle_file(new_path, local_path,
|
|
ignores)
|
|
|
|
# mirror rights
|
|
if deployed_mode != local_mode:
|
|
msg = f'adopt mode {deployed_mode:o} for {dotfile.key}'
|
|
self.log.dbg(msg)
|
|
if self.conf.update_dotfile(dotfile.key, deployed_mode):
|
|
ret = True
|
|
|
|
# clean temporary files
|
|
if new_path != deployed_path and os.path.exists(new_path):
|
|
removepath(new_path, logger=self.log)
|
|
return ret
|
|
|
|
def _apply_trans_w(self, path, dotfile):
|
|
"""apply write transformation to dotfile"""
|
|
trans = dotfile.get_trans_w()
|
|
if not trans:
|
|
return path
|
|
self.log.dbg(f'executing write transformation {trans}')
|
|
tmp = get_unique_tmp_name()
|
|
self.templater.restore_vars(self.tvars)
|
|
newvars = dotfile.get_dotfile_variables()
|
|
self.templater.add_tmp_vars(newvars=newvars)
|
|
if not trans.transform(path, tmp, templater=self.templater,
|
|
debug=self.debug):
|
|
if os.path.exists(tmp):
|
|
removepath(tmp, logger=self.log)
|
|
err = f'transformation \"{trans.key}\" failed for {dotfile.key}'
|
|
self.log.err(err)
|
|
return None
|
|
return tmp
|
|
|
|
def _is_template(self, path):
|
|
if not Templategen.path_is_template(path,
|
|
debug=self.debug):
|
|
self.log.dbg(f'{path} is NO template')
|
|
return False
|
|
self.log.warn(f'{path} uses template, update manually')
|
|
return True
|
|
|
|
def _show_patch(self, fpath, tpath):
|
|
"""provide a way to manually patch the template"""
|
|
content = self._resolve_template(tpath)
|
|
tmp = write_to_tmpfile(content)
|
|
mirror_file_rights(tpath, tmp)
|
|
cmds = ['diff', '-u', tmp, fpath, '|', 'patch', tpath]
|
|
cmdss = ' '.join(cmds)
|
|
self.log.warn(f'try patching with: \"{cmdss}\"')
|
|
return False
|
|
|
|
def _resolve_template(self, tpath):
|
|
"""resolve the template to a temporary file"""
|
|
self.templater.restore_vars(self.tvars)
|
|
return self.templater.generate(tpath)
|
|
|
|
def _same_rights(self, left, right):
|
|
"""return True if files have the same modes"""
|
|
try:
|
|
lefts = get_file_perm(left)
|
|
rights = get_file_perm(right)
|
|
return lefts == rights
|
|
except OSError as exc:
|
|
self.log.err(exc)
|
|
return False
|
|
|
|
def _mirror_file_perms(self, src, dst):
|
|
srcr = get_file_perm(src)
|
|
dstr = get_file_perm(dst)
|
|
if srcr == dstr:
|
|
return
|
|
msg = f'copy rights from {src} ({srcr:o}) to {dst} ({dstr:o})'
|
|
self.log.dbg(msg)
|
|
try:
|
|
mirror_file_rights(src, dst)
|
|
except OSError as exc:
|
|
self.log.err(exc)
|
|
|
|
def _handle_file(self, deployed_path, local_path,
|
|
ignores, compare=True):
|
|
"""sync path (deployed file) and local_path (dotdrop dotfile path)"""
|
|
if self._must_ignore([deployed_path, local_path], ignores):
|
|
self.log.sub(f'\"{local_path}\" ignored')
|
|
return True
|
|
self.log.dbg(f'update for file {deployed_path} and {local_path}')
|
|
if self._is_template(local_path):
|
|
# dotfile is a template
|
|
self.log.dbg(f'{local_path} is a template')
|
|
if self.showpatch:
|
|
try:
|
|
self._show_patch(deployed_path, local_path)
|
|
except UndefinedException as exc:
|
|
msg = f'unable to show patch for {deployed_path}: {exc}'
|
|
self.log.warn(msg)
|
|
return False
|
|
if compare and \
|
|
filecmp.cmp(deployed_path, local_path, shallow=False) and \
|
|
self._same_rights(deployed_path, local_path):
|
|
# no difference
|
|
self.log.dbg(f'identical files: {deployed_path} and {local_path}')
|
|
return True
|
|
if not self._overwrite(deployed_path, local_path):
|
|
return False
|
|
try:
|
|
if self.dry:
|
|
self.log.dry(f'would cp {deployed_path} {local_path}')
|
|
else:
|
|
self.log.dbg(f'cp {deployed_path} {local_path}')
|
|
shutil.copyfile(deployed_path, local_path)
|
|
self._mirror_file_perms(deployed_path, local_path)
|
|
self.log.sub(f'\"{local_path}\" updated')
|
|
except IOError as exc:
|
|
self.log.warn(f'{deployed_path} update failed, do manually: {exc}')
|
|
return False
|
|
return True
|
|
|
|
def _handle_dir(self, deployed_path, local_path,
|
|
dotfile, ignores):
|
|
"""sync path (local dir) and local_path (dotdrop dir path)"""
|
|
self.log.dbg(f'handle update for dir {deployed_path} to {local_path}')
|
|
# paths must be absolute (no tildes)
|
|
deployed_path = os.path.expanduser(deployed_path)
|
|
local_path = os.path.expanduser(local_path)
|
|
|
|
# find the differences
|
|
diff = filecmp.dircmp(deployed_path, local_path, ignore=None)
|
|
# handle directories diff
|
|
ret = self._merge_dirs(diff, dotfile,
|
|
ignores)
|
|
self._mirror_file_perms(deployed_path, local_path)
|
|
return ret
|
|
|
|
def _merge_dirs_create_left_only(self, diff, left, right,
|
|
ignore_missing_in_dotdrop,
|
|
ignores):
|
|
"""create dirs that don't exist in dotdrop"""
|
|
self.log.dbg(f'_merge_dirs_create_left_only: {diff.left_only}')
|
|
for toadd in diff.left_only:
|
|
exist = os.path.join(left, toadd)
|
|
if not os.path.isdir(exist):
|
|
# ignore files for now
|
|
continue
|
|
# match to dotdrop dotpath
|
|
new = os.path.join(right, toadd)
|
|
if ignore_missing_in_dotdrop and not os.path.exists(new):
|
|
self.log.sub(f'\"{exist}\" ignored')
|
|
continue
|
|
if self.dry:
|
|
self.log.dry(f'would cp -r {exist} {new}')
|
|
continue
|
|
self.log.dbg(f'cp -r {exist} {new}')
|
|
try:
|
|
ign_func = self._ignore(ignores)
|
|
copytree_with_ign(exist, new,
|
|
ignore_func=ign_func,
|
|
debug=self.debug)
|
|
except OSError as exc:
|
|
msg = f'error copying dir {exist}'
|
|
self.log.err(f'{msg}: {exc}')
|
|
continue
|
|
self.log.sub(f'\"{new}\" dir added')
|
|
|
|
def _ignore(self, ignores):
|
|
def ignore_func(path):
|
|
return must_ignore([path], ignores,
|
|
debug=self.debug)
|
|
return ignore_func
|
|
|
|
def _merge_dirs_remove_right_only(self, diff, left, right,
|
|
ignore_missing_in_dotdrop,
|
|
ignores):
|
|
"""remove dirs that don't exist in deployed version"""
|
|
self.log.dbg(f'_merge_dirs_remove_right_only: {diff.right_only}')
|
|
for toremove in diff.right_only:
|
|
old = os.path.join(right, toremove)
|
|
if not os.path.isdir(old):
|
|
# ignore files for now
|
|
continue
|
|
if self._must_ignore([old], ignores):
|
|
continue
|
|
if self.dry:
|
|
self.log.dry(f'would rm -r {old}')
|
|
continue
|
|
self.log.dbg(f'rm -r {old}')
|
|
if not self._confirm_rm_r(old):
|
|
continue
|
|
removepath(old, logger=self.log)
|
|
self.log.sub(f'\"{old}\" dir removed')
|
|
|
|
# handle files diff
|
|
# sync files that exist in both but are different
|
|
self.log.dbg(f'_merge_dirs_remove_right_only: {diff.diff_files}')
|
|
fdiff = diff.diff_files
|
|
fdiff.extend(diff.funny_files)
|
|
fdiff.extend(diff.common_funny)
|
|
for file in fdiff:
|
|
fleft = os.path.join(left, file)
|
|
fright = os.path.join(right, file)
|
|
if (ignore_missing_in_dotdrop and not os.path.exists(fright)) or \
|
|
self._must_ignore([fleft, fright], ignores):
|
|
continue
|
|
if self.dry:
|
|
self.log.dry(f'would cp {fleft} {fright}')
|
|
continue
|
|
self.log.dbg(f'cp {fleft} {fright}')
|
|
self._handle_file(fleft, fright,
|
|
ignores,
|
|
compare=False)
|
|
|
|
def _merge_dirs_copy_left_only(self, diff, left, right,
|
|
ignore_missing_in_dotdrop,
|
|
ignores):
|
|
"""copy files that don't exist in dotdrop"""
|
|
self.log.dbg(f'_merge_dirs_copy_left_only: {diff.left_only}')
|
|
for toadd in diff.left_only:
|
|
exist = os.path.join(left, toadd)
|
|
if os.path.isdir(exist):
|
|
# ignore dirs, done above
|
|
continue
|
|
new = os.path.join(right, toadd)
|
|
if (ignore_missing_in_dotdrop and not os.path.exists(new)) or \
|
|
self._must_ignore([exist, new], ignores):
|
|
continue
|
|
if self.dry:
|
|
self.log.dry(f'would cp {exist} {new}')
|
|
continue
|
|
self.log.dbg(f'cp {exist} {new}')
|
|
try:
|
|
shutil.copyfile(exist, new)
|
|
except OSError as exc:
|
|
msg = f'error copying file {exist}'
|
|
self.log.err(f'{msg}: {exc}')
|
|
continue
|
|
|
|
self._mirror_file_perms(exist, new)
|
|
self.log.sub(f'\"{new}\" added')
|
|
|
|
def _merge_dirs_remove_right_only_2(self, diff, right, ignores):
|
|
"""remove files that don't exist in deployed version"""
|
|
self.log.dbg(f'_merge_dirs_remove_right_only_2: {diff.right_only}')
|
|
for toremove in diff.right_only:
|
|
new = os.path.join(right, toremove)
|
|
if not os.path.exists(new):
|
|
continue
|
|
if os.path.isdir(new):
|
|
# ignore dirs, done above
|
|
continue
|
|
if self._must_ignore([new], ignores):
|
|
continue
|
|
if self.dry:
|
|
self.log.dry(f'would rm {new}')
|
|
continue
|
|
self.log.dbg(f'rm {new}')
|
|
removepath(new, logger=self.log)
|
|
self.log.sub(f'\"{new}\" removed')
|
|
|
|
def _merge_dirs(self, diff, dotfile, ignores):
|
|
"""Synchronize directories recursively."""
|
|
left, right = diff.left, diff.right
|
|
self.log.dbg(f'sync dir {left} to {right}')
|
|
|
|
ignore_missing_in_dotdrop = self.ignore_missing_in_dotdrop or \
|
|
dotfile.ignore_missing_in_dotdrop
|
|
|
|
self._merge_dirs_create_left_only(diff, left, right,
|
|
ignore_missing_in_dotdrop,
|
|
ignores)
|
|
self._merge_dirs_remove_right_only(diff, left, right,
|
|
ignore_missing_in_dotdrop,
|
|
ignores)
|
|
self._merge_dirs_copy_left_only(diff, left, right,
|
|
ignore_missing_in_dotdrop,
|
|
ignores)
|
|
self._merge_dirs_remove_right_only_2(diff, right, ignores)
|
|
|
|
# compare rights
|
|
for common in diff.common_files:
|
|
leftf = os.path.join(left, common)
|
|
rightf = os.path.join(right, common)
|
|
if not self._same_rights(leftf, rightf):
|
|
self._mirror_file_perms(leftf, rightf)
|
|
|
|
# Recursively decent into common subdirectories.
|
|
for subdir in diff.subdirs.values():
|
|
self._merge_dirs(subdir, dotfile,
|
|
ignores)
|
|
|
|
# Nothing more to do here.
|
|
return True
|
|
|
|
def _overwrite(self, src, dst):
|
|
"""ask for overwritting"""
|
|
msg = f'Overwrite \"{dst}\" with \"{src}\"?'
|
|
if self.safe and not self.log.ask(msg):
|
|
return False
|
|
return True
|
|
|
|
def _confirm_rm_r(self, directory):
|
|
"""ask for rm -r directory"""
|
|
msg = f'Recursively remove \"{directory}\"?'
|
|
if self.safe and not self.log.ask(msg):
|
|
return False
|
|
return True
|
|
|
|
def _must_ignore(self, paths, ignores):
|
|
if must_ignore(paths, ignores, debug=self.debug):
|
|
self.log.dbg(f'ignoring update for {paths}')
|
|
return True
|
|
return False
|