301 lines
13 KiB
Python
301 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
import os, sys, subprocess, tomllib, argparse
|
||
|
|
|
||
|
|
# --- command-line sub-commands ----------------------------------------------
COMMAND_LIST = 'list'
COMMAND_APPLY = 'apply'
COMMAND_CTX_NAME = 'context_name'  # name of the positional argument of "apply"
COMMANDS_ALL = [COMMAND_LIST, COMMAND_APPLY]

# --- "general" section of the config ----------------------------------------
SECTION_GENERAL = 'general'
KEY_APPS = 'apps'
KEY_DRY_RUN = 'dry_run'

# --- "apps" section of the config (one table per app) -----------------------
SECTION_APPS = 'apps'
KEY_SRC_PATH = 'source_path'
KEY_DST_PATH = 'target_path'
KEY_RELOAD = 'reload'
KEY_ACTIONS = 'actions'
KEY_SYMLINK = 'symlink'
KEY_SCRIPT = 'script'
KEY_EXEC = 'exec'
# Values accepted inside an app's "actions" array.
ACTIONS_ALL = [KEY_SYMLINK, KEY_SCRIPT, KEY_EXEC, KEY_RELOAD]
# Every key an app table may contain: "actions", each action's own key, and the paths.
APPS_VALID_KEYS = [KEY_ACTIONS, *ACTIONS_ALL, KEY_SRC_PATH, KEY_DST_PATH]

# --- environment variables handed to the commands a task runs ---------------
ENV_CONTEXT_NAME = 'CONTEXT_NAME'
ENV_CONTEXT_SRC = 'CONTEXT_SRC'
ENV_CONTEXT_DST = 'CONTEXT_DST'

# --- program identity and base directories ----------------------------------
APP_NAME = os.path.basename(sys.argv[0])
APP_DESC = 'A simple configuration switcher for various usage scenarios.'
HOME = os.path.expanduser('~')
XDG_CONFIG_HOME = os.environ.get('XDG_CONFIG_HOME', os.path.join(HOME, '.config'))
|
||
|
|
|
||
|
|
class OS(object):
    """Small helper that runs external commands and lists files on disk.

    Commands are started with ``subprocess.run``; scripts and snippets are
    handed to the shell named by ``$SHELL`` (``bash`` when it is unset).
    """

    def __init__(self, chdir='', env=None):
        """Prepare the execution settings.

        Args:
            chdir: working directory for the started commands; an empty value
                keeps the current working directory.
            env: extra environment variables for the started commands. They
                are layered on top of a snapshot of ``os.environ`` so that
                they win over inherited values. The mapping passed in is not
                modified. A missing or empty mapping means "inherit the
                environment unchanged".
        """
        self.shell = os.getenv('SHELL') or 'bash'
        self.chdir = chdir or None
        # Caller-supplied variables must override inherited ones, not vice versa.
        self.env = {**os.environ, **env} if env else None

    def exec_command(self, command: list[str], quiet=False) -> int:
        """Run ``command`` and return its exit status.

        Args:
            command: program and arguments, passed without shell interpretation.
            quiet: when true, stdout and stderr of the command are captured
                (and discarded) instead of being shown.
        """
        completed = subprocess.run(command, capture_output=quiet, env=self.env, cwd=self.chdir)
        return completed.returncode

    def exec_script(self, path: str) -> int:
        """Run the script file at ``path`` with the shell; return its exit status."""
        return self.exec_command([self.shell, path])

    def exec_snippet(self, code: str) -> int:
        """Run ``code`` through ``<shell> -c``; return its exit status."""
        return self.exec_command([self.shell, "-c", code])

    def scan_dir(self, path: str) -> list[str]:
        """Return every file below ``path`` as a path relative to ``path``.

        Files of a directory are listed before the contents of its
        sub-directories. Raises ``OSError`` when a directory cannot be read.
        """
        out = []

        def scan(directory: str, prefix: str) -> None:
            # Materialise the entries so the scandir handle is closed before recursing.
            with os.scandir(directory) as it:
                entries = list(it)
            out.extend(os.path.join(prefix, e.name) for e in entries if e.is_file())
            for e in entries:
                if e.is_dir():
                    scan(os.path.join(directory, e.name), os.path.join(prefix, e.name))

        scan(path, '')
        return out
