index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/installer.py | 286 |
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index c02d5717..6e4a6193 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,5 +1,4 @@ import time -from typing import Union import logging import os import shutil @@ -7,6 +6,8 @@ import shlex import pathlib import subprocess import glob +from types import ModuleType +from typing import Union, Dict, Any, List, Optional, Iterator, Mapping from .disk import get_partitions_in_use, Partition from .general import SysCommand, generate_password from .hardware import has_uefi, is_vm, cpu_vendor @@ -30,29 +31,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] class InstallationFile: - def __init__(self, installation, filename, owner, mode="w"): + def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): self.installation = installation self.filename = filename self.owner = owner self.mode = mode self.fh = None - def __enter__(self): + def __enter__(self) -> 'InstallationFile': self.fh = open(self.filename, self.mode) return self - def __exit__(self, *args): + def __exit__(self, *args :str) -> None: self.fh.close() self.installation.chown(self.owner, self.filename) - def write(self, data: Union[str, bytes]): + def write(self, data: Union[str, bytes]) -> int: return self.fh.write(data) - def read(self, *args): + def read(self, *args) -> Union[str, bytes]: return self.fh.read(*args) - def poll(self, *args): - return self.fh.poll(*args) +# def poll(self, *args) -> bool: +# return self.fh.poll(*args) def accessibility_tools_in_use() -> bool: @@ -84,11 +85,12 @@ class Installer: """ - def __init__(self, target, *, base_packages=None, kernels=None): + def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None): if base_packages is None: base_packages = __packages__[:3] if kernels is None: kernels = ['linux'] + self.kernels = kernels self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') @@ -119,18 +121,17 @@ class Installer: self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"] self.KERNEL_PARAMS = [] - def log(self, *args, level=logging.DEBUG, **kwargs): + def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): """ installer.log() wraps output.log() mainly to set a default log-level for this install session. Any manual override can be done per log() call. """ log(*args, level=level, **kwargs) - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Installer': return self - def __exit__(self, *args, **kwargs): - # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. + def __exit__(self, *args :str, **kwargs :str) -> None: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: @@ -163,10 +164,10 @@ class Installer: return False @property - def partitions(self): + def partitions(self) -> List[Partition]: return get_partitions_in_use(self.target) - def sync_log_to_install_medium(self): + def sync_log_to_install_medium(self) -> bool: # Copy over the install log (if there is one) to the install medium if # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. if self.helper_flags.get('base-strapped', False) is True: @@ -180,90 +181,111 @@ class Installer: return True - def mount_ordered_layout(self, layouts: dict): - from .luks import luks2 - - mountpoints = {} - for blockdevice in layouts: - for partition in layouts[blockdevice]['partitions']: - if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})): - if partition.get('encrypted',False): - if partition.get('mountpoint',None): - ppath = partition['mountpoint'] - else: - ppath = partition['device_instance'].path - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop" - # Immediately unlock the encrypted device to format the inner volume - with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=False) as unlocked_device: - unlocked_device.mount(f"{self.target}/") - try: - manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes,unlocked_device) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state - unlocked_device.unmount() - raise e - unlocked_device.unmount() - # TODO generate key - else: - self.mount(partition['device_instance'],"/") - try: - manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state - partition['device_instance'].unmount() - raise e - partition['device_instance'].unmount() - else: - mountpoints[partition['mountpoint']] = partition - for mountpoint in sorted([mnt_dest for mnt_dest in mountpoints.keys() if mnt_dest is not None]): - partition = mountpoints[mountpoint] - if partition.get('encrypted', False) and not partition.get('subvolume',None): - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - if not (password := partition.get('!password', None)): - raise RequirementError(f"Missing mountpoint {mountpoint} encryption password in layout: {partition}") - - with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: - if partition.get('generate-encryption-key-file'): - if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): - cryptkey_dir.mkdir(parents=True) - - # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key - # if we name the device to "xyzloop". - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" - with open(f"{self.target}{encryption_key_path}", "w") as keyfile: - keyfile.write(generate_password(length=512)) + def _create_keyfile(self,luks_handle , partition :dict, password :str): + """ roiutine to create keyfiles, so it can be moved elsewere + """ + if partition.get('generate-encryption-key-file'): + if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): + cryptkey_dir.mkdir(parents=True) + # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key + # if we name the device to "xyzloop". + if partition.get('mountpoint',None): + encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" + else: + encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key" + with open(f"{self.target}{encryption_key_path}", "w") as keyfile: + keyfile.write(generate_password(length=512)) - os.chmod(f"{self.target}{encryption_key_path}", 0o400) + os.chmod(f"{self.target}{encryption_key_path}", 0o400) - luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) - luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) + luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) + luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {unlocked_device}", level=logging.INFO) - unlocked_device.mount(f"{self.target}{mountpoint}") + def _has_root(self, partition :dict) -> bool: + """ + Determine if an encrypted partition contains root in it + """ + if partition.get("mountpoint") is None: + if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})): + for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list]: + if mountpoint == '/': + return True + return False + else: + return False + elif partition.get("mountpoint") == '/': + return True + else: + return False + def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: + from .luks import luks2 + # set the partitions as a list not part of a tree (which we don't need anymore (i think) + list_part = [] + list_luks_handles = [] + for blockdevice in layouts: + list_part.extend(layouts[blockdevice]['partitions']) + + # we manage the encrypted partititons + for partition in [entry for entry in list_part if entry.get('encrypted',False)]: + # open the luks device and all associate stuff + if not (password := partition.get('!password', None)): + raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}") + # i change a bit the naming conventions for the loop device + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) - if partition.get('options',[]): - mount_options = ','.join(partition['options']) - partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) - else: - partition['device_instance'].mount(f"{self.target}{mountpoint}") + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used + with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: + if partition.get('generate-encryption-key-file',False) and not self._has_root(partition): + list_luks_handles.append([luks_handle,partition,password]) + # this way all the requesrs will be to the dm_crypt device and not to the physical partition + partition['device_instance'] = unlocked_device + + # we manage the btrfs partitions + for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]: + self.mount(partition['device_instance'],"/") + try: + new_mountpoints = manage_btrfs_subvolumes(self,partition) + except Exception as e: + # every exception unmounts the physical volume. Otherwise we let the system in an unstable state + partition['device_instance'].unmount() + raise e + partition['device_instance'].unmount() + if new_mountpoints: + list_part.extend(new_mountpoints) + + # we mount. We need to sort by mountpoint to get a good working order + for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']): + mountpoint = partition['mountpoint'] + log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) + if partition.get('filesystem',{}).get('mount_options',[]): + mount_options = ','.join(partition['filesystem']['mount_options']) + partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) + else: + partition['device_instance'].mount(f"{self.target}{mountpoint}") time.sleep(1) try: get_mount_info(f"{self.target}{mountpoint}", traverse=False) except DiskError: raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") - def mount(self, partition, mountpoint, create_mountpoint=True): + # once everything is mounted, we generate the key files in the correct place + for handle in list_luks_handles: + ppath = handle[1]['device_instance'].path + log(f"creating key-file for {ppath}",level=logging.INFO) + self._create_keyfile(handle[0],handle[1],handle[2]) + + def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None: if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): os.makedirs(f'{self.target}{mountpoint}') partition.mount(f'{self.target}{mountpoint}') - def post_install_check(self, *args, **kwargs): + def post_install_check(self, *args :str, **kwargs :str) -> List[bool]: return [step for step, flag in self.helper_flags.items() if flag is False] - def pacstrap(self, *packages, **kwargs): + def pacstrap(self, *packages :str, **kwargs :str) -> bool: if type(packages[0]) in (list, tuple): packages = packages[0] @@ -284,7 +306,7 @@ class Installer: else: self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO) - def set_mirrors(self, mirrors): + def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): if result := plugin.on_mirrors(mirrors): @@ -292,7 +314,7 @@ class Installer: return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') - def genfstab(self, flags='-pU'): + def genfstab(self, flags :str = '-pU') -> bool: self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: @@ -307,11 +329,11 @@ class Installer: return True - def set_hostname(self, hostname: str, *args, **kwargs): + def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: with open(f'{self.target}/etc/hostname', 'w') as fh: fh.write(hostname + '\n') - def set_locale(self, locale, encoding='UTF-8', *args, **kwargs): + def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool: if not len(locale): return True @@ -322,7 +344,7 @@ class Installer: return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False - def set_timezone(self, zone, *args, **kwargs): + def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool: if not zone: return True if not len(zone): @@ -337,6 +359,7 @@ class Installer: (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True + else: self.log( f"Time zone {zone} does not exist, continuing with system default.", @@ -344,11 +367,13 @@ class Installer: fg='red' ) - def activate_ntp(self): + return False + + def activate_ntp(self) -> None: log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO) self.activate_time_syncronization() - def activate_time_syncronization(self): + def activate_time_syncronization(self) -> None: self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO) self.enable_service('systemd-timesyncd') @@ -361,11 +386,11 @@ class Installer: with Boot(self) as session: session.SysCommand(["timedatectl", "set-ntp", 'true']) - def enable_espeakup(self): + def enable_espeakup(self) -> None: self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO) self.enable_service('espeakup') - def enable_service(self, *services): + def enable_service(self, *services :str) -> None: for service in services: self.log(f'Enabling service {service}', level=logging.INFO) if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0: @@ -375,19 +400,27 @@ class Installer: if hasattr(plugin, 'on_service'): plugin.on_service(service) - def run_command(self, cmd, *args, **kwargs): + def run_command(self, cmd :str, *args :str, **kwargs :str) -> None: return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') - def arch_chroot(self, cmd, run_as=None): + def arch_chroot(self, cmd :str, run_as :Optional[str] = None): if run_as: cmd = f"su - {run_as} -c {shlex.quote(cmd)}" return self.run_command(cmd) - def drop_to_shell(self): + def drop_to_shell(self) -> None: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) - def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs): + def configure_nic(self, + nic :str, + dhcp :bool = True, + ip :Optional[str] = None, + gateway :Optional[str] = None, + dns :Optional[str] = None, + *args :str, + **kwargs :str + ) -> None: from .systemd import Networkd if dhcp: @@ -412,7 +445,7 @@ class Installer: with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf: netconf.write(str(conf)) - def copy_iso_network_config(self, enable_services=False): + def copy_iso_network_config(self, enable_services :bool = False) -> bool: # Copy (if any) iwd password and config files if os.path.isdir('/var/lib/iwd/'): if psk_files := glob.glob('/var/lib/iwd/*.psk'): @@ -427,7 +460,7 @@ class Installer: # This function will be called after minimal_installation() # as a hook for post-installs. This hook is only needed if # base is not installed yet. - def post_install_enable_iwd_service(*args, **kwargs): + def post_install_enable_iwd_service(*args :str, **kwargs :str): self.enable_service('iwd') self.post_base_install.append(post_install_enable_iwd_service) @@ -452,7 +485,7 @@ class Installer: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: - def post_install_enable_networkd_resolved(*args, **kwargs): + def post_install_enable_networkd_resolved(*args :str, **kwargs :str): self.enable_service('systemd-networkd', 'systemd-resolved') self.post_base_install.append(post_install_enable_networkd_resolved) @@ -462,7 +495,7 @@ class Installer: return True - def detect_encryption(self, partition): + def detect_encryption(self, partition :Partition) -> bool: part = Partition(partition.parent, None, autodetect_filesystem=True) if partition.encrypted: return partition @@ -471,7 +504,7 @@ class Installer: return False - def mkinitcpio(self, *flags): + def mkinitcpio(self, *flags :str) -> bool: for plugin in plugins.values(): if hasattr(plugin, 'on_mkinitcpio'): # Allow plugins to override the usage of mkinitcpio altogether. @@ -483,9 +516,10 @@ class Installer: mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") - SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') - def minimal_installation(self): + return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0 + + def minimal_installation(self) -> bool: # Add necessary packages if encrypting the drive # (encrypted partitions default to btrfs for now, so we need btrfs-progs) # TODO: Perhaps this should be living in the function which dictates @@ -562,7 +596,7 @@ class Installer: return True - def setup_swap(self, kind='zram'): + def setup_swap(self, kind :str = 'zram') -> bool: if kind == 'zram': self.log(f"Setting up swap on zram") self.pacstrap('zram-generator') @@ -578,7 +612,18 @@ class Installer: else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") - def add_bootloader(self, bootloader='systemd-bootctl'): + def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool: + """ + Adds a bootloader to the installation instance. + Archinstall supports one of three types: + * systemd-bootctl + * grub + * efistub (beta) + + :param bootloader: Can be one of the three strings + 'systemd-bootctl', 'grub' or 'efistub' (beta) + """ + for plugin in plugins.values(): if hasattr(plugin, 'on_add_bootloader'): # Allow plugins to override the boot-loader handling. @@ -669,6 +714,7 @@ class Installer: base_path,bind_path = split_bind_name(str(root_partition.path)) if bind_path is not None: # and root_fs_type == 'btrfs': options_entry = f"rootflags=subvol={bind_path} " + options_entry + if real_device := self.detect_encryption(root_partition): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) @@ -757,10 +803,19 @@ class Installer: return True - def add_additional_packages(self, *packages): + def add_additional_packages(self, *packages :str) -> bool: return self.pacstrap(*packages) - def install_profile(self, profile): + def install_profile(self, profile :str) -> ModuleType: + """ + Installs a archinstall profile script (.py file). + This profile can be either local, remote or part of the library. + + :param profile: Can be a local path or a remote path (URL) + :return: Returns the imported script as a module, this way + you can access any remaining functions exposed by the profile. + :rtype: module + """ storage['installation_session'] = self if type(profile) == str: @@ -769,13 +824,13 @@ class Installer: self.log(f'Installing network profile {profile}', level=logging.INFO) return profile.install() - def enable_sudo(self, entity: str, group=False): + def enable_sudo(self, entity: str, group :bool = False) -> bool: self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) with open(f'{self.target}/etc/sudoers', 'a') as sudoers: sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') return True - def user_create(self, user: str, password=None, groups=None, sudo=False): + def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None: if groups is None: groups = [] @@ -789,7 +844,8 @@ class Installer: if not handled_by_plugin: self.log(f'Creating user {user}', level=logging.INFO) - SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') + if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0: + raise SystemError(f"Could not create user inside installation: {output}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): @@ -806,24 +862,24 @@ class Installer: if sudo and self.enable_sudo(user): self.helper_flags['user'] = True - def user_set_pw(self, user, password): + def user_set_pw(self, user :str, password :str) -> bool: self.log(f'Setting password for {user}', level=logging.INFO) if user == 'root': # This means the root account isn't locked/disabled with * in /etc/passwd self.helper_flags['user'] = True - SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"") + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0 - def user_set_shell(self, user, shell): + def user_set_shell(self, user :str, shell :str) -> bool: self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) - SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0 - def chown(self, owner, path, options=[]): - return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}") + def chown(self, owner :str, path :str, options :List[str] = []) -> bool: + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0 - def create_file(self, filename, owner=None): + def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: return InstallationFile(self, filename, owner) def set_keyboard_language(self, language: str) -> bool: |