Files
userctx/userctx.py

301 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2025-12-17 19:55:08 +03:00
import os, sys, subprocess, tomllib, argparse
# --- command-line sub-commands ------------------------------------------------
COMMAND_LIST = 'list'
COMMAND_APPLY = 'apply'
COMMAND_CTX_NAME = 'context_name'  # name of the positional argument of "apply"
COMMANDS_ALL = [COMMAND_LIST, COMMAND_APPLY]

# --- [general] section of config.toml -----------------------------------------
SECTION_GENERAL = 'general'
KEY_APPS = 'apps'
KEY_DRY_RUN = 'dry_run'

# --- [apps.<name>] sections of config.toml ------------------------------------
SECTION_APPS = 'apps'
KEY_SRC_PATH = 'source_path'
KEY_DST_PATH = 'target_path'
KEY_RELOAD = 'reload'
KEY_ACTIONS = 'actions'
KEY_SYMLINK = 'symlink'
KEY_SCRIPT = 'script'
KEY_EXEC = 'exec'
ACTIONS_ALL = [KEY_SYMLINK, KEY_SCRIPT, KEY_EXEC, KEY_RELOAD]
APPS_VALID_KEYS = [KEY_ACTIONS, KEY_SYMLINK, KEY_SCRIPT, KEY_EXEC, KEY_RELOAD, KEY_SRC_PATH, KEY_DST_PATH]

# --- environment variables exported to scripts and snippets --------------------
ENV_CONTEXT_NAME = 'CONTEXT_NAME'
ENV_CONTEXT_SRC = 'CONTEXT_SRC'
ENV_CONTEXT_DST = 'CONTEXT_DST'

# --- application metadata and base directories ---------------------------------
APP_NAME = os.path.basename(sys.argv[0])
APP_DESC = 'A simple configuration switcher for various usage scenarios.'
HOME = os.path.expanduser('~')
# Per the XDG Base Directory Specification an unset, empty or relative
# $XDG_CONFIG_HOME must be ignored in favour of the default ~/.config.
_XDG_CONFIG_HOME_ENV = os.getenv('XDG_CONFIG_HOME', '')
XDG_CONFIG_HOME = (
    _XDG_CONFIG_HOME_ENV
    if os.path.isabs(_XDG_CONFIG_HOME_ENV)
    else os.path.join(HOME, '.config')
)
class OS(object):
    """Small helper for running shell commands and scanning directories.

    Commands are run through the shell named by ``$SHELL`` (``bash`` when the
    variable is unset or empty).
    """

    def __init__(self, chdir='', env=None):
        """Create the helper.

        Args:
            chdir: accepted for interface compatibility; currently unused.
            env: extra environment variables for child processes. They are
                layered on top of the current process environment, so the
                values given here win over inherited ones. ``None`` means
                "inherit the environment unchanged".
        """
        self.shell = os.getenv('SHELL') or 'bash'
        if env is None:
            self.env = None
        else:
            # Build a fresh mapping: the caller's dict is left untouched and
            # its values take precedence over the inherited environment.
            self.env = {**os.environ, **env}

    def exec_command(self, command: list[str], quiet=False) -> int:
        """Run ``command`` and return its exit code.

        When ``quiet`` is true the child's stdout/stderr are captured (and
        discarded) instead of being written to the terminal.
        """
        return subprocess.run(command, capture_output=quiet, env=self.env).returncode

    def exec_script(self, path: str) -> int:
        """Run the script file at ``path`` with the shell; return its exit code."""
        return self.exec_command([self.shell, path])

    def exec_snippet(self, code: str) -> int:
        """Run ``code`` via ``<shell> -c``; return its exit code."""
        return self.exec_command([self.shell, "-c", code])

    def scan_dir(self, path: str) -> list[str]:
        """Return all files below ``path`` as paths relative to ``path``.

        Sub-directories are descended into recursively; at each level the
        files of that level are listed before those of its sub-directories.
        """
        found = []

        def scan(directory: str, prefix: str) -> None:
            # Materialise the iterator: it is walked twice (files, then dirs).
            entries = list(os.scandir(directory))
            found.extend(os.path.join(prefix, e.name) for e in entries if e.is_file())
            for sub in (e.name for e in entries if e.is_dir()):
                scan(os.path.join(directory, sub), os.path.join(prefix, sub))

        scan(path, '')
        return found