|
||
|
|
|
||
|
|
|
||
|
|
class Task(object):
    """Everything that has to happen for one app when a context is applied.

    A task can create symlinks, run script files, run an inline shell snippet
    and run a reload snippet, in the order given by ``self.actions``.
    """

    def __init__(self, name: str, src: str, dst: str, conf: dict, dry_run=True):
        """Build the task from the app's config table.

        Args:
            name: app name; exported to child processes as CONTEXT_NAME.
            src: default source directory, used unless ``conf`` overrides it.
            dst: default target directory, used unless ``conf`` overrides it.
            conf: the app's config table (may be empty).
            dry_run: when true, only print what would be done.
        """
        self.name = name
        self.src = conf.get(KEY_SRC_PATH) or src  # default is ~/.config/userctx/<context>/<task>
        self.dst = conf.get(KEY_DST_PATH) or dst  # default is ~/.config/<task>
        self.symlinks = conf.get(KEY_SYMLINK)
        self.scripts = conf.get(KEY_SCRIPT)
        self.code = conf.get(KEY_EXEC)
        self.reload = conf.get(KEY_RELOAD)

        actions = conf.get(KEY_ACTIONS)
        if actions is None:
            # No explicit list: always symlink, plus every action whose key is configured.
            actions = [KEY_SYMLINK]
            if self.scripts is not None:  # may be an empty array
                actions.append(KEY_SCRIPT)
            if self.code is not None:
                actions.append(KEY_EXEC)
            if self.reload is not None:
                actions.append(KEY_RELOAD)
        else:
            # An explicit list is used as given (actions = [] makes the app a no-op);
            # copy it so the config table is never modified through the task.
            actions = list(actions)
        self.actions = actions

        self.shell = OS(env={ENV_CONTEXT_NAME: self.name, ENV_CONTEXT_SRC: self.src, ENV_CONTEXT_DST: self.dst})
        self.dry_run = dry_run

    def execute(self) -> None:
        """Run the configured actions in order (or describe them in a dry run)."""
        if self.dry_run:
            print("Configured actions:", self.actions)
        for action in self.actions:
            if action == KEY_SYMLINK:
                self._symlink()
            elif action == KEY_SCRIPT:
                self._run_scripts()
            elif action == KEY_EXEC:
                self._run_code(self.code)
            elif action == KEY_RELOAD:
                self._run_code(self.reload)

    def _symlink(self) -> None:
        """Symlink files from the source directory into the target directory.

        ``self.symlinks`` maps a source-relative path to a target-relative
        path. The special key ``"*"`` stands for the source files that have no
        explicit entry: the value ``"*"`` links each of them under its own
        relative path, any other value is used as the target of the first such
        file. Without a mapping every source file is linked under its own
        relative path. Nothing happens when the source directory has no files.

        An existing target is removed first, and a missing parent directory of
        the target is created.
        """
        entries = self.shell.scan_dir(self.src)
        if not entries:
            return

        # Work on a copy: the mapping comes straight from the shared config table.
        links = {'*': '*'} if self.symlinks is None else dict(self.symlinks)
        unmapped = [e for e in entries if e not in links]
        wildcard = links.pop('*', None)
        if wildcard == '*':  # symlink."*" = "*" in config
            links.update((e, e) for e in unmapped)
        elif wildcard is not None and unmapped:  # symlink."*" = "some/path/target.file"
            links[unmapped[0]] = wildcard

        for rel_src, rel_dst in links.items():
            src, dst = os.path.join(self.src, rel_src), os.path.join(self.dst, rel_dst)
            if self.dry_run:
                print("will remove:", dst)
                print("will symlink:", src, "to", dst)
                continue
            parent = os.path.dirname(dst)
            if parent:
                os.makedirs(parent, exist_ok=True)
            if os.path.lexists(dst):
                os.remove(dst)
            os.symlink(src, dst)

    def _run_scripts(self) -> None:
        """Run the configured scripts, or every ``*.sh`` file of the source directory."""
        scripts = self.scripts
        if not scripts:  # nothing provided explicitly
            scripts = [e for e in self.shell.scan_dir(self.src) if e.endswith('.sh')]
        if self.dry_run:
            print(f'will execute scripts from {self.src}:', scripts)
            return
        for script in scripts:
            self.shell.exec_script(os.path.join(self.src, script))

    def _run_code(self, snippet: str) -> None:
        """Run an inline shell snippet (or print it in a dry run)."""
        if self.dry_run:
            print("will execute script:", snippet)
            return
        self.shell.exec_snippet(snippet)
