1
0
mirror of https://github.com/deadc0de6/dotdrop.git synced 2026-02-04 18:34:48 +00:00
Files
dotdrop/dotdrop/updater.py
2023-10-22 14:40:55 +02:00

448 lines
17 KiB
Python

"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
handle the update of dotfiles
"""
import os
import shutil
import filecmp
# local imports
from dotdrop.logger import Logger
from dotdrop.templategen import Templategen
from dotdrop.utils import ignores_to_absolute, removepath, \
get_unique_tmp_name, write_to_tmpfile, must_ignore, \
mirror_file_rights, get_file_perm, copytree_with_ign
from dotdrop.exceptions import UndefinedException
TILD = '~'
class Updater:
"""dotfiles updater"""
def __init__(self, dotpath, variables, conf,
profile_key, dry=False, safe=True,
debug=False, ignore=None, showpatch=False,
ignore_missing_in_dotdrop=False):
"""constructor
@dotpath: path where dotfiles are stored
@variables: dictionary of variables for the templates
@conf: configuration manager
@profile_key: the profile key
@dry: simulate
@safe: ask for overwrite if True
@debug: enable debug
@ignore: pattern to ignore when updating
@showpatch: show patch if dotfile to update is a template
"""
self.dotpath = dotpath
self.variables = variables
self.conf = conf
self.profile_key = profile_key
self.dry = dry
self.safe = safe
self.debug = debug
self.ignore = ignore or []
self.showpatch = showpatch
self.ignore_missing_in_dotdrop = ignore_missing_in_dotdrop
self.templater = Templategen(variables=self.variables,
base=self.dotpath,
debug=self.debug)
# save template vars
self.tvars = self.templater.add_tmp_vars()
self.log = Logger(debug=self.debug)
def update_path(self, path):
"""update the dotfile installed on path"""
path = os.path.expanduser(path)
if not os.path.lexists(path):
self.log.err(f'\"{path}\" does not exist!')
return False
dotfiles = self.conf.get_dotfile_by_dst(path,
profile_key=self.profile_key)
if not dotfiles:
return False
for dotfile in dotfiles:
if not dotfile:
err = f'invalid dotfile for update: {dotfile.key}'
self.log.err(err)
return False
msg = f'updating {dotfile} from path \"{path}\"'
self.log.dbg(msg)
if not self._update(path, dotfile):
return False
return True
def update_key(self, key):
"""update the dotfile referenced by key"""
dotfile = self.conf.get_dotfile(key, profile_key=self.profile_key)
if not dotfile:
self.log.dbg(f'no such dotfile: \"{key}\"')
msg = f'invalid dotfile for update: {key}'
self.log.err(msg)
return False
self.log.dbg(f'updating {dotfile} from key \"{key}\"')
path = self.conf.path_to_dotfile_dst(dotfile.dst)
return self._update(path, dotfile)
def _update(self, path, dotfile):
"""update dotfile from file pointed by path"""
ret = False
new_path = None
ignores = list(set(self.ignore + dotfile.upignore))
prefixes = [dotfile.dst, dotfile.src]
ignores = ignores_to_absolute(ignores, prefixes,
debug=self.debug)
self.log.dbg(f'ignore pattern(s) for {path}: {ignores}')
deployed_path = os.path.expanduser(path)
local_path = os.path.join(self.dotpath, dotfile.src)
local_path = os.path.expanduser(local_path)
if not os.path.exists(deployed_path):
msg = f'\"{deployed_path}\" does not exist'
self.log.err(msg)
return False
if not os.path.exists(local_path):
msg = f'\"{local_path}\" does not exist, import it first'
self.log.err(msg)
return False
ignore_missing_in_dotdrop = self.ignore_missing_in_dotdrop or \
dotfile.ignore_missing_in_dotdrop
if ignore_missing_in_dotdrop and not os.path.exists(local_path):
self.log.sub(f'\"{dotfile.key}\" ignored')
return True
# apply write transformation if any
new_path = self._apply_trans_update(deployed_path, dotfile)
if not new_path:
return False
# save current rights
deployed_mode = get_file_perm(deployed_path)
local_mode = get_file_perm(local_path)
# handle the pointed file
if os.path.isdir(new_path):
ret = self._handle_dir(new_path, local_path,
dotfile, ignores)
else:
ret = self._handle_file(new_path, local_path,
ignores)
# mirror rights
if deployed_mode != local_mode:
msg = f'adopt mode {deployed_mode:o} for {dotfile.key}'
self.log.dbg(msg)
if self.conf.update_dotfile(dotfile.key, deployed_mode):
ret = True
# clean temporary files
if new_path != deployed_path and os.path.exists(new_path):
removepath(new_path, logger=self.log)
return ret
def _apply_trans_update(self, path, dotfile):
"""apply write transformation to dotfile"""
trans = dotfile.get_trans_update()
if not trans:
return path
self.log.dbg(f'executing write transformation {trans}')
tmp = get_unique_tmp_name()
self.templater.restore_vars(self.tvars)
newvars = dotfile.get_dotfile_variables()
self.templater.add_tmp_vars(newvars=newvars)
if not trans.transform(path, tmp, templater=self.templater,
debug=self.debug):
if os.path.exists(tmp):
removepath(tmp, logger=self.log)
err = f'transformation \"{trans.key}\" failed for {dotfile.key}'
self.log.err(err)
return None
return tmp
def _is_template(self, path):
if not Templategen.path_is_template(path,
debug=self.debug):
self.log.dbg(f'{path} is NO template')
return False
self.log.warn(f'{path} uses template, update manually')
return True
def _show_patch(self, fpath, tpath):
"""provide a way to manually patch the template"""
content = self._resolve_template(tpath)
tmp = write_to_tmpfile(content)
mirror_file_rights(tpath, tmp)
cmds = ['diff', '-u', tmp, fpath, '|', 'patch', tpath]
cmdss = ' '.join(cmds)
self.log.warn(f'try patching with: \"{cmdss}\"')
return False
def _resolve_template(self, tpath):
"""resolve the template to a temporary file"""
self.templater.restore_vars(self.tvars)
return self.templater.generate(tpath)
def _same_rights(self, left, right):
"""return True if files have the same modes"""
try:
lefts = get_file_perm(left)
rights = get_file_perm(right)
return lefts == rights
except OSError as exc:
self.log.err(exc)
return False
def _mirror_file_perms(self, src, dst):
srcr = get_file_perm(src)
dstr = get_file_perm(dst)
if srcr == dstr:
return
msg = f'copy rights from {src} ({srcr:o}) to {dst} ({dstr:o})'
self.log.dbg(msg)
try:
mirror_file_rights(src, dst)
except OSError as exc:
self.log.err(exc)
def _handle_file(self, deployed_path, local_path,
ignores, compare=True):
"""sync path (deployed file) and local_path (dotdrop dotfile path)"""
if self._must_ignore([deployed_path, local_path], ignores):
self.log.sub(f'\"{local_path}\" ignored')
return True
self.log.dbg(f'update for file {deployed_path} and {local_path}')
if self._is_template(local_path):
# dotfile is a template
self.log.dbg(f'{local_path} is a template')
if self.showpatch:
try:
self._show_patch(deployed_path, local_path)
except UndefinedException as exc:
msg = f'unable to show patch for {deployed_path}: {exc}'
self.log.warn(msg)
return False
if compare and \
filecmp.cmp(deployed_path, local_path, shallow=False) and \
self._same_rights(deployed_path, local_path):
# no difference
self.log.dbg(f'identical files: {deployed_path} and {local_path}')
return True
if not self._overwrite(deployed_path, local_path):
return False
try:
if self.dry:
self.log.dry(f'would cp {deployed_path} {local_path}')
else:
self.log.dbg(f'cp {deployed_path} {local_path}')
shutil.copyfile(deployed_path, local_path)
self._mirror_file_perms(deployed_path, local_path)
self.log.sub(f'\"{local_path}\" updated')
except IOError as exc:
self.log.warn(f'{deployed_path} update failed, do manually: {exc}')
return False
return True
def _handle_dir(self, deployed_path, local_path,
dotfile, ignores):
"""sync path (local dir) and local_path (dotdrop dir path)"""
self.log.dbg(f'handle update for dir {deployed_path} to {local_path}')
# paths must be absolute (no tildes)
deployed_path = os.path.expanduser(deployed_path)
local_path = os.path.expanduser(local_path)
# find the differences
diff = filecmp.dircmp(deployed_path, local_path, ignore=None)
# handle directories diff
ret = self._merge_dirs(diff, dotfile,
ignores)
self._mirror_file_perms(deployed_path, local_path)
return ret
def _merge_dirs_create_left_only(self, diff, left, right,
ignore_missing_in_dotdrop,
ignores):
"""create dirs that don't exist in dotdrop"""
self.log.dbg(f'_merge_dirs_create_left_only: {diff.left_only}')
for toadd in diff.left_only:
exist = os.path.join(left, toadd)
if not os.path.isdir(exist):
# ignore files for now
continue
# match to dotdrop dotpath
new = os.path.join(right, toadd)
if ignore_missing_in_dotdrop and not os.path.exists(new):
self.log.sub(f'\"{exist}\" ignored')
continue
if self.dry:
self.log.dry(f'would cp -r {exist} {new}')
continue
self.log.dbg(f'cp -r {exist} {new}')
try:
ign_func = self._ignore(ignores)
copytree_with_ign(exist, new,
ignore_func=ign_func,
debug=self.debug)
except OSError as exc:
msg = f'error copying dir {exist}'
self.log.err(f'{msg}: {exc}')
continue
self.log.sub(f'\"{new}\" dir added')
def _ignore(self, ignores):
def ignore_func(path):
return must_ignore([path], ignores,
debug=self.debug)
return ignore_func
def _merge_dirs_remove_right_only(self, diff, left, right,
ignore_missing_in_dotdrop,
ignores):
"""remove dirs that don't exist in deployed version"""
self.log.dbg(f'_merge_dirs_remove_right_only: {diff.right_only}')
for toremove in diff.right_only:
old = os.path.join(right, toremove)
if not os.path.isdir(old):
# ignore files for now
continue
if self._must_ignore([old], ignores):
continue
if self.dry:
self.log.dry(f'would rm -r {old}')
continue
self.log.dbg(f'rm -r {old}')
if not self._confirm_rm_r(old):
continue
removepath(old, logger=self.log)
self.log.sub(f'\"{old}\" dir removed')
# handle files diff
# sync files that exist in both but are different
self.log.dbg(f'_merge_dirs_remove_right_only: {diff.diff_files}')
fdiff = diff.diff_files
fdiff.extend(diff.funny_files)
fdiff.extend(diff.common_funny)
for file in fdiff:
fleft = os.path.join(left, file)
fright = os.path.join(right, file)
if (ignore_missing_in_dotdrop and not os.path.exists(fright)) or \
self._must_ignore([fleft, fright], ignores):
continue
if self.dry:
self.log.dry(f'would cp {fleft} {fright}')
continue
self.log.dbg(f'cp {fleft} {fright}')
self._handle_file(fleft, fright,
ignores,
compare=False)
def _merge_dirs_copy_left_only(self, diff, left, right,
ignore_missing_in_dotdrop,
ignores):
"""copy files that don't exist in dotdrop"""
self.log.dbg(f'_merge_dirs_copy_left_only: {diff.left_only}')
for toadd in diff.left_only:
exist = os.path.join(left, toadd)
if os.path.isdir(exist):
# ignore dirs, done above
continue
new = os.path.join(right, toadd)
if (ignore_missing_in_dotdrop and not os.path.exists(new)) or \
self._must_ignore([exist, new], ignores):
continue
if self.dry:
self.log.dry(f'would cp {exist} {new}')
continue
self.log.dbg(f'cp {exist} {new}')
try:
shutil.copyfile(exist, new)
except OSError as exc:
msg = f'error copying file {exist}'
self.log.err(f'{msg}: {exc}')
continue
self._mirror_file_perms(exist, new)
self.log.sub(f'\"{new}\" added')
def _merge_dirs_remove_right_only_2(self, diff, right, ignores):
"""remove files that don't exist in deployed version"""
self.log.dbg(f'_merge_dirs_remove_right_only_2: {diff.right_only}')
for toremove in diff.right_only:
new = os.path.join(right, toremove)
if not os.path.exists(new):
continue
if os.path.isdir(new):
# ignore dirs, done above
continue
if self._must_ignore([new], ignores):
continue
if self.dry:
self.log.dry(f'would rm {new}')
continue
self.log.dbg(f'rm {new}')
removepath(new, logger=self.log)
self.log.sub(f'\"{new}\" removed')
def _merge_dirs(self, diff, dotfile, ignores):
"""Synchronize directories recursively."""
left, right = diff.left, diff.right
self.log.dbg(f'sync dir {left} to {right}')
ignore_missing_in_dotdrop = self.ignore_missing_in_dotdrop or \
dotfile.ignore_missing_in_dotdrop
self._merge_dirs_create_left_only(diff, left, right,
ignore_missing_in_dotdrop,
ignores)
self._merge_dirs_remove_right_only(diff, left, right,
ignore_missing_in_dotdrop,
ignores)
self._merge_dirs_copy_left_only(diff, left, right,
ignore_missing_in_dotdrop,
ignores)
self._merge_dirs_remove_right_only_2(diff, right, ignores)
# compare rights
for common in diff.common_files:
leftf = os.path.join(left, common)
rightf = os.path.join(right, common)
if not self._same_rights(leftf, rightf):
self._mirror_file_perms(leftf, rightf)
# Recursively decent into common subdirectories.
for subdir in diff.subdirs.values():
self._merge_dirs(subdir, dotfile,
ignores)
# Nothing more to do here.
return True
def _overwrite(self, src, dst):
"""ask for overwritting"""
msg = f'Overwrite \"{dst}\" with \"{src}\"?'
if self.safe and not self.log.ask(msg):
return False
return True
def _confirm_rm_r(self, directory):
"""ask for rm -r directory"""
msg = f'Recursively remove \"{directory}\"?'
if self.safe and not self.log.ask(msg):
return False
return True
def _must_ignore(self, paths, ignores):
if must_ignore(paths, ignores, debug=self.debug):
self.log.dbg(f'ignoring update for {paths}')
return True
return False