class Task(object):
    """The work to do for one configured app when a context is applied.

    A task can symlink files from its source directory into its target
    directory, run shell scripts from the source directory, and run the
    ``exec`` / ``reload`` shell snippets from the config.
    """

    def __init__(self, name: str, src: str, dst: str, conf: dict, dry_run=True):
        """Build the task from its per-app config table.

        Args:
            name: app name (used in messages and exported to child processes).
            src: default source directory, overridden by ``source_path``.
            dst: default target directory, overridden by ``target_path``.
            conf: the app's config table; may be empty.
            dry_run: when true, only print what would be done.
        """
        self.name = name
        self.src = conf.get(KEY_SRC_PATH) or src  # default is ~/.config/userctx/<context>/<task>
        self.dst = conf.get(KEY_DST_PATH) or dst  # default is ~/.config/<task>
        self.symlinks = conf.get(KEY_SYMLINK)
        self.scripts = conf.get(KEY_SCRIPT)
        self.code = conf.get(KEY_EXEC)
        self.reload = conf.get(KEY_RELOAD)
        self.actions = conf.get(KEY_ACTIONS)
        if self.actions is None:  # not set; an explicit actions = [] is a no-op conf for the app
            # Derive the action list from the keys that are present.
            self.actions = [KEY_SYMLINK]
            if self.scripts is not None:  # may be an empty array
                self.actions.append(KEY_SCRIPT)
            if self.code is not None:
                self.actions.append(KEY_EXEC)
            if self.reload is not None:
                self.actions.append(KEY_RELOAD)
        # NOTE(review): CONTEXT_NAME receives the app name, not the context
        # name -- the task is not told which context it belongs to.
        self.shell = OS(env={ENV_CONTEXT_NAME: self.name, ENV_CONTEXT_SRC: self.src, ENV_CONTEXT_DST: self.dst})
        self.dry_run = dry_run

    def execute(self) -> None:
        """Run the configured actions in order.

        Raises:
            OSError: a filesystem operation failed.
            RuntimeError: a script or snippet exited with a non-zero code.
            ValueError: an action is listed but its command is not configured.
        """
        if self.dry_run:
            print("Configured actions:", self.actions)
        for action in self.actions:
            if action == KEY_SYMLINK:
                self._symlink()
            elif action == KEY_SCRIPT:
                self._run_scripts()
            elif action == KEY_EXEC:
                self._run_code(self.code)
            elif action == KEY_RELOAD:
                self._run_code(self.reload)

    def _symlink(self) -> None:
        """Symlink files from the source directory into the target directory."""
        entries = self.shell.scan_dir(self.src)
        if not entries:
            return
        # Work on a copy: self.symlinks may be the very dict held by the
        # parsed config, and expanding "*" in place would corrupt it for any
        # later run.
        links = {'*': '*'} if self.symlinks is None else dict(self.symlinks)
        wildcard = links.pop('*', None)
        if wildcard is not None:
            unmapped = [e for e in entries if e not in links]
            if wildcard == '*':  # symlink."*" = "*" in config
                links.update((e, e) for e in unmapped)
            elif unmapped:  # symlink."*" = "some/path/target.file"
                # Only the first file without an explicit mapping is linked.
                links[unmapped[0]] = wildcard
        for rel_src, rel_dst in links.items():
            src, dst = os.path.join(self.src, rel_src), os.path.join(self.dst, rel_dst)
            if self.dry_run:
                if os.path.lexists(dst):
                    print("will remove:", dst)
                print("will symlink:", src, "to", dst)
                continue
            if os.path.lexists(dst):
                os.remove(dst)
            # The target directory (or a nested one) may not exist yet.
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst)

    def _run_scripts(self) -> None:
        """Run the configured scripts, or every ``*.sh`` below the source dir."""
        scripts = self.scripts
        if not scripts:  # nothing provided explicitly
            # Sorted so that the execution order is deterministic.
            scripts = sorted(e for e in self.shell.scan_dir(self.src) if e.endswith('.sh'))
        if self.dry_run:
            print(f'will execute scripts from {self.src}:', scripts)
            return
        for script in scripts:
            path = os.path.join(self.src, script)
            code = self.shell.exec_script(path)
            if code != 0:
                raise RuntimeError(f'script "{path}" exited with code {code}')

    def _run_code(self, snippet: str) -> None:
        """Run a shell snippet (the ``exec`` or ``reload`` value)."""
        if snippet is None:
            # The action was listed in "actions" but its key is missing.
            raise ValueError('action is listed but no shell command is configured for it')
        if self.dry_run:
            print("will execute script:", snippet)
            return
        code = self.shell.exec_snippet(snippet)
        if code != 0:
            raise RuntimeError(f'command "{snippet}" exited with code {code}')
