Files
userctx/userctx.py

294 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import os, sys, subprocess, tomllib
COMMAND_LIST = 'list'
COMMAND_LOAD = 'load'
COMMAND_SAVE = 'save'
COMMANDS_ALL = [COMMAND_LIST, COMMAND_LOAD, COMMAND_SAVE]
SECTION_GENERAL = 'general'
KEY_CONTEXTS_PATH = 'contexts_path'
KEY_TASKS = 'tasks'
KEY_DRY_RUN = 'dry_run'
KEY_MANAGED_DIR = 'managed_directory'
SECTION_TASK = 'task'
KEY_SRC_PATH = 'source_path'
KEY_DST_PATH = 'target_path'
KEY_RELOAD = 'reload'
KEY_ACTIONS = 'actions'
KEY_SYMLINK = 'symlinks'
KEY_COMMAND = 'commands'
KEY_SCRIPT = 'scripts'
KEY_CODE = 'code'
ACTIONS_ALL = [KEY_SYMLINK, KEY_COMMAND, KEY_SCRIPT, KEY_CODE]
ENV_CONTEXT_NAME = 'CONTEXT_NAME'
ENV_CONTEXT_SRC = 'CONTEXT_SRC'
ENV_CONTEXT_DST = 'CONTEXT_DST'
HELP_MESSAGE = f'''Usage: {os.path.basename(sys.argv[0])} <command> [context_name]
Commands are:
{COMMAND_LIST} - to list installed contexts (from $HOME/.config/userctx)
{COMMAND_LOAD} - to load context "context_name"
{COMMAND_SAVE} - to save current configs of managed apps as context
"context_name" - must be a directory name present in $HOME/.config/userctx'''
HOME = os.path.expanduser('~')
XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME', os.path.join(HOME, '.config'))
class OS(object):
def __init__(self, chdir='', env=None):
self.shell = os.getenv('SHELL') or 'bash'
if env: env.update(os.environ)
self.env = env
def exec_command(self, command: list[str], quiet=False) -> int:
return subprocess.run(command, capture_output=quiet, env=self.env).returncode
def exec_script(self, path: str) -> None:
return self.exec_command([self.shell, path])
def exec_snippet(self, code: str) -> None:
return self.exec_command([self.shell, "-c", code])
class Task(object):
def __init__(self, name: str, src: str, dst: str, conf: dict, dry_run=True):
self.name = name
self.src = conf.get(KEY_SRC_PATH) or src # default is ~/.config/userctx/<context>/<task>
self.dst = conf.get(KEY_DST_PATH) or dst # default is ~/.config/<task>
self.actions = conf.get(KEY_ACTIONS) or [KEY_SYMLINK] # default is to just symlink all
self.symlinks = conf.get(KEY_SYMLINK)
self.commands = conf.get(KEY_COMMAND)
self.scripts = conf.get(KEY_SCRIPT)
self.code = conf.get(KEY_CODE)
self.reload_command = conf.get(KEY_RELOAD)
self.shell = OS(env={ENV_CONTEXT_NAME: self.name, ENV_CONTEXT_SRC: self.src, ENV_CONTEXT_DST: self.dst})
self.dry_run = dry_run
def execute(self) -> None:
for a in self.actions:
if a == KEY_SYMLINK:
self._symlink()
elif a == KEY_COMMAND:
self._run_commands()
elif a == KEY_SCRIPT:
self._run_scripts()
elif a == KEY_CODE:
self._run_code()
if self.reload_command and not self.dry_run:
self.shell.exec_command(self.reload_command)
def _symlink(self) -> None:
if not self.symlinks: # will symlink everything we have then
self.symlinks = [{entry.name: entry.name} for entry in os.scandir(self.src)]
for l in self.symlinks:
for k, v in l.items():
src, dst = os.path.join(self.src, k), os.path.join(self.dst, v)
if self.dry_run:
print("will remove:", dst, "will symlink:", src, "to", dst)
continue
if os.path.lexists(dst): os.remove(dst)
os.symlink(src, dst)
def _run_scripts(self) -> None:
if not self.scripts: # nothing provided explicitly
self.scripts = [entry.name for entry in os.scandir(self.src) if entry.is_file()]
if self.dry_run:
print(f'will execute scripts from {self.src}:', self.scripts)
return
for s in self.scripts:
self.shell.exec_script(os.path.join(self.src, s))
def _run_code(self) -> None:
if self.dry_run:
print("will execute code:", self.code)
return
self.shell.exec_snippet(self.code)
def _run_commands(self) -> None:
if self.dry_run:
print("will execute:", self.commands)
return
for cmd in self.commands:
self.shell.exec_command(cmd)
class ContextManagerConfig(object):
def __init__(self, conf_file_path: str):
try:
with open(conf_file_path, 'rb') as f: self.config = tomllib.load(f)
except Exception as e:
raise ValueError('error parsing "{config_file_path}":', e)
def is_arr_of_str(x) -> bool:
return isinstance(x, list) and all(isinstance(s, str) for s in x)
def is_dict_of_str(x) -> bool:
return isinstance(x, dict) and all(isinstance(k, str) and isinstance(v, str) for k, v in x.items())
def user_path(s: str) -> str:
return os.path.expanduser(s)
if SECTION_GENERAL not in self.config:
raise ValueError(f'section "{SECTION_GENERAL}" not found in the config')
# Will extnsively check what's inside config.toml without additinal imports.
# Also, fuck dynamic typing!
self.tasks = self.config[SECTION_GENERAL].get(KEY_TASKS)
if not self.tasks or not is_arr_of_str(self.tasks):
raise ValueError(f'invalid config: key "{KEY_TASKS}" in section "{SECTION_GENERAL}" must be a non-empty array of strings')
if SECTION_TASK not in self.config: # valid case with all defaults
self.config[SECTION_TASK] = dict()
errors, err_count = list(), 0 # fuck it again
self.dry_run = self.config[SECTION_GENERAL].get(KEY_DRY_RUN) or False
if not isinstance(self.dry_run, bool):
err_count += 1
errors.append(f'{err_count}. key "{KEY_DRY_RUN}" in section "{SECTION_GENERAL} of the config must be bool"')
self.home = user_path(self.config[SECTION_GENERAL].get(KEY_CONTEXTS_PATH) or os.path.join(XDG_CONFIG_HOME, 'userctx'))
if not os.path.isabs(self.home):
err_count +=1
errors.append(f'{err_count}. path "{self.home}" in key "{KEY_CONTEXTS_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.home}"')
self.managed_path = user_path(self.config[SECTION_GENERAL].get(KEY_MANAGED_DIR) or XDG_CONFIG_HOME)
if not os.path.isabs(self.managed_path):
err_count +=1
errors.append(f'{err_count}. path "{self.managed_path}" in key "{KEY_MANAGED_DIR}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.managed_path}"')
tasks = self.config.get(SECTION_TASK)
for name, t in tasks.items():
errstr = f'for task "{name}"'
for k in list(t.keys()): # we might change t
v = t[k]
if k in [KEY_SRC_PATH, KEY_DST_PATH, KEY_CODE] and not isinstance(v, str):
err_count +=1
errors.append(f'{err_count}. {errstr} key "{k}" must be a string')
continue
elif k in [KEY_COMMAND, KEY_SCRIPT, KEY_RELOAD, KEY_ACTIONS, KEY_SYMLINK] and not isinstance(v, list):
err_count +=1
errors.append(f'{err_count}. {errstr} key "{k}" must be an array')
continue
if k == KEY_COMMAND and not all(is_arr_of_str(elem) for elem in v):
err_count +=1
errors.append(f'{err_count}. {errstr} all elements of "{KEY_COMMAND}" must be arrays of strings')
if k == KEY_SCRIPT and not is_arr_of_str(v):
err_count +=1
errors.append(f'{err_count}. {errstr} "{KEY_SCRIPT}" must be array of strings')
if k == KEY_SYMLINK and not all(is_dict_of_str(elem) for elem in v):
err_count +=1
errors.append(f'{err_count}. {errstr} all elements of "{KEY_SYMLINK}" array must be mappings (inline tables) of strings')
if k == KEY_ACTIONS and not is_arr_of_str(v):
err_count +=1
errors.append(f'{err_count}. {errstr} "{KEY_ACTIONS}" must be array if strings')
if k == KEY_ACTIONS and not all(a in ACTIONS_ALL for a in v):
err_count +=1
errors.append(f'{err_count}. {errstr} found invalid actions: {[a for a in v if a not in ACTIONS_ALL]} in key "{KEY_ACTIONS}", valid actions are: {ACTIONS_ALL}')
if k == KEY_RELOAD and not is_arr_of_str(v):
err_count +=1
errors.append(f'{err_count}. {errstr} "{KEY_RELOAD}" must be array of strings')
if k in [KEY_SRC_PATH, KEY_DST_PATH]:
p = user_path(v)
if not os.path.isabs(p):
err_count +=1
errors.append(f'{err_count}. {errstr} path "{v}" in key "{k}" must be either absolute or relative to home: "~/{v}"')
t[k] = p # substituting path (e.g. "~/somedir" -> "/home/user/somedir")
if errors:
print(f'{err_count} errors found in the config ({conf_file_path}):')
print('\n'.join(errors))
raise ValueError('invalid config')
def get_contexts_list(self) -> list[str]:
return [entry.name for entry in os.scandir(self.home) if entry.is_dir()]
def get_tasks_for_context(self, context_name: str) -> list[Task]:
ctx_path = os.path.join(self.home, context_name)
if not (os.path.exists(ctx_path) and os.path.isdir(ctx_path)):
raise ValueError(f'context directory "{context_name}" not found in "{self.home}"')
out = list()
for name in self.tasks:
task_path = os.path.join(ctx_path, name)
target_path = os.path.join(self.managed_path, name)
t = Task(name, task_path, target_path, self.config[SECTION_TASK].get(name) or dict(), dry_run=self.dry_run)
out.append(t)
return out
class ContextManager(object):
def __init__(self, config: ContextManagerConfig):
self.config = config
def get_contexts_list(self) -> str:
return '\n'.join(self.config.get_contexts_list())
def apply_context(self, context_name: str) -> None:
print(f'loading and applying context: "{context_name}"')
try:
tasks = self.config.get_tasks_for_context(context_name)
except Exception as e:
print(f'error getting tasks for context "{context_name}":', e)
return
if not tasks:
print(f'no tasks found for context "{context_name}"')
return
errors = list()
for task in tasks:
print(f'running task "{task.name}"...', end=' ')
try:
task.execute()
print("OK")
except Exception as e:
errors.append((task.name, e))
print('FAILED')
if errors:
print('Errors:')
for t in errors:
print(f'task: "{t[0]}", error: "{t[1]}"')
class Runner(object):
def __init__(self):
self.cmd = sys.argv[1] if len(sys.argv) > 1 else ''
self.context_name = ' '.join(sys.argv[2:]) if len(sys.argv) > 2 else ''
self.config_file_path = os.path.join(XDG_CONFIG_HOME, 'userctx', 'config.toml')
def run(self) -> int:
if self.cmd == '':
print(HELP_MESSAGE)
return 0
if self.cmd not in COMMANDS_ALL:
print(f'invalid command. valid commands are: {", ".join(COMMANDS_ALL)}')
return 1
if self.cmd in [COMMAND_LOAD, COMMAND_SAVE] and self.context_name == '':
print('invalid command. please provide name of context to apply')
return 2
try:
config = ContextManagerConfig(self.config_file_path)
except Exception as e:
print("error loading config:", e)
return 3
try:
ctxmgr = ContextManager(config)
if self.cmd == COMMAND_LIST:
print(ctxmgr.get_contexts_list())
elif self.cmd == COMMAND_LOAD:
ctxmgr.apply_context(self.context_name)
elif self.cmd == COMMAND_SAVE:
print('save not implemented')
except Exception as e:
print(f'error executing command "{self.cmd}":', e)
return 4
return 0
def main():
sys.exit(Runner().run())
if __name__ == '__main__':
main()