157 lines
5.8 KiB
Python
157 lines
5.8 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
import os
|
||
|
|
import shutil
|
||
|
|
import subprocess
|
||
|
|
import sys
|
||
|
|
|
||
|
|
import tomllib
|
||
|
|
|
||
|
|
SYSCONFIG_PATH = "/etc/sysconfig.toml"
|
||
|
|
|
||
|
|
# Config sections
|
||
|
|
SECTION_GENERAL = "general"
|
||
|
|
SECTION_USERS = "users"
|
||
|
|
SECTION_GROUPS = "groups"
|
||
|
|
SECTION_CONFIGS = "configs"
|
||
|
|
|
||
|
|
# Config keys
|
||
|
|
KEY_PACKAGES = "packages"
|
||
|
|
KEY_PASSWORD = "password"
|
||
|
|
KEY_PUBKEYS = "pubkeys"
|
||
|
|
KEY_USERS_LIST = "users"
|
||
|
|
KEY_BODY = "body"
|
||
|
|
KEY_PERMISSIONS = "permissions"
|
||
|
|
KEY_OWNER = "owner"
|
||
|
|
|
||
|
|
|
||
|
|
class SystemConfigurator:
|
||
|
|
def __init__(self, path: str):
|
||
|
|
self.config_path = path
|
||
|
|
|
||
|
|
def _parse_config(self):
|
||
|
|
with open(self.config_path, "rb") as f:
|
||
|
|
self.config = tomllib.load(f)
|
||
|
|
|
||
|
|
def _exec_command(self, command: list[str], quiet=False) -> int:
|
||
|
|
return subprocess.run(command, capture_output=quiet).returncode
|
||
|
|
|
||
|
|
def _install_packages(self):
|
||
|
|
packages = self.config.get(SECTION_GENERAL, {}).get(KEY_PACKAGES, [])
|
||
|
|
if not packages:
|
||
|
|
print("No packages to install.")
|
||
|
|
return
|
||
|
|
print(f"Installing packages: {', '.join(packages)}")
|
||
|
|
command = ["apk", "add", "--no-cache"] + packages
|
||
|
|
if self._exec_command(command) != 0:
|
||
|
|
raise RuntimeError(f"Failed to install packages: {', '.join(packages)}")
|
||
|
|
print("Packages installed successfully.")
|
||
|
|
|
||
|
|
def _create_users(self):
|
||
|
|
users = self.config.get(SECTION_USERS, {})
|
||
|
|
if not users:
|
||
|
|
print("No users to create.")
|
||
|
|
return
|
||
|
|
for username, user_config in users.items():
|
||
|
|
print(f"Creating user: {username}")
|
||
|
|
if self._exec_command(["useradd", "-m", "-s", "/bin/sh", username]) != 0:
|
||
|
|
raise RuntimeError(f"Failed to create user: {username}")
|
||
|
|
if KEY_PASSWORD in user_config:
|
||
|
|
print(f"Setting password for user: {username}")
|
||
|
|
proc = subprocess.run(
|
||
|
|
["chpasswd"],
|
||
|
|
input=f"{username}:{user_config[KEY_PASSWORD]}".encode(),
|
||
|
|
capture_output=True,
|
||
|
|
)
|
||
|
|
if proc.returncode != 0:
|
||
|
|
raise RuntimeError(
|
||
|
|
f"Failed to set password for user: {username}: {proc.stderr.decode()}"
|
||
|
|
)
|
||
|
|
if KEY_PUBKEYS in user_config:
|
||
|
|
print(f"Adding SSH key for user: {username}")
|
||
|
|
ssh_dir = os.path.join("/home", username, ".ssh")
|
||
|
|
os.makedirs(ssh_dir, mode=0o700, exist_ok=True)
|
||
|
|
authorized_keys_path = os.path.join(ssh_dir, "authorized_keys")
|
||
|
|
with open(authorized_keys_path, "w") as f:
|
||
|
|
for key in user_config[KEY_PUBKEYS]:
|
||
|
|
f.write(key + "\n")
|
||
|
|
os.chmod(authorized_keys_path, 0o600)
|
||
|
|
shutil.chown(ssh_dir, user=username, group=username)
|
||
|
|
shutil.chown(authorized_keys_path, user=username, group=username)
|
||
|
|
print(f"User {username} created successfully.")
|
||
|
|
|
||
|
|
def _create_groups(self):
|
||
|
|
groups = self.config.get(SECTION_GROUPS, {})
|
||
|
|
if not groups:
|
||
|
|
print("No groups to create.")
|
||
|
|
return
|
||
|
|
for groupname, group_config in groups.items():
|
||
|
|
if self._exec_command(["getent", "group", groupname], quiet=True) == 0:
|
||
|
|
print(f"Group '{groupname}' already exists, skipping creation.")
|
||
|
|
else:
|
||
|
|
print(f"Creating group: {groupname}")
|
||
|
|
if self._exec_command(["groupadd", groupname]) != 0:
|
||
|
|
raise RuntimeError(f"Failed to create group: {groupname}")
|
||
|
|
|
||
|
|
users = group_config.get(KEY_USERS_LIST, [])
|
||
|
|
for username in users:
|
||
|
|
print(f"Adding user {username} to group {groupname}")
|
||
|
|
if self._exec_command(["gpasswd", "-a", username, groupname]) != 0:
|
||
|
|
raise RuntimeError(
|
||
|
|
f"Failed to add user {username} to group {groupname}"
|
||
|
|
)
|
||
|
|
print(f"Group '{groupname}' configured successfully.")
|
||
|
|
|
||
|
|
def _create_configs(self):
|
||
|
|
configs = self.config.get(SECTION_CONFIGS, {})
|
||
|
|
if not configs:
|
||
|
|
print("No configs to create.")
|
||
|
|
return
|
||
|
|
for file_path, config in configs.items():
|
||
|
|
print(f"Creating config file: {file_path}")
|
||
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||
|
|
if KEY_BODY not in config:
|
||
|
|
raise RuntimeError(
|
||
|
|
f"Config file '{file_path}' is missing the mandatory '{KEY_BODY}' key."
|
||
|
|
)
|
||
|
|
with open(file_path, "w") as f:
|
||
|
|
f.write(config[KEY_BODY])
|
||
|
|
if KEY_PERMISSIONS in config:
|
||
|
|
os.chmod(file_path, int(str(config[KEY_PERMISSIONS]), 8))
|
||
|
|
if KEY_OWNER in config:
|
||
|
|
owner_val = config[KEY_OWNER]
|
||
|
|
if ":" not in str(owner_val):
|
||
|
|
raise RuntimeError(
|
||
|
|
f"Config file '{file_path}' has an invalid '{KEY_OWNER}' format. Expected 'user:group'."
|
||
|
|
)
|
||
|
|
user, group = str(owner_val).split(":", 1)
|
||
|
|
shutil.chown(file_path, user=user, group=group)
|
||
|
|
print(f"Config file {file_path} created successfully.")
|
||
|
|
|
||
|
|
def process(self) -> int:
|
||
|
|
steps = [
|
||
|
|
(self._parse_config, 1),
|
||
|
|
(self._install_packages, 2),
|
||
|
|
(self._create_users, 3),
|
||
|
|
(self._create_groups, 4),
|
||
|
|
(self._create_configs, 5),
|
||
|
|
]
|
||
|
|
|
||
|
|
for func, code in steps:
|
||
|
|
try:
|
||
|
|
func()
|
||
|
|
except Exception as e:
|
||
|
|
print(e, file=sys.stderr)
|
||
|
|
return code
|
||
|
|
|
||
|
|
return 0
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
configurator = SystemConfigurator(SYSCONFIG_PATH)
|
||
|
|
exit_code = configurator.process()
|
||
|
|
if exit_code != 0:
|
||
|
|
sys.exit(exit_code)
|
||
|
|
|
||
|
|
if len(sys.argv) > 1:
|
||
|
|
os.execvp(sys.argv[1], sys.argv[1:])
|