class ContextManagerConfig(object):
    """Parsed and validated contents of the userctx TOML config file."""

    def __init__(self, conf_file_path: str, dry_run: bool):
        """Load and validate the config.

        Args:
            conf_file_path: path to the TOML config file.
            dry_run: force dry-run mode regardless of the config value.

        Raises:
            ValueError: the file cannot be read or parsed, or its contents are
                invalid. Individual validation problems are printed as a
                numbered list before the exception is raised.
        """
        try:
            with open(conf_file_path, 'rb') as f:
                self.config = tomllib.load(f)
        except Exception as e:
            raise ValueError(f'error parsing "{conf_file_path}": {e}') from e

        def is_arr_of_str(x) -> bool:
            return isinstance(x, list) and all(isinstance(s, str) for s in x)

        def is_dict_of_str(x) -> bool:
            return isinstance(x, dict) and all(isinstance(k, str) and isinstance(v, str) for k, v in x.items())

        def user_path(s: str) -> str:
            return os.path.expanduser(s)

        general = self.config.get(SECTION_GENERAL)
        if not isinstance(general, dict):
            raise ValueError(f'section "{SECTION_GENERAL}" must be present in the config and must be a valid mapping (table)')

        # The contents of config.toml are checked by hand, without extra imports.
        self.tasks = general.get(KEY_APPS)
        if not self.tasks or not is_arr_of_str(self.tasks):
            raise ValueError(f'invalid config: key "{KEY_APPS}" in section "{SECTION_GENERAL}" must be a non-empty array of strings')

        errors = []

        def fail(message: str) -> None:
            # Errors are collected and reported together as a numbered list.
            errors.append(f'{len(errors) + 1}. {message}')

        def general_path(key: str, default: str) -> str:
            # Read a path from [general], expand "~" and require it to be absolute.
            raw = general.get(key) or default
            if not isinstance(raw, str):
                fail(f'key "{key}" in section "{SECTION_GENERAL}" must contain a string')
                return default
            path = user_path(raw)
            if not os.path.isabs(path):
                fail(f'path "{path}" in key "{key}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{path}"')
            return path

        # Validate the raw config value before combining it with the CLI flag,
        # otherwise a falsy non-bool (0, "") would pass unnoticed.
        conf_dry_run = general.get(KEY_DRY_RUN, False)
        if not isinstance(conf_dry_run, bool):
            fail(f'key "{KEY_DRY_RUN}" in section "{SECTION_GENERAL}" of the config must be bool')
            conf_dry_run = False
        self.dry_run = conf_dry_run or bool(dry_run)

        self.home = general_path(KEY_SRC_PATH, os.path.join(XDG_CONFIG_HOME, 'userctx'))
        self.managed_path = general_path(KEY_DST_PATH, XDG_CONFIG_HOME)

        # A missing apps section is valid: every app then runs with defaults.
        apps = self.config.setdefault(SECTION_APPS, dict())
        if not isinstance(apps, dict):
            fail(f'section "{SECTION_APPS}" must be a table (mapping)')
            apps = dict()
        for name, t in apps.items():
            errstr = f'for app "{name}"'
            if not isinstance(t, dict):
                fail(f'{errstr} configuration must be a table (mapping)')
                continue
            for k, v in list(t.items()):  # snapshot: t is modified below
                if k not in APPS_VALID_KEYS:
                    fail(f'{errstr} unknown key "{k}"')
                    continue
                if k == KEY_EXEC and not isinstance(v, str):
                    fail(f'{errstr} key "{k}" must be a string')
                elif k == KEY_SYMLINK and not is_dict_of_str(v):
                    fail(f'{errstr} key "{k}" must be a table (mapping)')
                elif k in [KEY_SCRIPT, KEY_ACTIONS] and not is_arr_of_str(v):
                    fail(f'{errstr} key "{k}" must be array of strings')
                elif k == KEY_ACTIONS and not all(a in ACTIONS_ALL for a in v):
                    fail(f'{errstr} invalid actions found: {[a for a in v if a not in ACTIONS_ALL]} in key "{k}", valid actions are: {ACTIONS_ALL}')
                elif k == KEY_RELOAD and not isinstance(v, str):
                    fail(f'{errstr} "{KEY_RELOAD}" must be a string')
                elif k in [KEY_SRC_PATH, KEY_DST_PATH] and not isinstance(v, str):
                    fail(f'{errstr} key "{k}" must contain a string')
                elif k in [KEY_SRC_PATH, KEY_DST_PATH]:
                    p = user_path(v)
                    if not os.path.isabs(p):
                        fail(f'{errstr} path "{v}" in key "{k}" must be either absolute or relative to home: "~/{v}"')
                    t[k] = p  # substituting path (e.g. "~/somedir" -> "/home/user/somedir")

        if errors:
            print(f'{len(errors)} errors found in the config "{conf_file_path}":')
            print('\n'.join(errors))
            raise ValueError('invalid config')

    def get_contexts_list(self) -> list[str]:
        """Return the names of the directories directly under the contexts home."""
        return [entry.name for entry in os.scandir(self.home) if entry.is_dir()]

    def get_tasks_for_context(self, context_name: str) -> list[Task]:
        """Build one Task per configured app for the given context.

        Raises:
            ValueError: the context directory does not exist.
        """
        ctx_path = os.path.join(self.home, context_name)
        if not os.path.isdir(ctx_path):
            raise ValueError(f'context directory "{context_name}" not found in "{self.home}"')
        out = list()
        for name in self.tasks:
            src_path = os.path.join(ctx_path, name)
            dst_path = os.path.join(self.managed_path, name)
            app_conf = self.config[SECTION_APPS].get(name) or dict()
            out.append(Task(name, src_path, dst_path, app_conf, dry_run=self.dry_run))
        return out