|
||
|
|
|
||
|
|
class ContextManagerConfig(object):
|
||
|
|
def __init__(self, conf_file_path: str, dry_run: bool):
|
||
|
|
try:
|
||
|
|
with open(conf_file_path, 'rb') as f: self.config = tomllib.load(f)
|
||
|
|
except Exception as e:
|
||
|
|
raise ValueError('error parsing "{config_file_path}":', e)
|
||
|
|
|
||
|
|
def is_arr_of_str(x) -> bool:
|
||
|
|
return isinstance(x, list) and all(isinstance(s, str) for s in x)
|
||
|
|
|
||
|
|
def is_dict_of_str(x) -> bool:
|
||
|
|
return isinstance(x, dict) and all(isinstance(k, str) and isinstance(v, str) for k, v in x.items())
|
||
|
|
|
||
|
|
def user_path(s: str) -> str:
|
||
|
|
return os.path.expanduser(s)
|
||
|
|
|
||
|
|
if SECTION_GENERAL not in self.config or not isinstance(self.config[SECTION_GENERAL], dict):
|
||
|
|
raise ValueError(f'section "{SECTION_GENERAL}" must be present in the config and must be a valid mapping (table)')
|
||
|
|
|
||
|
|
# Will extensively check what's inside config.toml without additinal imports.
|
||
|
|
# Also, fuck dynamic typing!
|
||
|
|
self.tasks = self.config[SECTION_GENERAL].get(KEY_APPS)
|
||
|
|
if not self.tasks or not is_arr_of_str(self.tasks):
|
||
|
|
raise ValueError(f'invalid config: key "{KEY_APPS}" in section "{SECTION_GENERAL}" must be a non-empty array of strings')
|
||
|
|
|
||
|
|
errors, err_count = list(), 0 # fuck it again
|
||
|
|
self.dry_run = (self.config[SECTION_GENERAL].get(KEY_DRY_RUN) or False) or dry_run
|
||
|
|
if not isinstance(self.dry_run, bool):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. key "{KEY_DRY_RUN}" in section "{SECTION_GENERAL} of the config must be bool"')
|
||
|
|
|
||
|
|
self.home = user_path(self.config[SECTION_GENERAL].get(KEY_SRC_PATH) or os.path.join(XDG_CONFIG_HOME, 'userctx'))
|
||
|
|
if not os.path.isabs(self.home):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. path "{self.home}" in key "{KEY_SRC_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.home}"')
|
||
|
|
|
||
|
|
self.managed_path = user_path(self.config[SECTION_GENERAL].get(KEY_DST_PATH) or XDG_CONFIG_HOME)
|
||
|
|
if not os.path.isabs(self.managed_path):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. path "{self.managed_path}" in key "{KEY_DST_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.managed_path}"')
|
||
|
|
|
||
|
|
if SECTION_APPS not in self.config: # valid case with all defaults
|
||
|
|
self.config[SECTION_APPS] = dict()
|
||
|
|
|
||
|
|
tasks = self.config.get(SECTION_APPS)
|
||
|
|
for name, t in tasks.items():
|
||
|
|
errstr = f'for app "{name}"'
|
||
|
|
for k in list(t.keys()): # we might change t
|
||
|
|
v = t[k]
|
||
|
|
if k not in APPS_VALID_KEYS:
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} unknown key "{k}"')
|
||
|
|
continue
|
||
|
|
if k == KEY_EXEC and not isinstance(v, str):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} key "{k}" must be a string')
|
||
|
|
elif k == KEY_SYMLINK and not is_dict_of_str(v):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} key "{k}" must be a table (mapping)')
|
||
|
|
elif k in [KEY_SCRIPT, KEY_ACTIONS] and not is_arr_of_str(v):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} key "{k}" must be array of strings')
|
||
|
|
elif k == KEY_ACTIONS and not all(a in ACTIONS_ALL for a in v):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} invalid actions found: {[a for a in v if a not in ACTIONS_ALL]} in key "{k}", valid actions are: {ACTIONS_ALL}')
|
||
|
|
elif k == KEY_RELOAD and not isinstance(v, str):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} "{KEY_RELOAD}" must ba a string')
|
||
|
|
elif k in [KEY_SRC_PATH, KEY_DST_PATH] and not isinstance(v, str):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} key "{k}" must contain a string')
|
||
|
|
elif k in [KEY_SRC_PATH, KEY_DST_PATH]:
|
||
|
|
p = user_path(v)
|
||
|
|
if not os.path.isabs(p):
|
||
|
|
err_count += 1
|
||
|
|
errors.append(f'{err_count}. {errstr} path "{v}" in key "{k}" must be either absolute or relative to home: "~/{v}"')
|
||
|
|
t[k] = p # substituting path (e.g. "~/somedir" -> "/home/user/somedir")
|
||
|
|
if errors:
|
||
|
|
print(f'{err_count} errors found in the config "{conf_file_path}":')
|
||
|
|
print('\n'.join(errors))
|
||
|
|
raise ValueError('invalid config')
|
||
|
|
|
||
|
|
def get_contexts_list(self) -> list[str]:
|
||
|
|
return [entry.name for entry in os.scandir(self.home) if entry.is_dir()]
|
||
|
|
|
||
|
|
def get_tasks_for_context(self, context_name: str) -> list[Task]:
|
||
|
|
ctx_path = os.path.join(self.home, context_name)
|
||
|
|
if not (os.path.exists(ctx_path) and os.path.isdir(ctx_path)):
|
||
|
|
raise ValueError(f'context directory "{context_name}" not found in "{self.home}"')
|
||
|
|
out = list()
|
||
|
|
for name in self.tasks:
|
||
|
|
src_path = os.path.join(ctx_path, name)
|
||
|
|
dst_path = os.path.join(self.managed_path, name)
|
||
|
|
t = Task(name, src_path, dst_path, self.config[SECTION_APPS].get(name) or dict(), dry_run=self.dry_run)
|
||
|
|
out.append(t)
|
||
|
|
return out
|
||
|
|
|
||
|
|
|
||
|
|
class ContextManager(object):
    """Lists the available contexts and applies one of them."""

    def __init__(self, config: ContextManagerConfig):
        """Remember the configuration that supplies contexts and tasks."""
        self.config = config

    def list_contexts(self) -> None:
        """Print the available context names, one per line."""
        names = self.config.get_contexts_list()
        print('\n'.join(names))

    def apply_context(self, context_name: str) -> None:
        """Run every task of ``context_name`` and print a progress report.

        A failure to obtain the tasks is printed and ends the call. A task
        that raises is marked FAILED, the remaining tasks still run, and all
        collected errors are printed at the end.
        """
        print(f'loading and applying context: "{context_name}"')
        try:
            pending = self.config.get_tasks_for_context(context_name)
        except Exception as exc:
            print(f'error getting tasks for context "{context_name}":', exc)
            return

        if not pending:
            print(f'no tasks found for context "{context_name}"')
            return

        failures = []
        for job in pending:
            # Status word is printed on the same line as the task name.
            print(f'running task "{job.name}"...', end=' ')
            try:
                job.execute()
                print("OK")
            except Exception as exc:
                failures.append((job.name, exc))
                print('FAILED')

        if not failures:
            return
        print('Errors:')
        for failed_name, reason in failures:
            print(f'task: "{failed_name}", error: "{reason}"')
