mirror of
https://github.com/deadc0de6/dotdrop.git
synced 2026-02-05 03:19:43 +00:00
242 lines
6.0 KiB
Python
242 lines
6.0 KiB
Python
"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6

utilities
"""
|
|
|
|
import subprocess
|
|
import tempfile
|
|
import os
|
|
import uuid
|
|
import fnmatch
|
|
import inspect
|
|
import importlib
|
|
from shutil import rmtree, which
|
|
|
|
# local import
|
|
from dotdrop.logger import Logger
|
|
|
|
# module-level logger instance used by the helpers of this module
LOG = Logger()
|
|
# wildcard character looked for in ignore patterns
STAR = '*'

# paths dotdrop refuses to remove
DONOTDELETE = [os.path.expanduser(p) for p in ('~', '~/.config')]
# normalized form of the protected paths, used for comparisons
NOREMOVE = list(map(os.path.normpath, DONOTDELETE))
|
|
|
|
|
|
def run(cmd, raw=True, debug=False, checkerr=False):
    """
    run a command (expects a list)

    stderr is redirected to stdout, so both end up in the output.

    cmd: the command and its arguments as a list of strings
    raw: if True the output is returned as a list of bytes lines,
         otherwise as a single decoded string
    debug: log the command before executing it
    checkerr: log an error if the command returns non zero
    returns True|False, output
    """
    if debug:
        LOG.dbg('exec: {}'.format(' '.join(cmd)))
    # the context manager closes the pipe once we are done with it
    with subprocess.Popen(cmd, shell=False,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT) as proc:
        # drain the pipe *before* waiting for the process: waiting
        # first blocks forever once the child fills the pipe buffer
        out = proc.stdout.readlines()
        ret = proc.wait()
    # undecodable bytes are replaced instead of raising
    lines = ''.join([x.decode('utf-8', 'replace') for x in out])
    if checkerr and ret != 0:
        c = ' '.join(cmd)
        errl = lines.rstrip()
        m = '\"{}\" returned non zero ({}): {}'.format(c, ret, errl)
        LOG.err(m)
    if raw:
        return ret == 0, out
    return ret == 0, lines
|
|
|
|
|
|
def write_to_tmpfile(content):
    """
    store some content in a new temporary file

    content: the data to write (the file is opened in binary mode)
    returns the path of the temporary file
    """
    tmp = get_tmpfile()
    with open(tmp, 'wb') as handle:
        handle.write(content)
    return tmp
|
|
|
|
|
|
def shell(cmd, debug=False):
    """
    execute a command through the shell (expects a string)

    cmd: the command line to execute
    debug: log the command and its result
    returns True|False, output
    """
    if debug:
        LOG.dbg('shell exec: {}'.format(cmd))
    status, output = subprocess.getstatusoutput(cmd)
    if debug:
        LOG.dbg('shell result ({}): {}'.format(status, output))
    success = status == 0
    return success, output
|
|
|
|
|
|
def diff(original, modified, raw=True,
         diff_cmd='', debug=False):
    """
    compare two files

    original: path substituted for the {0} and {original} tokens
    modified: path substituted for the {1} and {modified} tokens
    raw: forwarded to run() (output as bytes lines or as a string)
    diff_cmd: custom diff command line, defaults to "diff -r -u"
    debug: forwarded to run()
    returns the output of the diff command
    """
    template = diff_cmd or 'diff -r -u {0} {1}'
    placeholders = {
        "{0}": original,
        "{original}": original,
        "{1}": modified,
        "{modified}": modified,
    }
    # the template is split first and tokens are substituted afterwards,
    # each path therefore stays a single argument
    cmd = []
    for token in template.split():
        cmd.append(placeholders.get(token, token))
    return run(cmd, raw=raw, debug=debug)[1]
|
|
|
|
|
|
def get_tmpdir():
    """create a temporary directory (dotdrop- prefix), return its path"""
    tmpdir = tempfile.mkdtemp(prefix='dotdrop-')
    return tmpdir
|
|
|
|
|
|
def get_tmpfile():
    """
    create a temporary file (dotdrop- prefix)

    returns the path of the (empty) temporary file
    """
    fd, path = tempfile.mkstemp(prefix='dotdrop-')
    # mkstemp hands back an open descriptor; only the path is
    # returned so close it here to avoid leaking it
    os.close(fd)
    return path
|
|
|
|
|
|
def get_unique_tmp_name():
    """return a unique path in the temporary directory (not created)"""
    tmpdir = tempfile.gettempdir()
    name = str(uuid.uuid4())
    return os.path.join(tmpdir, name)
|
|
|
|
|
|
def remove(path, quiet=False):
    """
    remove a file/directory/symlink

    path: the path to remove (nothing is done if empty)
    quiet: if True return silently instead of raising when the path
           does not exist, is protected or is of an unsupported type
    raises OSError when the path does not exist, is one of the
    protected paths (NOREMOVE) or is neither a file, a symlink nor
    a directory (unless quiet is set)
    """
    if not path:
        return
    if not os.path.lexists(path):
        if quiet:
            return
        raise OSError("File not found: {}".format(path))
    # make the path absolute before the check so that a relative
    # spelling (for example "." from inside the home directory)
    # cannot bypass the protected paths
    target = os.path.abspath(os.path.expanduser(path))
    if target in NOREMOVE:
        if quiet:
            return
        err = 'Dotdrop refuses to remove {}'.format(path)
        LOG.err(err)
        raise OSError(err)
    # symlinks are unlinked, never followed
    if os.path.islink(path) or os.path.isfile(path):
        os.unlink(path)
    elif os.path.isdir(path):
        rmtree(path)
    else:
        if quiet:
            return
        raise OSError("Unsupported file type for deletion: {}".format(path))
|
|
|
|
|
|
def samefile(path1, path2):
    """return True if both paths exist and represent the same file"""
    both_exist = os.path.exists(path1) and os.path.exists(path2)
    return both_exist and os.path.samefile(path1, path2)
|
|
|
|
|
|
def header():
    """return the dotdrop header line"""
    banner = 'This dotfile is managed using dotdrop'
    return banner
|
|
|
|
|
|
def content_empty(string):
    """return True if the content is empty or a single LF byte"""
    return not string or string == b'\n'
|
|
|
|
|
|
def strip_home(path):
    """
    strip the leading $HOME (and its separator) from path

    the path is returned untouched when it is not below $HOME
    """
    prefix = os.path.expanduser('~') + os.sep
    if not path.startswith(prefix):
        return path
    return path[len(prefix):]
|
|
|
|
|
|
def must_ignore(paths, ignores, debug=False):
    """
    return True if any of the paths matches any of the ignore patterns

    paths: list of paths to test
    ignores: list of fnmatch patterns
    debug: log what is being tested and the first match
    """
    if not ignores:
        return False
    if debug:
        LOG.dbg('must ignore? {} against {}'.format(paths, ignores))
    # lazily walk every (path, pattern) pair, stop on the first match
    candidates = ((path, pattern) for path in paths for pattern in ignores)
    for path, pattern in candidates:
        if not fnmatch.fnmatch(path, pattern):
            continue
        if debug:
            LOG.dbg('ignore \"{}\" match: {}'.format(pattern, path))
        return True
    return False
|
|
|
|
|
|
def uniq_list(a_list):
    """
    return the elements of a list without duplicates (order preserved)

    membership is tested against a list, elements therefore do not
    need to be hashable
    """
    seen = []
    for item in a_list:
        if item in seen:
            continue
        seen.append(item)
    return seen
|
|
|
|
|
|
def patch_ignores(ignores, prefix, debug=False):
    """
    allow relative ignore patterns

    absolute patterns and globs starting with a star (or the path
    separator) are kept as is, any other pattern is joined to prefix

    ignores: list of ignore patterns
    prefix: the path prepended to relative patterns
    debug: log the patterns before and after patching
    returns the list of patched patterns
    """
    if debug:
        LOG.dbg('ignores before patching: {}'.format(ignores))
    patched = []
    for pattern in ignores:
        keep = os.path.isabs(pattern) or (
            STAR in pattern and pattern.startswith((STAR, os.sep)))
        patched.append(pattern if keep else os.path.join(prefix, pattern))
    if debug:
        LOG.dbg('ignores after patching: {}'.format(patched))
    return patched
|
|
|
|
|
|
def get_module_functions(mod):
    """return the list of (name, function) tuples found in a module"""
    return inspect.getmembers(mod, inspect.isfunction)
|
|
|
|
|
|
def get_module_from_path(path):
    """
    load a python source file as a module

    path: path to the source file, the module is named after the
          file name without its ".py" suffix
    returns the loaded module or None if path is empty or does not exist
    (exceptions raised while executing the file are propagated)
    """
    # imported explicitly: "import importlib" alone does not guarantee
    # that these submodules are loaded
    import importlib.machinery
    import importlib.util
    import sys

    if not path or not os.path.exists(path):
        return None
    module_name = os.path.basename(path)
    # strip the suffix only: str.rstrip('.py') strips *characters*
    # and would for example turn "happy.py" into "ha"
    if module_name.endswith('.py'):
        module_name = module_name[:-len('.py')]
    loader = importlib.machinery.SourceFileLoader(module_name, path)
    spec = importlib.util.spec_from_loader(module_name, loader)
    mod = importlib.util.module_from_spec(spec)
    # make the module importable by name (as the deprecated
    # load_module() did) but never clobber an already loaded module
    registered = module_name not in sys.modules
    if registered:
        sys.modules[module_name] = mod
    try:
        loader.exec_module(mod)
    except BaseException:
        # do not leave a half initialized module behind
        if registered:
            sys.modules.pop(module_name, None)
        raise
    return mod
|
|
|
|
|
|
def dependencies_met():
    """
    make sure all dependencies are met

    raises Exception naming the first required tool that is
    not found in the PATH
    """
    template = 'The tool \"{}\" was not found in the PATH!'
    required = ('file', 'diff', 'mkdir')
    missing = next((tool for tool in required if not which(tool)), None)
    if missing is not None:
        raise Exception(template.format(missing))
|
|
|
|
|
|
def mirror_file_rights(src, dst):
    """
    apply the mode of src to dst

    errors raised by os.stat and os.chmod are not caught
    """
    os.chmod(dst, os.stat(src).st_mode)
|