class ContextManager(object):
    """Lists the available contexts and applies one of them."""

    def __init__(self, config: ContextManagerConfig):
        self.config = config

    def print_contexts_list(self) -> None:
        """Print the name of every available context, one per line."""
        names = self.config.get_contexts_list()
        print(*names, sep='\n')

    def load_context(self, context_name: str) -> None:
        """Run every task of ``context_name`` and print a per-task status.

        A failing task does not stop the remaining ones; all failures are
        summarised after the last task has run.
        """
        print(f'loading and applying context: "{context_name}"')
        try:
            pending = self.config.get_tasks_for_context(context_name)
        except Exception as exc:
            print(f'error getting tasks for context "{context_name}":', exc)
            return

        if not pending:
            print(f'no tasks found for context "{context_name}"')
            return

        failures = []
        for job in pending:
            print(f'running task "{job.name}"...', end=' ')
            try:
                job.execute()
                print("OK")
            except Exception as exc:
                failures.append((job.name, exc))
                print('FAILED')

        if not failures:
            return
        print('Errors:')
        for task_name, err in failures:
            print(f'task: "{task_name}", error: "{err}"')
class Runner(object):
    """Command-line front end: parses arguments and dispatches the command."""

    def run(self) -> int:
        """Parse ``sys.argv`` and execute the requested command.

        Returns:
            0 on success, 1 on a command-line problem (including a missing
            command), 2 when the config cannot be loaded, 3 when the command
            itself raised.
        """
        p = argparse.ArgumentParser(prog=APP_NAME, description=APP_DESC)
        p.add_argument('-c', '--config', help='path to config file, default: %(default)s',
                       default=os.path.join(XDG_CONFIG_HOME, 'userctx', 'config.toml'))
        p.add_argument('-n', '--nop', action='store_true', default=False,
                       help='do nothing, just print out what userctx will do')
        subp = p.add_subparsers(help='command to execute', dest='command')
        applyp = subp.add_parser(COMMAND_APPLY, help='load context (provide name of context)')
        applyp.add_argument(COMMAND_CTX_NAME, help='name of context to load')
        subp.add_parser(COMMAND_LIST, help='list available contexts')

        try:  # just in case; argparse itself exits via SystemExit on bad input
            args = p.parse_args()
        except Exception as e:
            print("error parsing command-line arguments:", e)
            return 1

        if args.command is None:
            # Without a command there is nothing to do: show the usage
            # instead of loading the config and silently exiting with 0.
            p.print_help()
            return 1

        try:
            config = ContextManagerConfig(args.config, args.nop)
        except Exception as e:
            print("error loading config:", e)
            return 2

        try:
            ctxmgr = ContextManager(config)
            if args.command == COMMAND_LIST:
                ctxmgr.print_contexts_list()
            elif args.command == COMMAND_APPLY:
                # guaranteed to be present by argparse
                ctxmgr.load_context(getattr(args, COMMAND_CTX_NAME))
        except Exception as e:
            print('error executing command:', e)
            return 3
        return 0
def main():
    """Script entry point: run the CLI and exit with its status code."""
    status = Runner().run()
    raise SystemExit(status)


if __name__ == '__main__':
    main()