1
0
mirror of https://github.com/deadc0de6/dotdrop.git synced 2026-02-10 23:09:16 +00:00

type utils

This commit is contained in:
deadc0de6
2024-01-18 21:46:19 +01:00
parent d9fabe3e81
commit 6dba75ff47

View File

@@ -24,6 +24,12 @@ from packaging import version
from dotdrop.logger import Logger from dotdrop.logger import Logger
from dotdrop.exceptions import UnmetDependency from dotdrop.exceptions import UnmetDependency
from dotdrop.version import __version__ as VERSION from dotdrop.version import __version__ as VERSION
from dotdrop.action import Action
from dotdrop.dotfile import Dotfile
from dotdrop.options import Options
from dotdrop.profile import Profile
from ruamel.yaml.comments import CommentedSeq
from typing import Any, Callable, List, Optional, Tuple, Union
LOG = Logger() LOG = Logger()
STAR = '*' STAR = '*'
@@ -40,7 +46,7 @@ DONOTDELETE = [
NOREMOVE = [os.path.normpath(p) for p in DONOTDELETE] NOREMOVE = [os.path.normpath(p) for p in DONOTDELETE]
def run(cmd, debug=False): def run(cmd: List[str], debug: bool=False) -> Tuple[bool, str]:
"""run a command (expects a list)""" """run a command (expects a list)"""
if debug: if debug:
fcmd = ' '.join(cmd) fcmd = ' '.join(cmd)
@@ -55,7 +61,7 @@ def run(cmd, debug=False):
return ret == 0, lines return ret == 0, lines
def write_to_tmpfile(content): def write_to_tmpfile(content: bytes) -> str:
"""write some content to a tmp file""" """write some content to a tmp file"""
path = get_tmpfile() path = get_tmpfile()
with open(path, 'wb') as file: with open(path, 'wb') as file:
@@ -63,7 +69,7 @@ def write_to_tmpfile(content):
return path return path
def shellrun(cmd, debug=False): def shellrun(cmd: str, debug: bool=False) -> Tuple[bool, str]:
""" """
run a command in the shell (expects a string) run a command in the shell (expects a string)
returns True|False, output returns True|False, output
@@ -76,7 +82,7 @@ def shellrun(cmd, debug=False):
return ret == 0, out return ret == 0, out
def userinput(prompt, debug=False): def userinput(prompt: str, debug: bool = False) -> str:
""" """
get user input get user input
return user input return user input
@@ -90,13 +96,13 @@ def userinput(prompt, debug=False):
return res return res
def fastdiff(left, right): def fastdiff(left: str, right: str) -> bool:
"""fast compare files and returns True if different""" """fast compare files and returns True if different"""
return not filecmp.cmp(left, right, shallow=False) return not filecmp.cmp(left, right, shallow=False)
def diff(original, modified, def diff(original: str, modified: str,
diff_cmd='', debug=False): diff_cmd: str='', debug: bool=False) -> str:
"""compare two files, returns '' if same""" """compare two files, returns '' if same"""
if not diff_cmd: if not diff_cmd:
diff_cmd = 'diff -r -u {0} {1}' diff_cmd = 'diff -r -u {0} {1}'
@@ -112,7 +118,7 @@ def diff(original, modified,
return out return out
def get_tmpdir(): def get_tmpdir() -> str:
"""create and return the temporary directory""" """create and return the temporary directory"""
# pylint: disable=W0603 # pylint: disable=W0603
global TMPDIR global TMPDIR
@@ -124,7 +130,7 @@ def get_tmpdir():
return tmp return tmp
def _get_tmpdir(): def _get_tmpdir() -> str:
"""create the tmpdir""" """create the tmpdir"""
try: try:
if ENV_TEMP in os.environ: if ENV_TEMP in os.environ:
@@ -139,21 +145,21 @@ def _get_tmpdir():
return tempfile.mkdtemp(prefix='dotdrop-') return tempfile.mkdtemp(prefix='dotdrop-')
def get_tmpfile(): def get_tmpfile() -> str:
"""create a temporary file""" """create a temporary file"""
tmpdir = get_tmpdir() tmpdir = get_tmpdir()
return tempfile.NamedTemporaryFile(prefix='dotdrop-', return tempfile.NamedTemporaryFile(prefix='dotdrop-',
dir=tmpdir, delete=False).name dir=tmpdir, delete=False).name
def get_unique_tmp_name(): def get_unique_tmp_name() -> str:
"""get a unique file name (not created)""" """get a unique file name (not created)"""
unique = str(uuid.uuid4()) unique = str(uuid.uuid4())
tmpdir = get_tmpdir() tmpdir = get_tmpdir()
return os.path.join(tmpdir, unique) return os.path.join(tmpdir, unique)
def removepath(path, logger=None): def removepath(path: str, logger: Optional[Logger]=None) -> None:
""" """
remove a file/directory/symlink remove a file/directory/symlink
if logger is defined, OSError are catched if logger is defined, OSError are catched
@@ -193,7 +199,7 @@ def removepath(path, logger=None):
raise OSError(err) from exc raise OSError(err) from exc
def samefile(path1, path2): def samefile(path1: str, path2: str) -> bool:
"""return True if represent the same file""" """return True if represent the same file"""
if not os.path.exists(path1): if not os.path.exists(path1):
return False return False
@@ -202,12 +208,12 @@ def samefile(path1, path2):
return os.path.samefile(path1, path2) return os.path.samefile(path1, path2)
def header(): def header() -> str:
"""return dotdrop header""" """return dotdrop header"""
return 'This dotfile is managed using dotdrop' return 'This dotfile is managed using dotdrop'
def content_empty(string): def content_empty(string: bytes) -> bool:
"""return True if is empty or only one CRLF""" """return True if is empty or only one CRLF"""
if not string: if not string:
return True return True
@@ -216,7 +222,7 @@ def content_empty(string):
return False return False
def strip_home(path): def strip_home(path: str) -> str:
"""properly strip $HOME from path""" """properly strip $HOME from path"""
home = os.path.expanduser('~') + os.sep home = os.path.expanduser('~') + os.sep
if path.startswith(home): if path.startswith(home):
@@ -224,7 +230,7 @@ def strip_home(path):
return path return path
def _match_ignore_pattern(path, pattern, debug=False): def _match_ignore_pattern(path: str, pattern: str, debug: bool=False) -> bool:
""" """
returns true if path matches the pattern returns true if path matches the pattern
we test the entire path but also we test the entire path but also
@@ -249,7 +255,10 @@ def _match_ignore_pattern(path, pattern, debug=False):
return False return False
def _must_ignore(path, ignores, neg_ignores, debug=False): def _must_ignore(path: str,
ignores: List[str],
neg_ignores: List[str],
debug: bool=False) -> bool:
""" """
return true if path matches any ignore patterns return true if path matches any ignore patterns
""" """
@@ -315,7 +324,9 @@ def _must_ignore(path, ignores, neg_ignores, debug=False):
return True return True
def must_ignore(paths, ignores, debug=False): def must_ignore(paths: List[str],
ignores: List[str],
debug: bool=False) -> bool:
""" """
return true if any paths in list matches any ignore patterns return true if any paths in list matches any ignore patterns
""" """
@@ -336,7 +347,10 @@ def must_ignore(paths, ignores, debug=False):
return False return False
def _cp(src, dst, ignore_func=None, debug=False): def _cp(src: str,
dst: str,
ignore_func: Optional[Callable]=None,
debug: bool=False) -> int:
""" """
the copy function for copytree the copy function for copytree
returns the numb of files copied returns the numb of files copied
@@ -363,7 +377,7 @@ def _cp(src, dst, ignore_func=None, debug=False):
return 0 return 0
def copyfile(src, dst, debug=False): def copyfile(src: str, dst: str, debug: bool=False) -> bool:
""" """
copy file from src to dst copy file from src to dst
no dir expected! no dir expected!
@@ -372,7 +386,10 @@ def copyfile(src, dst, debug=False):
return _cp(src, dst, debug=debug) == 1 return _cp(src, dst, debug=debug) == 1
def copytree_with_ign(src, dst, ignore_func=None, debug=False): def copytree_with_ign(src: str,
dst: str,
ignore_func: Optional[Callable]=None,
debug: bool=False) -> int:
""" """
copytree with support for ignore copytree with support for ignore
returns the numb of files installed returns the numb of files installed
@@ -407,7 +424,7 @@ def copytree_with_ign(src, dst, ignore_func=None, debug=False):
return copied_count return copied_count
def uniq_list(a_list): def uniq_list(a_list: List[str]) -> List[str]:
"""unique elements of a list while preserving order""" """unique elements of a list while preserving order"""
new = [] new = []
if not a_list: if not a_list:
@@ -418,7 +435,9 @@ def uniq_list(a_list):
return new return new
def ignores_to_absolute(ignores, prefixes, debug=False): def ignores_to_absolute(ignores: List[str],
prefixes: List[str],
debug: bool=False) -> List[str]:
"""allow relative ignore pattern""" """allow relative ignore pattern"""
new = [] new = []
LOG.dbg(f'ignores before patching: {ignores}', force=debug) LOG.dbg(f'ignores before patching: {ignores}', force=debug)
@@ -464,7 +483,7 @@ def get_module_functions(mod):
return funcs return funcs
def get_module_from_path(path): def get_module_from_path(path: str):
"""get module from path""" """get module from path"""
if not path or not os.path.exists(path): if not path or not os.path.exists(path):
return None return None
@@ -482,7 +501,7 @@ def get_module_from_path(path):
return mod return mod
def dependencies_met(): def dependencies_met() -> None:
"""make sure all dependencies are met""" """make sure all dependencies are met"""
# check unix tools deps # check unix tools deps
# diff command is checked in settings.py # diff command is checked in settings.py
@@ -563,7 +582,7 @@ def dependencies_met():
# pylint: enable=C0415 # pylint: enable=C0415
def mirror_file_rights(src, dst): def mirror_file_rights(src: str, dst: str) -> None:
"""mirror file rights of src to dst (can rise exc)""" """mirror file rights of src to dst (can rise exc)"""
if not os.path.exists(src) or not os.path.exists(dst): if not os.path.exists(src) or not os.path.exists(dst):
return return
@@ -571,7 +590,7 @@ def mirror_file_rights(src, dst):
os.chmod(dst, rights) os.chmod(dst, rights)
def get_umask(): def get_umask() -> int:
"""return current umask value""" """return current umask value"""
cur = os.umask(0) cur = os.umask(0)
os.umask(cur) os.umask(cur)
@@ -579,7 +598,7 @@ def get_umask():
return cur return cur
def get_default_file_perms(path, umask): def get_default_file_perms(path: str, umask: int) -> int:
"""get default rights for a file""" """get default rights for a file"""
base = 0o666 base = 0o666
if os.path.isdir(path): if os.path.isdir(path):
@@ -587,14 +606,14 @@ def get_default_file_perms(path, umask):
return base - umask return base - umask
def get_file_perm(path): def get_file_perm(path: str) -> int:
"""return file permission""" """return file permission"""
if not os.path.exists(path): if not os.path.exists(path):
return 0o777 return 0o777
return os.stat(path, follow_symlinks=True).st_mode & 0o777 return os.stat(path, follow_symlinks=True).st_mode & 0o777
def chmod(path, mode, debug=False): def chmod(path: str, mode: int, debug: bool=False) -> bool:
"""change mode of file""" """change mode of file"""
if debug: if debug:
LOG.dbg(f'chmod {mode:o} {path}', force=True) LOG.dbg(f'chmod {mode:o} {path}', force=True)
@@ -602,7 +621,8 @@ def chmod(path, mode, debug=False):
return get_file_perm(path) == mode return get_file_perm(path) == mode
def adapt_workers(options, logger): def adapt_workers(options: Options,
logger: Logger) -> None:
"""adapt number of workers if safe/dry""" """adapt number of workers if safe/dry"""
if options.safe and options.workers > 1: if options.safe and options.workers > 1:
logger.warn('workers set to 1 when --force is not used') logger.warn('workers set to 1 when --force is not used')
@@ -612,16 +632,20 @@ def adapt_workers(options, logger):
options.workers = 1 options.workers = 1
def categorize(function, iterable): def categorize(function: Callable, iterable: List[str]) -> List[str]:
"""separate an iterable into elements for which """
separate an iterable into elements for which
function(element) is true for each element and function(element) is true for each element and
for which function(element) is false for each for which function(element) is false for each
element""" element
"""
return (tuple(filter(function, iterable)), return (tuple(filter(function, iterable)),
tuple(itertools.filterfalse(function, iterable))) tuple(itertools.filterfalse(function, iterable)))
def debug_list(title, elems, debug): def debug_list(title: str,
elems: List[Any],
debug: bool) -> None:
"""pretty print list""" """pretty print list"""
if not debug: if not debug:
return return
@@ -630,7 +654,9 @@ def debug_list(title, elems, debug):
LOG.dbg(f'\t- {elem}', force=debug) LOG.dbg(f'\t- {elem}', force=debug)
def debug_dict(title, elems, debug): def debug_dict(title: str,
elems: Any,
debug: bool) -> None:
"""pretty print dict""" """pretty print dict"""
if not debug: if not debug:
return return
@@ -644,7 +670,7 @@ def debug_dict(title, elems, debug):
LOG.dbg(f'\t- \"{k}\": {val}', force=debug) LOG.dbg(f'\t- \"{k}\": {val}', force=debug)
def check_version(): def check_version() -> None:
""" """
get dotdrop latest version on github get dotdrop latest version on github
compare with "version" compare with "version"
@@ -677,7 +703,10 @@ def check_version():
LOG.warn(msg) LOG.warn(msg)
def pivot_path(path, newdir, striphome=False, logger=None): def pivot_path(path: str,
newdir: str,
striphome: bool=False,
logger: Optional[Logger]=None) -> str:
"""change path to be under newdir""" """change path to be under newdir"""
if logger: if logger:
logger.dbg(f'pivot new dir: \"{newdir}\"') logger.dbg(f'pivot new dir: \"{newdir}\"')
@@ -691,7 +720,7 @@ def pivot_path(path, newdir, striphome=False, logger=None):
return new return new
def is_bin_in_path(command): def is_bin_in_path(command: str) -> bool:
""" """
check binary from command is in path check binary from command is in path
""" """