|
||
|
|
|
||
|
|
|
||
|
|
class Runner(object):
    """Command-line front end: parses arguments, loads the config, runs the command."""

    def run(self, argv=None) -> int:
        """Execute the program once and return its exit status.

        Args:
            argv: argument list to parse; ``None`` means the process arguments.

        Returns:
            0 on success, 2 when the config cannot be loaded, 3 when the
            command raised. Invalid command-line arguments, including a
            missing sub-command, make argparse print a usage message and raise
            ``SystemExit`` itself.
        """
        parser = argparse.ArgumentParser(prog=APP_NAME, description=APP_DESC)
        parser.add_argument('-c', '--config', help='path to config file, default: %(default)s',
                            default=os.path.join(XDG_CONFIG_HOME, 'userctx', 'config.toml'))
        parser.add_argument('-n', '--nop', action='store_true', default=False,
                            help='do nothing, just print out what userctx will do')
        # required=True: running without a sub-command is a usage error, not a silent no-op.
        commands = parser.add_subparsers(help='command to execute', dest='command', required=True)
        apply_parser = commands.add_parser(COMMAND_APPLY, help='load context (provide name of context)')
        apply_parser.add_argument(COMMAND_CTX_NAME, help='name of context to load')
        commands.add_parser(COMMAND_LIST, help='list available contexts')

        # argparse reports bad arguments by raising SystemExit, so there is nothing to catch here.
        args = parser.parse_args(argv)

        try:
            config = ContextManagerConfig(args.config, args.nop)
        except Exception as e:
            print("error loading config:", e)
            return 2

        try:
            ctxmgr = ContextManager(config)
            if args.command == COMMAND_LIST:
                ctxmgr.list_contexts()
            elif args.command == COMMAND_APPLY:
                ctxmgr.apply_context(getattr(args, COMMAND_CTX_NAME))  # guaranteed to be present by argparse
        except Exception as e:
            print('error executing command:', e)
            return 3
        return 0
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Run the command-line interface and exit with the status it returns."""
    status = Runner().run()
    sys.exit(status)


if __name__ == '__main__':
    main()
|