working version after refactor

This commit is contained in:
2025-12-21 15:59:56 +03:00
parent 27c7677100
commit f5c0480f88
5 changed files with 220 additions and 399 deletions

View File

@@ -8,21 +8,19 @@ COMMAND_CTX_NAME = 'context_name'
COMMANDS_ALL = [COMMAND_LIST, COMMAND_LOAD, COMMAND_SAVE]
SECTION_GENERAL = 'general'
KEY_CONTEXTS_PATH = 'contexts_path'
KEY_TASKS = 'tasks'
KEY_APPS = 'apps'
KEY_DRY_RUN = 'dry_run'
KEY_MANAGED_DIR = 'managed_directory'
SECTION_TASK = 'task'
SECTION_APPS = 'apps'
KEY_SRC_PATH = 'source_path'
KEY_DST_PATH = 'target_path'
KEY_RELOAD = 'reload'
KEY_ACTIONS = 'actions'
KEY_SYMLINK = 'symlinks'
KEY_COMMAND = 'commands'
KEY_SCRIPT = 'scripts'
KEY_CODE = 'code'
ACTIONS_ALL = [KEY_SYMLINK, KEY_COMMAND, KEY_SCRIPT, KEY_CODE]
KEY_SYMLINK = 'symlink'
KEY_SCRIPT = 'script'
KEY_EXEC = 'exec'
ACTIONS_ALL = [KEY_SYMLINK, KEY_SCRIPT, KEY_EXEC, KEY_RELOAD]
APPS_VALID_KEYS = [KEY_ACTIONS, KEY_SYMLINK, KEY_SCRIPT, KEY_EXEC, KEY_RELOAD, KEY_SRC_PATH, KEY_DST_PATH]
ENV_CONTEXT_NAME = 'CONTEXT_NAME'
ENV_CONTEXT_SRC = 'CONTEXT_SRC'
@@ -48,71 +46,93 @@ class OS(object):
def exec_snippet(self, code: str) -> None:
return self.exec_command([self.shell, "-c", code])
def scan_dir(self, path: str) -> list[str]:
out = []
def scan(dir: str, prefix: str):
entries = list(os.scandir(dir)) # extract values from iterator
out.extend([os.path.join(prefix, e.name) for e in entries if e.is_file()])
for d in [e.name for e in entries if e.is_dir()]:
scan(os.path.join(dir, d), os.path.join(prefix, d))
scan(path, '')
return out
class Task(object):
def __init__(self, name: str, src: str, dst: str, conf: dict, dry_run=True):
self.name = name
self.src = conf.get(KEY_SRC_PATH) or src # default is ~/.config/userctx/<context>/<task>
self.dst = conf.get(KEY_DST_PATH) or dst # default is ~/.config/<task>
self.actions = conf.get(KEY_ACTIONS) or [KEY_SYMLINK] # default is to just symlink all
self.symlinks = conf.get(KEY_SYMLINK)
self.commands = conf.get(KEY_COMMAND)
self.scripts = conf.get(KEY_SCRIPT)
self.code = conf.get(KEY_CODE)
self.reload_command = conf.get(KEY_RELOAD)
self.code = conf.get(KEY_EXEC)
self.reload = conf.get(KEY_RELOAD)
self.actions = conf.get(KEY_ACTIONS)
if self.actions is None: # not set, actions = [] is a no-op conf for app
self.actions = [KEY_SYMLINK]
if self.scripts is not None: # may be an empty array
self.actions.append(KEY_SCRIPT)
if self.code is not None:
self.actions.append(KEY_EXEC)
if self.reload is not None:
self.actions.append(KEY_RELOAD)
self.shell = OS(env={ENV_CONTEXT_NAME: self.name, ENV_CONTEXT_SRC: self.src, ENV_CONTEXT_DST: self.dst})
self.dry_run = dry_run
def execute(self) -> None:
if self.dry_run:
print("Configured actions:", self.actions)
for a in self.actions:
if a == KEY_SYMLINK:
self._symlink()
elif a == KEY_COMMAND:
self._run_commands()
elif a == KEY_SCRIPT:
self._run_scripts()
elif a == KEY_CODE:
self._run_code()
if self.reload_command and not self.dry_run:
self.shell.exec_command(self.reload_command)
elif a == KEY_EXEC:
self._run_code(self.code)
elif a == KEY_RELOAD:
self._run_code(self.reload)
def _symlink(self) -> None:
if not self.symlinks: # will symlink everything we have then
self.symlinks = [{entry.name: entry.name} for entry in os.scandir(self.src)]
for l in self.symlinks:
for k, v in l.items():
src, dst = os.path.join(self.src, k), os.path.join(self.dst, v)
if self.dry_run:
print("will remove:", dst, "will symlink:", src, "to", dst)
continue
if os.path.lexists(dst): os.remove(dst)
os.symlink(src, dst)
entries = self.shell.scan_dir(self.src)
if not entries:
return
if self.symlinks is None: # will symlink everything we have then
self.symlinks = {'*': '*'}
not_included = [e for e in entries if e not in self.symlinks]
if '*' in self.symlinks:
target = self.symlinks['*']
if target == '*': # symlink."*" = "*" in config
self.symlinks.update(dict(zip(not_included, not_included)))
else: # symlink."*" = "some/path/target.file"
self.symlinks[not_included[0]] = target
del self.symlinks['*']
for k, v in self.symlinks.items():
src, dst = os.path.join(self.src, k), os.path.join(self.dst, v)
if self.dry_run:
print("will remove:", dst)
print("will symlink:", src, "to", dst)
continue
if os.path.lexists(dst): os.remove(dst)
os.symlink(src, dst)
def _run_scripts(self) -> None:
if not self.scripts: # nothing provided explicitly
self.scripts = [entry.name for entry in os.scandir(self.src) if entry.is_file()]
self.scripts = [e for e in self.shell.scan_dir(self.src) if e.endswith('.sh')]
if self.dry_run:
print(f'will execute scripts from {self.src}:', self.scripts)
return
for s in self.scripts:
self.shell.exec_script(os.path.join(self.src, s))
def _run_code(self) -> None:
def _run_code(self, snippet: str) -> None:
if self.dry_run:
print("will execute code:", self.code)
print("will execute script:", snippet)
return
self.shell.exec_snippet(self.code)
def _run_commands(self) -> None:
if self.dry_run:
print("will execute:", self.commands)
return
for cmd in self.commands:
self.shell.exec_command(cmd)
self.shell.exec_snippet(snippet)
class ContextManagerConfig(object):
def __init__(self, conf_file_path: str):
def __init__(self, conf_file_path: str, dry_run: bool):
try:
with open(conf_file_path, 'rb') as f: self.config = tomllib.load(f)
except Exception as e:
@@ -127,73 +147,69 @@ class ContextManagerConfig(object):
def user_path(s: str) -> str:
return os.path.expanduser(s)
if SECTION_GENERAL not in self.config:
raise ValueError(f'section "{SECTION_GENERAL}" not found in the config')
if SECTION_GENERAL not in self.config or not isinstance(self.config[SECTION_GENERAL], dict):
raise ValueError(f'section "{SECTION_GENERAL}" must be present in the config and must be a valid mapping (table)')
# Will extnsively check what's inside config.toml without additinal imports.
# Will extensively check what's inside config.toml without additinal imports.
# Also, fuck dynamic typing!
self.tasks = self.config[SECTION_GENERAL].get(KEY_TASKS)
self.tasks = self.config[SECTION_GENERAL].get(KEY_APPS)
if not self.tasks or not is_arr_of_str(self.tasks):
raise ValueError(f'invalid config: key "{KEY_TASKS}" in section "{SECTION_GENERAL}" must be a non-empty array of strings')
if SECTION_TASK not in self.config: # valid case with all defaults
self.config[SECTION_TASK] = dict()
raise ValueError(f'invalid config: key "{KEY_APPS}" in section "{SECTION_GENERAL}" must be a non-empty array of strings')
errors, err_count = list(), 0 # fuck it again
self.dry_run = self.config[SECTION_GENERAL].get(KEY_DRY_RUN) or False
self.dry_run = (self.config[SECTION_GENERAL].get(KEY_DRY_RUN) or False) or dry_run
if not isinstance(self.dry_run, bool):
err_count += 1
errors.append(f'{err_count}. key "{KEY_DRY_RUN}" in section "{SECTION_GENERAL} of the config must be bool"')
self.home = user_path(self.config[SECTION_GENERAL].get(KEY_CONTEXTS_PATH) or os.path.join(XDG_CONFIG_HOME, 'userctx'))
self.home = user_path(self.config[SECTION_GENERAL].get(KEY_SRC_PATH) or os.path.join(XDG_CONFIG_HOME, 'userctx'))
if not os.path.isabs(self.home):
err_count +=1
errors.append(f'{err_count}. path "{self.home}" in key "{KEY_CONTEXTS_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.home}"')
err_count += 1
errors.append(f'{err_count}. path "{self.home}" in key "{KEY_SRC_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.home}"')
self.managed_path = user_path(self.config[SECTION_GENERAL].get(KEY_MANAGED_DIR) or XDG_CONFIG_HOME)
self.managed_path = user_path(self.config[SECTION_GENERAL].get(KEY_DST_PATH) or XDG_CONFIG_HOME)
if not os.path.isabs(self.managed_path):
err_count +=1
errors.append(f'{err_count}. path "{self.managed_path}" in key "{KEY_MANAGED_DIR}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.managed_path}"')
err_count += 1
errors.append(f'{err_count}. path "{self.managed_path}" in key "{KEY_DST_PATH}" of section "{SECTION_GENERAL}" must be absolute or relative to home: "~/{self.managed_path}"')
if SECTION_APPS not in self.config: # valid case with all defaults
self.config[SECTION_APPS] = dict()
tasks = self.config.get(SECTION_TASK)
tasks = self.config.get(SECTION_APPS)
for name, t in tasks.items():
errstr = f'for task "{name}"'
errstr = f'for app "{name}"'
for k in list(t.keys()): # we might change t
v = t[k]
if k in [KEY_SRC_PATH, KEY_DST_PATH, KEY_CODE] and not isinstance(v, str):
err_count +=1
if k not in APPS_VALID_KEYS:
err_count += 1
errors.append(f'{err_count}. {errstr} unknown key "{k}"')
continue
if k == KEY_EXEC and not isinstance(v, str):
err_count += 1
errors.append(f'{err_count}. {errstr} key "{k}" must be a string')
continue
elif k in [KEY_COMMAND, KEY_SCRIPT, KEY_RELOAD, KEY_ACTIONS, KEY_SYMLINK] and not isinstance(v, list):
err_count +=1
errors.append(f'{err_count}. {errstr} key "{k}" must be an array')
continue
if k == KEY_COMMAND and not all(is_arr_of_str(elem) for elem in v):
err_count +=1
errors.append(f'{err_count}. {errstr} all elements of "{KEY_COMMAND}" must be arrays of strings')
if k == KEY_SCRIPT and not is_arr_of_str(v):
err_count +=1
errors.append(f'{err_count}. {errstr} "{KEY_SCRIPT}" must be array of strings')
if k == KEY_SYMLINK and not all(is_dict_of_str(elem) for elem in v):
err_count +=1
errors.append(f'{err_count}. {errstr} all elements of "{KEY_SYMLINK}" array must be mappings (inline tables) of strings')
if k == KEY_ACTIONS and not is_arr_of_str(v):
err_count +=1
errors.append(f'{err_count}. {errstr} "{KEY_ACTIONS}" must be array if strings')
if k == KEY_ACTIONS and not all(a in ACTIONS_ALL for a in v):
err_count +=1
errors.append(f'{err_count}. {errstr} found invalid actions: {[a for a in v if a not in ACTIONS_ALL]} in key "{KEY_ACTIONS}", valid actions are: {ACTIONS_ALL}')
if k == KEY_RELOAD and not is_arr_of_str(v):
err_count +=1
errors.append(f'{err_count}. {errstr} "{KEY_RELOAD}" must be array of strings')
if k in [KEY_SRC_PATH, KEY_DST_PATH]:
elif k == KEY_SYMLINK and not is_dict_of_str(v):
err_count += 1
errors.append(f'{err_count}. {errstr} key "{k}" must be a table (mapping)')
elif k in [KEY_SCRIPT, KEY_ACTIONS] and not is_arr_of_str(v):
err_count += 1
errors.append(f'{err_count}. {errstr} key "{k}" must be array of strings')
elif k == KEY_ACTIONS and not all(a in ACTIONS_ALL for a in v):
err_count += 1
errors.append(f'{err_count}. {errstr} invalid actions found: {[a for a in v if a not in ACTIONS_ALL]} in key "{k}", valid actions are: {ACTIONS_ALL}')
elif k == KEY_RELOAD and not isinstance(v, str):
err_count += 1
errors.append(f'{err_count}. {errstr} "{KEY_RELOAD}" must ba a string')
elif k in [KEY_SRC_PATH, KEY_DST_PATH] and not isinstance(v, str):
err_count += 1
errors.append(f'{err_count}. {errstr} key "{k}" must contain a string')
elif k in [KEY_SRC_PATH, KEY_DST_PATH]:
p = user_path(v)
if not os.path.isabs(p):
err_count +=1
err_count += 1
errors.append(f'{err_count}. {errstr} path "{v}" in key "{k}" must be either absolute or relative to home: "~/{v}"')
t[k] = p # substituting path (e.g. "~/somedir" -> "/home/user/somedir")
if errors:
print(f'{err_count} errors found in the config ({conf_file_path}):')
print(f'{err_count} errors found in the config "{conf_file_path}":')
print('\n'.join(errors))
raise ValueError('invalid config')
@@ -206,9 +222,9 @@ class ContextManagerConfig(object):
raise ValueError(f'context directory "{context_name}" not found in "{self.home}"')
out = list()
for name in self.tasks:
task_path = os.path.join(ctx_path, name)
target_path = os.path.join(self.managed_path, name)
t = Task(name, task_path, target_path, self.config[SECTION_TASK].get(name) or dict(), dry_run=self.dry_run)
src_path = os.path.join(ctx_path, name)
dst_path = os.path.join(self.managed_path, name)
t = Task(name, src_path, dst_path, self.config[SECTION_APPS].get(name) or dict(), dry_run=self.dry_run)
out.append(t)
return out
@@ -217,10 +233,10 @@ class ContextManager(object):
def __init__(self, config: ContextManagerConfig):
self.config = config
def print_contexts_list(self) -> str:
def print_contexts_list(self) -> None:
print('\n'.join(self.config.get_contexts_list()))
def apply_context(self, context_name: str) -> None:
def load_context(self, context_name: str) -> None:
print(f'loading and applying context: "{context_name}"')
try:
tasks = self.config.get_tasks_for_context(context_name)
@@ -250,6 +266,8 @@ class Runner(object):
p = argparse.ArgumentParser(prog=APP_NAME, description=APP_DESC)
p.add_argument('-c', '--config', help='path to config file, default: %(default)s',
default=os.path.join(XDG_CONFIG_HOME, 'userctx', 'config.toml'))
p.add_argument('-n', '--nop', action='store_true', default=False,
help='do nothing, just print out what userctx will do')
subp = p.add_subparsers(help='command to execute', dest='command')
loadp = subp.add_parser(COMMAND_LOAD, help='load context (provide name of context)')
loadp.add_argument(COMMAND_CTX_NAME, help='name of context to load')
@@ -262,7 +280,7 @@ class Runner(object):
print("error parsing command-line arguments:", e)
return 1
try:
config = ContextManagerConfig(args.config)
config = ContextManagerConfig(args.config, args.nop)
except Exception as e:
print("error loading config:", e)
return 2
@@ -271,7 +289,7 @@ class Runner(object):
if args.command == COMMAND_LIST:
ctxmgr.print_contexts_list()
elif args.command == COMMAND_LOAD:
ctxmgr.apply_context(args.__dict__[COMMAND_CTX_NAME]) # guaranteed to be present by argparse
ctxmgr.load_context(args.__dict__[COMMAND_CTX_NAME]) # guaranteed to be present by argparse
elif args.command == COMMAND_SAVE:
raise Exception('"save" not implemented')
except Exception as e: