#!/usr/bin/env python3 import os, sys, subprocess, tomllib, argparse COMMAND_LIST = 'list' COMMAND_LOAD = 'load' COMMAND_SAVE = 'save' COMMAND_CTX_NAME = 'context_name' COMMANDS_ALL = [COMMAND_LIST, COMMAND_LOAD, COMMAND_SAVE] SECTION_GENERAL = 'general' KEY_CONTEXTS_PATH = 'contexts_path' KEY_TASKS = 'tasks' KEY_DRY_RUN = 'dry_run' KEY_MANAGED_DIR = 'managed_directory' SECTION_TASK = 'task' KEY_SRC_PATH = 'source_path' KEY_DST_PATH = 'target_path' KEY_RELOAD = 'reload' KEY_ACTIONS = 'actions' KEY_SYMLINK = 'symlinks' KEY_COMMAND = 'commands' KEY_SCRIPT = 'scripts' KEY_CODE = 'code' ACTIONS_ALL = [KEY_SYMLINK, KEY_COMMAND, KEY_SCRIPT, KEY_CODE] ENV_CONTEXT_NAME = 'CONTEXT_NAME' ENV_CONTEXT_SRC = 'CONTEXT_SRC' ENV_CONTEXT_DST = 'CONTEXT_DST' APP_NAME = os.path.basename(sys.argv[0]) APP_DESC = 'A simple configuration switcher for various usage scenarios.' HOME = os.path.expanduser('~') XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME', os.path.join(HOME, '.config')) class OS(object): def __init__(self, chdir='', env=None): self.shell = os.getenv('SHELL') or 'bash' if env: env.update(os.environ) self.env = env def exec_command(self, command: list[str], quiet=False) -> int: return subprocess.run(command, capture_output=quiet, env=self.env).returncode def exec_script(self, path: str) -> None: return self.exec_command([self.shell, path]) def exec_snippet(self, code: str) -> None: return self.exec_command([self.shell, "-c", code]) class Task(object): def __init__(self, name: str, src: str, dst: str, conf: dict, dry_run=True): self.name = name self.src = conf.get(KEY_SRC_PATH) or src # default is ~/.config/userctx// self.dst = conf.get(KEY_DST_PATH) or dst # default is ~/.config/ self.actions = conf.get(KEY_ACTIONS) or [KEY_SYMLINK] # default is to just symlink all self.symlinks = conf.get(KEY_SYMLINK) self.commands = conf.get(KEY_COMMAND) self.scripts = conf.get(KEY_SCRIPT) self.code = conf.get(KEY_CODE) self.reload_command = conf.get(KEY_RELOAD) self.shell = OS(env={ENV_CONTEXT_NAME: self.name, ENV_CONTEXT_SRC: self.src, ENV_CONTEXT_DST: self.dst}) self.dry_run = dry_run def execute(self) -> None: for a in self.actions: if a == KEY_SYMLINK: self._symlink() elif a == KEY_COMMAND: self._run_commands() elif a == KEY_SCRIPT: self._run_scripts() elif a == KEY_CODE: self._run_code() if self.reload_command and not self.dry_run: self.shell.exec_command(self.reload_command) def _symlink(self) -> None: if not self.symlinks: # will symlink everything we have then self.symlinks = [{entry.name: entry.name} for entry in os.scandir(self.src)] for l in self.symlinks: for k, v in l.items(): src, dst = os.path.join(self.src, k), os.path.join(self.dst, v) if self.dry_run: print("will remove:", dst, "will symlink:", src, "to", dst) continue if os.path.lexists(dst): os.remove(dst) os.symlink(src, dst) def _run_scripts(self) -> None: if not self.scripts: # nothing provided explicitly self.scripts = [entry.name for entry in os.scandir(self.src) if entry.is_file()] if self.dry_run: print(f'will execute scripts from {self.src}:', self.scripts) return for s in self.scripts: self.shell.exec_script(os.path.join(self.src, s)) def _run_code(self) -> None: if self.dry_run: print("will execute code:", self.code) return self.shell.exec_snippet(self.code) def _run_commands(self) -> None: if self.dry_run: print("will execute:", self.commands) return for cmd in self.commands: self.shell.exec_command(cmd) class ContextManagerConfig(object): def __init__(self, conf_file_path: str): try: with open(conf_file_path, 'rb') as f: self.config = tomllib.load(f) except Exception as e: raise ValueError('error parsing "{config_file_path}":', e) def is_arr_of_str(x) -> bool: return isinstance(x, list) and all(isinstance(s, str) for s in x) def is_dict_of_str(x) -> bool: return isinstance(x, dict) and all(isinstance(k, str) and isinstance(v, str) for k, v in x.items()) def user_path(s: str) -> str: return os.path.expanduser(s) if SECTION_GENERAL not in self.config: raise ValueError(f'section "{SECTION_GENERAL}" not found in the config') # Will extnsively check what's inside config.toml without additinal imports. # Also, fuck dynamic typing! self.tasks = self.config[SECTION_GENERAL].get(KEY_TASKS) if not self.tasks or not is_arr_of_str(self.tasks): raise ValueError(f'invalid config: key "{KEY_TASKS}" in section "{SECTION_GENERAL}" must be a non-empty array of strings') if SECTION_TASK not in self.config: # valid case with all defaults self.config[SECTION_TASK] = dict() errors, err_count = list(), 0 # fuck it again self.dry_run = self.config[SECTION_GENERAL].get(KEY_DRY_RUN) or False if not isinstance(self.dry_run, bool): err_count += 1 errors.append(f'{err_count}. key "{KEY_DRY_RUN}" in section "{SECTION_GENERAL} of the config must be bool"') self.home = user_path(self.config[SECTION_GENERAL].get(KEY_CONTEXTS_PATH) or os.path.join(XDG_CONFIG_HOME, 'userctx')) if not os.path.isabs(self.home): err_count +=1 errors.append(f'{err_count}. path "{self.home}" in key "{KEY_CONTEXTS_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.home}"') self.managed_path = user_path(self.config[SECTION_GENERAL].get(KEY_MANAGED_DIR) or XDG_CONFIG_HOME) if not os.path.isabs(self.managed_path): err_count +=1 errors.append(f'{err_count}. path "{self.managed_path}" in key "{KEY_MANAGED_DIR}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.managed_path}"') tasks = self.config.get(SECTION_TASK) for name, t in tasks.items(): errstr = f'for task "{name}"' for k in list(t.keys()): # we might change t v = t[k] if k in [KEY_SRC_PATH, KEY_DST_PATH, KEY_CODE] and not isinstance(v, str): err_count +=1 errors.append(f'{err_count}. {errstr} key "{k}" must be a string') continue elif k in [KEY_COMMAND, KEY_SCRIPT, KEY_RELOAD, KEY_ACTIONS, KEY_SYMLINK] and not isinstance(v, list): err_count +=1 errors.append(f'{err_count}. {errstr} key "{k}" must be an array') continue if k == KEY_COMMAND and not all(is_arr_of_str(elem) for elem in v): err_count +=1 errors.append(f'{err_count}. {errstr} all elements of "{KEY_COMMAND}" must be arrays of strings') if k == KEY_SCRIPT and not is_arr_of_str(v): err_count +=1 errors.append(f'{err_count}. {errstr} "{KEY_SCRIPT}" must be array of strings') if k == KEY_SYMLINK and not all(is_dict_of_str(elem) for elem in v): err_count +=1 errors.append(f'{err_count}. {errstr} all elements of "{KEY_SYMLINK}" array must be mappings (inline tables) of strings') if k == KEY_ACTIONS and not is_arr_of_str(v): err_count +=1 errors.append(f'{err_count}. {errstr} "{KEY_ACTIONS}" must be array if strings') if k == KEY_ACTIONS and not all(a in ACTIONS_ALL for a in v): err_count +=1 errors.append(f'{err_count}. {errstr} found invalid actions: {[a for a in v if a not in ACTIONS_ALL]} in key "{KEY_ACTIONS}", valid actions are: {ACTIONS_ALL}') if k == KEY_RELOAD and not is_arr_of_str(v): err_count +=1 errors.append(f'{err_count}. {errstr} "{KEY_RELOAD}" must be array of strings') if k in [KEY_SRC_PATH, KEY_DST_PATH]: p = user_path(v) if not os.path.isabs(p): err_count +=1 errors.append(f'{err_count}. {errstr} path "{v}" in key "{k}" must be either absolute or relative to home: "~/{v}"') t[k] = p # substituting path (e.g. "~/somedir" -> "/home/user/somedir") if errors: print(f'{err_count} errors found in the config ({conf_file_path}):') print('\n'.join(errors)) raise ValueError('invalid config') def get_contexts_list(self) -> list[str]: return [entry.name for entry in os.scandir(self.home) if entry.is_dir()] def get_tasks_for_context(self, context_name: str) -> list[Task]: ctx_path = os.path.join(self.home, context_name) if not (os.path.exists(ctx_path) and os.path.isdir(ctx_path)): raise ValueError(f'context directory "{context_name}" not found in "{self.home}"') out = list() for name in self.tasks: task_path = os.path.join(ctx_path, name) target_path = os.path.join(self.managed_path, name) t = Task(name, task_path, target_path, self.config[SECTION_TASK].get(name) or dict(), dry_run=self.dry_run) out.append(t) return out class ContextManager(object): def __init__(self, config: ContextManagerConfig): self.config = config def print_contexts_list(self) -> str: print('\n'.join(self.config.get_contexts_list())) def apply_context(self, context_name: str) -> None: print(f'loading and applying context: "{context_name}"') try: tasks = self.config.get_tasks_for_context(context_name) except Exception as e: print(f'error getting tasks for context "{context_name}":', e) return if not tasks: print(f'no tasks found for context "{context_name}"') return errors = list() for task in tasks: print(f'running task "{task.name}"...', end=' ') try: task.execute() print("OK") except Exception as e: errors.append((task.name, e)) print('FAILED') if errors: print('Errors:') for t in errors: print(f'task: "{t[0]}", error: "{t[1]}"') class Runner(object): def run(self) -> int: p = argparse.ArgumentParser(prog=APP_NAME, description=APP_DESC) p.add_argument('-c', '--config', help='path to config file, default: %(default)s', default=os.path.join(XDG_CONFIG_HOME, 'userctx', 'config.toml')) subp = p.add_subparsers(help='command to execute', dest='command') loadp = subp.add_parser(COMMAND_LOAD, help='load context (provide name of context)') loadp.add_argument(COMMAND_CTX_NAME, help='name of context to load') listp = subp.add_parser(COMMAND_LIST, help='list available contexts') savep = subp.add_parser(COMMAND_SAVE, help='save current context (provide name of context)') savep.add_argument(COMMAND_CTX_NAME, help='name of context to save') try: # just in case args = p.parse_args() except Exception as e: print("error parsing command-line arguments:", e) return 1 try: config = ContextManagerConfig(args.config) except Exception as e: print("error loading config:", e) return 2 try: ctxmgr = ContextManager(config) if args.command == COMMAND_LIST: ctxmgr.print_contexts_list() elif args.command == COMMAND_LOAD: ctxmgr.apply_context(args.__dict__[COMMAND_CTX_NAME]) # guaranteed to be present by argparse elif args.command == COMMAND_SAVE: raise Exception('"save" not implemented') except Exception as e: print(f'error executing command:', e) return 3 return 0 def main(): sys.exit(Runner().run()) if __name__ == '__main__': main()