index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/installer.py | 1881 |
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index f1c7b3db..8292a3be 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,323 +1,427 @@ -import time -import logging +import glob import os import re -import shutil import shlex -import pathlib +import shutil import subprocess -import glob -from types import ModuleType -from typing import Union, Dict, Any, List, Optional, Iterator, Mapping, TYPE_CHECKING -from .disk import get_partitions_in_use, Partition -from .general import SysCommand, generate_password -from .hardware import has_uefi, is_vm, cpu_vendor -from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout -from .disk.helpers import findmnt -from .mirrors import use_mirrors -from .models.disk_encryption import DiskEncryption -from .plugins import plugins -from .storage import storage -from .output import log -from .profiles import Profile -from .disk.partition import get_mount_fs_type +import time +from pathlib import Path +from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable + +from . import disk from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError +from .general import SysCommand +from .hardware import SysInfo +from .locale import LocaleConfiguration +from .locale import verify_keyboard_layout, verify_x11_keyboard_layout +from .luks import Luks2 +from .mirrors import MirrorConfiguration +from .models.bootloader import Bootloader +from .models.network_configuration import Nic from .models.users import User -from .models.subvolume import Subvolume -from .hsm import Fido2 +from .output import log, error, info, warn, debug +from . import pacman +from .pacman import Pacman +from .plugins import plugins +from .storage import storage if TYPE_CHECKING: _: Any - # Any package that the Installer() is responsible for (optional and the default ones) __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"] # Additional packages that are installed if the user is running the Live ISO with accessibility tools enabled __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] -from .pacman import run_pacman -from .models.network_configuration import NetworkConfiguration - - -class InstallationFile: - def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): - self.installation = installation - self.filename = filename - self.owner = owner - self.mode = mode - self.fh = None - - def __enter__(self) -> 'InstallationFile': - self.fh = open(self.filename, self.mode) - return self - - def __exit__(self, *args :str) -> None: - self.fh.close() - self.installation.chown(self.owner, self.filename) - - def write(self, data: Union[str, bytes]) -> int: - return self.fh.write(data) - - def read(self, *args) -> Union[str, bytes]: - return self.fh.read(*args) - -# def poll(self, *args) -> bool: -# return self.fh.poll(*args) - def accessibility_tools_in_use() -> bool: return os.system('systemctl is-active --quiet espeakup.service') == 0 class Installer: - """ - `Installer()` is the wrapper for most basic installation steps. - It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. - - :param partition: Requires a partition as the first argument, this is - so that the installer can mount to `mountpoint` and strap packages there. - :type partition: class:`archinstall.Partition` - - :param boot_partition: There's two reasons for needing a boot partition argument, - The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place - during the `pacstrap` or `linux` and the base packages for a minimal installation. - The second being when :py:func:`~archinstall.Installer.add_bootloader` is called, - A `boot_partition` must be known to the installer before this is called. - :type boot_partition: class:`archinstall.Partition` - - :param profile: A profile to install, this is optional and can be called later manually. - This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on. - :type profile: str, optional - - :param hostname: The given /etc/hostname for the machine. - :type hostname: str, optional - - """ - - def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None): - if base_packages is None: - base_packages = __packages__[:3] - if kernels is None: - self.kernels = ['linux'] - else: - self.kernels = kernels - self.target = target + def __init__( + self, + target: Path, + disk_config: disk.DiskLayoutConfiguration, + disk_encryption: Optional[disk.DiskEncryption] = None, + base_packages: List[str] = [], + kernels: Optional[List[str]] = None + ): + """ + `Installer()` is the wrapper for most basic installation steps. + It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. + """ + self._base_packages = base_packages or __packages__[:3] + self.kernels = kernels or ['linux'] + self._disk_config = disk_config + + self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption) + self.target: Path = target + self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) + self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} - self.helper_flags = { - 'base': False, - 'bootloader': False - } - - self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages for kernel in self.kernels: - self.base_packages.append(kernel) + self._base_packages.append(kernel) # If using accessibility tools in the live environment, append those to the packages list if accessibility_tools_in_use(): - self.base_packages.extend(__accessibility_packages__) + self._base_packages.extend(__accessibility_packages__) - self.post_base_install = [] + self.post_base_install: List[Callable] = [] # TODO: Figure out which one of these two we'll use.. But currently we're mixing them.. storage['session'] = self storage['installation_session'] = self - self.MODULES = [] - self.BINARIES = [] - self.FILES = [] + self._modules: List[str] = [] + self._binaries: List[str] = [] + self._files: List[str] = [] + # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. - self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"] - self.KERNEL_PARAMS = [] - self.FSTAB_ENTRIES = [] + self._hooks: List[str] = [ + "base", "systemd", "autodetect", "microcode", "keyboard", + "sd-vconsole", "modconf", "block", "filesystems", "fsck" + ] + self._kernel_params: List[str] = [] + self._fstab_entries: List[str] = [] self._zram_enabled = False + self._disable_fstrim = False - self._disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption') - - def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): - """ - installer.log() wraps output.log() mainly to set a default log-level for this install session. - Any manual override can be done per log() call. - """ - log(*args, level=level, **kwargs) + self.pacman = Pacman(self.target, storage['arguments'].get('silent', False)) - def __enter__(self, *args :str, **kwargs :str) -> 'Installer': + def __enter__(self) -> 'Installer': return self - def __exit__(self, *args :str, **kwargs :str) -> None: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - - if len(args) >= 2 and args[1]: - self.log(args[1], level=logging.ERROR, fg='red') + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + error(exc_val) self.sync_log_to_install_medium() # We avoid printing /mnt/<log path> because that might confuse people if they note it down # and then reboot, and a identical log file will be found in the ISO medium anyway. - print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE']))) + print(_("[!] A log file has been created here: {}").format( + os.path.join(storage['LOG_PATH'], storage['LOG_FILE']))) print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")) - raise args[1] + raise exc_val if not (missing_steps := self.post_install_check()): - self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) + log('Installation completed without any errors. You may now reboot.', fg='green') self.sync_log_to_install_medium() - return True else: - self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING) + warn('Some required steps were not successfully installed/configured before leaving the installer:') + for step in missing_steps: - self.log(f' - {step}', fg='red', level=logging.WARNING) + warn(f' - {step}') - self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING) - self.log("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING) + warn(f"Detailed error logs can be found at: {storage['LOG_PATH']}") + warn("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues") self.sync_log_to_install_medium() return False - @property - def partitions(self) -> List[Partition]: - return get_partitions_in_use(self.target).values() + def remove_mod(self, mod: str): + if mod in self._modules: + self._modules.remove(mod) - def sync_log_to_install_medium(self) -> bool: - # Copy over the install log (if there is one) to the install medium if - # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. - if self.helper_flags.get('base-strapped', False) is True: - if filename := storage.get('LOG_FILE', None): - absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) + def append_mod(self, mod: str): + if mod not in self._modules: + self._modules.append(mod) - if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): - os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}") + def _verify_service_stop(self): + """ + Certain services might be running that affects the system during installation. + One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist + We need to wait for it before we continue since we opted in to use a custom mirror/region. + """ - shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}") + if not storage['arguments'].get('skip_ntp', False): + info(_('Waiting for time sync (timedatectl show) to complete.')) - return True + _started_wait = time.time() + _notified = False + while True: + if not _notified and time.time() - _started_wait > 5: + _notified = True + warn( + _("Time synchronization not completing, while you wait - check the docs for workarounds: https://archinstall.readthedocs.io/")) - def _create_keyfile(self,luks_handle , partition :dict, password :str): - """ roiutine to create keyfiles, so it can be moved elsewhere - """ - if self._disk_encryption and self._disk_encryption.generate_encryption_file(partition): - if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): - cryptkey_dir.mkdir(parents=True) - # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key - # if we name the device to "xyzloop". - if partition.get('mountpoint',None): - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" - else: - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key" - with open(f"{self.target}{encryption_key_path}", "w") as keyfile: - keyfile.write(generate_password(length=512)) + time_val = SysCommand('timedatectl show --property=NTPSynchronized --value').decode() + if time_val and time_val.strip() == 'yes': + break + time.sleep(1) + else: + info( + _('Skipping waiting for automatic time sync (this can cause issues if time is out of sync during installation)')) + + info('Waiting for automatic mirror selection (reflector) to complete.') + while self._service_state('reflector') not in ('dead', 'failed', 'exited'): + time.sleep(1) + + # info('Waiting for pacman-init.service to complete.') + # while self._service_state('pacman-init') not in ('dead', 'failed', 'exited'): + # time.sleep(1) - os.chmod(f"{self.target}{encryption_key_path}", 0o400) + info(_('Waiting for Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.')) + # Wait for the timer to kick in + while self._service_started('archlinux-keyring-wkd-sync.timer') is None: + time.sleep(1) - luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) - luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) + # Wait for the service to enter a finished state + while self._service_state('archlinux-keyring-wkd-sync.service') not in ('dead', 'failed', 'exited'): + time.sleep(1) - def _has_root(self, partition :dict) -> bool: + def _verify_boot_part(self): """ - Determine if an encrypted partition contains root in it + Check that mounted /boot device has at minimum size for installation + The reason this check is here is to catch pre-mounted device configuration and potentially + configured one that has not gone through any previous checks (e.g. --silence mode) + + NOTE: this function should be run AFTER running the mount_ordered_layout function """ - if partition.get("mountpoint") is None: - if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})): - for mountpoint in [sub_list[subvolume].get("mountpoint") if isinstance(subvolume, dict) else subvolume.mountpoint for subvolume in sub_list]: - if mountpoint == '/': - return True - return False - else: - return False - elif partition.get("mountpoint") == '/': - return True - else: - return False + boot_mount = self.target / 'boot' + lsblk_info = disk.get_lsblk_by_mountpoint(boot_mount) + + if len(lsblk_info) > 0: + if lsblk_info[0].size < disk.Size(200, disk.Unit.MiB, disk.SectorSize.default()): + raise DiskError( + f'The boot partition mounted at {boot_mount} is not large enough to install a boot loader. ' + f'Please resize it to at least 200MiB and re-run the installation.' + ) - def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: - from .luks import luks2 - from .disk.btrfs import setup_subvolumes, mount_subvolume - - # set the partitions as a list not part of a tree (which we don't need anymore (i think) - list_part = [] - list_luks_handles = [] - for blockdevice in layouts: - list_part.extend(layouts[blockdevice]['partitions']) - - # TODO: Implement a proper mount-queue system that does not depend on return values. - mount_queue = {} - - # we manage the encrypted partititons - if self._disk_encryption: - for partition in self._disk_encryption.all_partitions: - # open the luks device and all associate stuff - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" - - # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used - with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device: - if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition): - list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password]) - # this way all the requesrs will be to the dm_crypt device and not to the physical partition - partition['device_instance'] = unlocked_device - - if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False: - if self._disk_encryption.hsm_device: - Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password) - - btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])] - - for partition in btrfs_subvolumes: - device_instance = partition['device_instance'] - mount_options = partition.get('filesystem', {}).get('mount_options', []) - self.mount(device_instance, "/", options=','.join(mount_options)) - setup_subvolumes(installation=self, partition_dict=partition) - device_instance.unmount() - - # We then handle any special cases, such as btrfs - for partition in btrfs_subvolumes: - subvolumes: List[Subvolume] = partition['btrfs']['subvolumes'] - for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint): - # We cache the mount call for later - mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume( - installation=self, - device=device, - subvolume=sub_vol - ) + def sanity_check(self): + # self._verify_boot_part() + self._verify_service_stop() + + def mount_ordered_layout(self): + debug('Mounting ordered layout') + + luks_handlers: Dict[Any, Luks2] = {} + + match self._disk_encryption.encryption_type: + case disk.EncryptionType.NoEncryption: + self._mount_lvm_layout() + case disk.EncryptionType.Luks: + luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions) + case disk.EncryptionType.LvmOnLuks: + luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions) + self._import_lvm() + self._mount_lvm_layout(luks_handlers) + case disk.EncryptionType.LuksOnLvm: + self._import_lvm() + luks_handlers = self._prepare_luks_lvm(self._disk_encryption.lvm_volumes) + self._mount_lvm_layout(luks_handlers) + + # mount all regular partitions + self._mount_partition_layout(luks_handlers) + + def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]): + debug('Mounting partition layout') + + # do not mount any PVs part of the LVM configuration + pvs = [] + if self._disk_config.lvm_config: + pvs = self._disk_config.lvm_config.get_all_pvs() + + for mod in self._disk_config.device_modifications: + not_pv_part_mods = list(filter(lambda x: x not in pvs, mod.partitions)) + + # partitions have to mounted in the right order on btrfs the mountpoint will + # be empty as the actual subvolumes are getting mounted instead so we'll use + # '/' just for sorting + sorted_part_mods = sorted(not_pv_part_mods, key=lambda x: x.mountpoint or Path('/')) + + for part_mod in sorted_part_mods: + if luks_handler := luks_handlers.get(part_mod): + self._mount_luks_partition(part_mod, luks_handler) + else: + self._mount_partition(part_mod) - # We mount ordinary partitions, and we sort them by the mountpoint - for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']): - mountpoint = partition['mountpoint'] - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) + def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}): + lvm_config = self._disk_config.lvm_config - if partition.get('filesystem',{}).get('mount_options',[]): - mount_options = ','.join(partition['filesystem']['mount_options']) - mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}", options=mount_options: instance.mount(target, options=options) - else: - mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target) + if not lvm_config: + debug('No lvm config defined to be mounted') + return - log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.DEBUG, fg="white") + debug('Mounting LVM layout') - # We mount everything by sorting on the mountpoint itself. - for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]): - frozen_func() + for vg in lvm_config.vol_groups: + sorted_vol = sorted(vg.volumes, key=lambda x: x.mountpoint or Path('/')) - time.sleep(1) + for vol in sorted_vol: + if luks_handler := luks_handlers.get(vol): + self._mount_luks_volume(vol, luks_handler) + else: + self._mount_lvm_vol(vol) + + def _prepare_luks_partitions( + self, + partitions: List[disk.PartitionModification] + ) -> Dict[disk.PartitionModification, Luks2]: + return { + part_mod: disk.device_handler.unlock_luks2_dev( + part_mod.dev_path, + part_mod.mapper_name, + self._disk_encryption.encryption_password + ) + for part_mod in partitions + if part_mod.mapper_name and part_mod.dev_path + } - try: - findmnt(pathlib.Path(f"{self.target}{mountpoint}"), traverse=False) - except DiskError: - raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") + def _import_lvm(self): + lvm_config = self._disk_config.lvm_config + + if not lvm_config: + debug('No lvm config defined to be imported') + return + + for vg in lvm_config.vol_groups: + disk.device_handler.lvm_import_vg(vg) + + for vol in vg.volumes: + disk.device_handler.lvm_vol_change(vol, True) + + def _prepare_luks_lvm( + self, + lvm_volumes: List[disk.LvmVolume] + ) -> Dict[disk.LvmVolume, Luks2]: + return { + vol: disk.device_handler.unlock_luks2_dev( + vol.dev_path, + vol.mapper_name, + self._disk_encryption.encryption_password + ) + for vol in lvm_volumes + if vol.mapper_name and vol.dev_path + } + + def _mount_partition(self, part_mod: disk.PartitionModification): + # it would be none if it's btrfs as the subvolumes will have the mountpoints defined + if part_mod.mountpoint and part_mod.dev_path: + target = self.target / part_mod.relative_mountpoint + disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options) + + if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path: + self._mount_btrfs_subvol( + part_mod.dev_path, + part_mod.btrfs_subvols, + part_mod.mount_options + ) + + def _mount_lvm_vol(self, volume: disk.LvmVolume): + if volume.fs_type != disk.FilesystemType.Btrfs: + if volume.mountpoint and volume.dev_path: + target = self.target / volume.relative_mountpoint + disk.device_handler.mount(volume.dev_path, target, options=volume.mount_options) + + if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path: + self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options) + + def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2): + if part_mod.fs_type != disk.FilesystemType.Btrfs: + if part_mod.mountpoint and luks_handler.mapper_dev: + target = self.target / part_mod.relative_mountpoint + disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) + + if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: + self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options) + + def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2): + if volume.fs_type != disk.FilesystemType.Btrfs: + if volume.mountpoint and luks_handler.mapper_dev: + target = self.target / volume.relative_mountpoint + disk.device_handler.mount(luks_handler.mapper_dev, target, options=volume.mount_options) + + if volume.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: + self._mount_btrfs_subvol(luks_handler.mapper_dev, volume.btrfs_subvols, volume.mount_options) + + def _mount_btrfs_subvol( + self, + dev_path: Path, + subvolumes: List[disk.SubvolumeModification], + mount_options: List[str] = [] + ): + for subvol in subvolumes: + mountpoint = self.target / subvol.relative_mountpoint + mount_options = mount_options + [f'subvol={subvol.name}'] + disk.device_handler.mount(dev_path, mountpoint, options=mount_options) + + def generate_key_files(self): + match self._disk_encryption.encryption_type: + case disk.EncryptionType.Luks: + self._generate_key_files_partitions() + case disk.EncryptionType.LuksOnLvm: + self._generate_key_file_lvm_volumes() + case disk.EncryptionType.LvmOnLuks: + # currently LvmOnLuks only supports a single + # partitioning layout (boot + partition) + # so we won't need any keyfile generation atm + pass + + def _generate_key_files_partitions(self): + for part_mod in self._disk_encryption.partitions: + gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod) + + luks_handler = Luks2( + part_mod.safe_dev_path, + mapper_name=part_mod.mapper_name, + password=self._disk_encryption.encryption_password + ) + + if gen_enc_file and not part_mod.is_root(): + debug(f'Creating key-file: {part_mod.dev_path}') + luks_handler.create_keyfile(self.target) + + if part_mod.is_root() and not gen_enc_file: + if self._disk_encryption.hsm_device: + disk.Fido2.fido2_enroll( + self._disk_encryption.hsm_device, + part_mod.safe_dev_path, + self._disk_encryption.encryption_password + ) + + def _generate_key_file_lvm_volumes(self): + for vol in self._disk_encryption.lvm_volumes: + gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol) + + luks_handler = Luks2( + vol.safe_dev_path, + mapper_name=vol.mapper_name, + password=self._disk_encryption.encryption_password + ) + + if gen_enc_file and not vol.is_root(): + info(f'Creating key-file: {vol.dev_path}') + luks_handler.create_keyfile(self.target) - # once everything is mounted, we generate the key files in the correct place - for handle in list_luks_handles: - ppath = handle[1]['device_instance'].path - log(f"creating key-file for {ppath}",level=logging.INFO) - self._create_keyfile(handle[0],handle[1],handle[2]) + if vol.is_root() and not gen_enc_file: + if self._disk_encryption.hsm_device: + disk.Fido2.fido2_enroll( + self._disk_encryption.hsm_device, + vol.safe_dev_path, + self._disk_encryption.encryption_password + ) + + def sync_log_to_install_medium(self) -> bool: + # Copy over the install log (if there is one) to the install medium if + # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. + if self.helper_flags.get('base-strapped', False) is True: + if filename := storage.get('LOG_FILE', None): + absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) + + if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): + os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}") - def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True, options='') -> None: - if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): - os.makedirs(f'{self.target}{mountpoint}') + shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}") - partition.mount(f'{self.target}{mountpoint}', options=options) + return True def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'): if file[:1] != '/': @@ -329,176 +433,137 @@ class Installer: SysCommand(f'chmod 0600 {self.target}{file}') SysCommand(f'mkswap {self.target}{file}') - self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0') + self._fstab_entries.append(f'{file} none swap defaults 0 0') if enable_resume: - resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip() - resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() + resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode() + resume_offset = SysCommand( + f'/usr/bin/filefrag -v {self.target}{file}' + ).decode().split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() - self.HOOKS.append('resume') - self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}') - self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}') + self._hooks.append('resume') + self._kernel_params.append(f'resume=UUID={resume_uuid}') + self._kernel_params.append(f'resume_offset={resume_offset}') - def post_install_check(self, *args :str, **kwargs :str) -> List[str]: + def post_install_check(self, *args: str, **kwargs: str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] - def enable_multilib_repository(self): - # Set up a regular expression pattern of a commented line containing 'multilib' within [] - pattern = re.compile(r"^#\s*\[multilib\]$") - - # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. - matched = False - - # Read in the lines from the original file - with open("/etc/pacman.conf", "r") as pacman_conf: - lines = pacman_conf.readlines() - - # Open the file again in write mode, to replace the contents - with open("/etc/pacman.conf", "w") as pacman_conf: - for line in lines: - if pattern.match(line): - # If this is the [] block containing 'multilib', uncomment it and set the matched tracking boolean. - pacman_conf.write(line.lstrip('#')) - matched = True - elif matched: - # The previous line was a match for [.*multilib.*]. - # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist' - pacman_conf.write(line.lstrip('#')) - matched = False # Reset the state of matched to False. - else: - pacman_conf.write(line) - - def enable_testing_repositories(self, enable_multilib_testing=False): - # Set up a regular expression pattern of a commented line containing 'testing' within [] - pattern = re.compile("^#\\[.*testing.*\\]$") - - # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. - matched = False - - # Read in the lines from the original file - with open("/etc/pacman.conf", "r") as pacman_conf: - lines = pacman_conf.readlines() - - # Open the file again in write mode, to replace the contents - with open("/etc/pacman.conf", "w") as pacman_conf: - for line in lines: - if pattern.match(line) and (enable_multilib_testing or 'multilib' not in line): - # If this is the [] block containing 'testing', uncomment it and set the matched tracking boolean. - pacman_conf.write(line.lstrip('#')) - matched = True - elif matched: - # The previous line was a match for [.*testing.*]. - # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist' - pacman_conf.write(line.lstrip('#')) - matched = False # Reset the state of matched to False. - else: - pacman_conf.write(line) - - def pacstrap(self, *packages :str, **kwargs :str) -> bool: - if type(packages[0]) in (list, tuple): - packages = packages[0] - - for plugin in plugins.values(): - if hasattr(plugin, 'on_pacstrap'): - if (result := plugin.on_pacstrap(packages)): - packages = result + def set_mirrors(self, mirror_config: MirrorConfiguration, on_target: bool = False): + """ + Set the mirror configuration for the installation. - self.log(f'Installing packages: {packages}', level=logging.INFO) + :param mirror_config: The mirror configuration to use. + :type mirror_config: MirrorConfiguration - # TODO: We technically only need to run the -Syy once. - try: - run_pacman('-Syy', default_cmd='/usr/bin/pacman') - except SysCallError as error: - self.log(f'Could not sync a new package database: {error}', level=logging.ERROR, fg="red") + :on_target: Whether to set the mirrors on the target system or the live system. + :param on_target: bool + """ + debug('Setting mirrors') - if storage['arguments'].get('silent', False) is False: - if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self.pacstrap(*packages, **kwargs) + for plugin in plugins.values(): + if hasattr(plugin, 'on_mirrors'): + if result := plugin.on_mirrors(mirror_config): + mirror_config = result - raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red") + if on_target: + local_pacman_conf = Path(f'{self.target}/etc/pacman.conf') + local_mirrorlist_conf = Path(f'{self.target}/etc/pacman.d/mirrorlist') + else: + local_pacman_conf = Path('/etc/pacman.conf') + local_mirrorlist_conf = Path('/etc/pacman.d/mirrorlist') - try: - SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) - return True - except SysCallError as error: - self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red") + mirrorlist_config = mirror_config.mirrorlist_config() + pacman_config = mirror_config.pacman_config() - if storage['arguments'].get('silent', False) is False: - if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self.pacstrap(*packages, **kwargs) + if pacman_config: + debug(f'Pacman config: {pacman_config}') - raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") + with local_pacman_conf.open('a') as fp: + fp.write(pacman_config) - def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: - for plugin in plugins.values(): - if hasattr(plugin, 'on_mirrors'): - if result := plugin.on_mirrors(mirrors): - mirrors = result + if mirrorlist_config: + debug(f'Mirrorlist: {mirrorlist_config}') - return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') + with local_mirrorlist_conf.open('a') as fp: + fp.write(mirrorlist_config) - def genfstab(self, flags :str = '-pU') -> bool: - self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) + def genfstab(self, flags: str = '-pU'): + fstab_path = self.target / "etc" / "fstab" + info(f"Updating {fstab_path}") try: - fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}') - except SysCallError as error: - raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}') + gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').output() + except SysCallError as err: + raise RequirementError( + f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {err}') - with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - fstab_fh.write(fstab.decode()) + with open(fstab_path, 'ab') as fp: + fp.write(gen_fstab) - if not os.path.isfile(f'{self.target}/etc/fstab'): - raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}') + if not fstab_path.is_file(): + raise RequirementError(f'Could not create fstab file') for plugin in plugins.values(): if hasattr(plugin, 'on_genfstab'): if plugin.on_genfstab(self) is True: break - with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - for entry in self.FSTAB_ENTRIES: - fstab_fh.write(f'{entry}\n') + with open(fstab_path, 'a') as fp: + for entry in self._fstab_entries: + fp.write(f'{entry}\n') - return True - - def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: + def set_hostname(self, hostname: str): with open(f'{self.target}/etc/hostname', 'w') as fh: fh.write(hostname + '\n') - def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool: - if not len(locale): - return True - + def set_locale(self, locale_config: LocaleConfiguration) -> bool: modifier = '' + lang = locale_config.sys_lang + encoding = locale_config.sys_enc # This is a temporary patch to fix #1200 - if '.' in locale: - locale, potential_encoding = locale.split('.', 1) + if '.' in locale_config.sys_lang: + lang, potential_encoding = locale_config.sys_lang.split('.', 1) # Override encoding if encoding is set to the default parameter # and the "found" encoding differs. - if encoding == 'UTF-8' and encoding != potential_encoding: + if locale_config.sys_enc == 'UTF-8' and locale_config.sys_enc != potential_encoding: encoding = potential_encoding # Make sure we extract the modifier, that way we can put it in if needed. - if '@' in locale: - locale, modifier = locale.split('@', 1) + if '@' in locale_config.sys_lang: + lang, modifier = locale_config.sys_lang.split('@', 1) modifier = f"@{modifier}" # - End patch - with open(f'{self.target}/etc/locale.gen', 'a') as fh: - fh.write(f'{locale}.{encoding}{modifier} {encoding}\n') - with open(f'{self.target}/etc/locale.conf', 'w') as fh: - fh.write(f'LANG={locale}.{encoding}{modifier}\n') + locale_gen = self.target / 'etc/locale.gen' + locale_gen_lines = locale_gen.read_text().splitlines(True) + + # A locale entry in /etc/locale.gen may or may not contain the encoding + # in the first column of the entry; check for both cases. + entry_re = re.compile(rf'#{lang}(\.{encoding})?{modifier} {encoding}') + + for index, line in enumerate(locale_gen_lines): + if entry_re.match(line): + uncommented_line = line.removeprefix('#') + locale_gen_lines[index] = uncommented_line + locale_gen.write_text(''.join(locale_gen_lines)) + lang_value = uncommented_line.split()[0] + break + else: + error(f"Invalid locale: language '{locale_config.sys_lang}', encoding '{locale_config.sys_enc}'") + return False try: SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen') - return True - except SysCallError: + except SysCallError as e: + error(f'Failed to run locale-gen on target: {e}') return False - def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool: + (self.target / 'etc/locale.conf').write_text(f'LANG={lang_value}\n') + return True + + def set_timezone(self, zone: str) -> bool: if not zone: return True if not len(zone): @@ -509,62 +574,49 @@ class Installer: if result := plugin.on_timezone(zone): zone = result - if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists(): - (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) + if (Path("/usr") / "share" / "zoneinfo" / zone).exists(): + (Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True else: - self.log( - f"Time zone {zone} does not exist, continuing with system default.", - level=logging.WARNING, - fg='red' - ) + warn(f'Time zone {zone} does not exist, continuing with system default') return False - def activate_ntp(self) -> None: - log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO) - self.activate_time_syncronization() - - def activate_time_syncronization(self) -> None: - self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO) + def activate_time_synchronization(self) -> None: + info('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers') self.enable_service('systemd-timesyncd') - with open(f"{self.target}/etc/systemd/timesyncd.conf", "w") as fh: - fh.write("[Time]\n") - fh.write("NTP=0.arch.pool.ntp.org 1.arch.pool.ntp.org 2.arch.pool.ntp.org 3.arch.pool.ntp.org\n") - fh.write("FallbackNTP=0.pool.ntp.org 1.pool.ntp.org 0.fr.pool.ntp.org\n") - - from .systemd import Boot - with Boot(self) as session: - session.SysCommand(["timedatectl", "set-ntp", 'true']) - def enable_espeakup(self) -> None: - self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO) + info('Enabling espeakup.service for speech synthesis (accessibility)') self.enable_service('espeakup') def enable_periodic_trim(self) -> None: - self.log("Enabling periodic TRIM") + info("Enabling periodic TRIM") # fstrim is owned by util-linux, a dependency of both base and systemd. self.enable_service("fstrim.timer") - def enable_service(self, *services :str) -> None: + def enable_service(self, services: Union[str, List[str]]) -> None: + if isinstance(services, str): + services = [services] + for service in services: - self.log(f'Enabling service {service}', level=logging.INFO) + info(f'Enabling service {service}') + try: self.arch_chroot(f'systemctl enable {service}') - except SysCallError as error: - raise ServiceException(f"Unable to start service {service}: {error}") + except SysCallError as err: + raise ServiceException(f"Unable to start service {service}: {err}") for plugin in plugins.values(): if hasattr(plugin, 'on_service'): plugin.on_service(service) - def run_command(self, cmd :str, *args :str, **kwargs :str) -> None: + def run_command(self, cmd: str, *args: str, **kwargs: str) -> SysCommand: return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') - def arch_chroot(self, cmd :str, run_as :Optional[str] = None): + def arch_chroot(self, cmd: str, run_as: Optional[str] = None) -> SysCommand: if run_as: cmd = f"su - {run_as} -c {shlex.quote(cmd)}" @@ -573,38 +625,23 @@ class Installer: def drop_to_shell(self) -> None: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) - def configure_nic(self, network_config: NetworkConfiguration) -> None: - from .systemd import Networkd - - if network_config.dhcp: - conf = Networkd(Match={"Name": network_config.iface}, Network={"DHCP": "yes"}) - else: - network = {"Address": network_config.ip} - if network_config.gateway: - network["Gateway"] = network_config.gateway - if network_config.dns: - dns = network_config.dns - network["DNS"] = dns if isinstance(dns, list) else [dns] - - conf = Networkd(Match={"Name": network_config.iface}, Network=network) + def configure_nic(self, nic: Nic): + conf = nic.as_systemd_config() for plugin in plugins.values(): if hasattr(plugin, 'on_configure_nic'): - new_conf = plugin.on_configure_nic( - network_config.iface, - network_config.dhcp, - network_config.ip, - network_config.gateway, - network_config.dns - ) - - if new_conf: - conf = new_conf - - with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf: + conf = plugin.on_configure_nic( + nic.iface, + nic.dhcp, + nic.ip, + nic.gateway, + nic.dns + ) or conf + + with open(f"{self.target}/etc/systemd/network/10-{nic.iface}.network", "a") as netconf: netconf.write(str(conf)) - def copy_iso_network_config(self, enable_services :bool = False) -> bool: + def copy_iso_network_config(self, enable_services: bool = False) -> bool: # Copy (if any) iwd password and config files if os.path.isdir('/var/lib/iwd/'): if psk_files := glob.glob('/var/lib/iwd/*.psk'): @@ -614,19 +651,19 @@ class Installer: if enable_services: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: - self.base_packages.append('iwd') + self._base_packages.append('iwd') # This function will be called after minimal_installation() # as a hook for post-installs. This hook is only needed if # base is not installed yet. - def post_install_enable_iwd_service(*args :str, **kwargs :str): + def post_install_enable_iwd_service(*args: str, **kwargs: str): self.enable_service('iwd') self.post_base_install.append(post_install_enable_iwd_service) # Otherwise, we can go ahead and add the required package # and enable it's service: else: - self.pacstrap('iwd') + self.pacman.strap('iwd') self.enable_service('iwd') for psk in psk_files: @@ -644,179 +681,208 @@ class Installer: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: - def post_install_enable_networkd_resolved(*args :str, **kwargs :str): - self.enable_service('systemd-networkd', 'systemd-resolved') + def post_install_enable_networkd_resolved(*args: str, **kwargs: str): + self.enable_service(['systemd-networkd', 'systemd-resolved']) self.post_base_install.append(post_install_enable_networkd_resolved) # Otherwise, we can go ahead and enable the services else: - self.enable_service('systemd-networkd', 'systemd-resolved') + self.enable_service(['systemd-networkd', 'systemd-resolved']) return True - def detect_encryption(self, partition :Partition) -> bool: - from .disk.mapperdev import MapperDev - from .disk.dmcryptdev import DMCryptDev - from .disk.helpers import get_filesystem_type - - if type(partition) is MapperDev: - # Returns MapperDev.partition - return partition.partition - elif type(partition) is DMCryptDev: - return partition.MapperDev.partition - elif get_filesystem_type(partition.path) == 'crypto_LUKS': - return partition - - return False - - def mkinitcpio(self, *flags :str) -> bool: + def mkinitcpio(self, flags: List[str]) -> bool: for plugin in plugins.values(): if hasattr(plugin, 'on_mkinitcpio'): # Allow plugins to override the usage of mkinitcpio altogether. if plugin.on_mkinitcpio(self): return True - # mkinitcpio will error out if there's no vconsole. - if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False: - with vconsole.open('w') as fh: - fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") - with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit: - mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n") - mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") - mkinit.write(f"FILES=({' '.join(self.FILES)})\n") + mkinit.write(f"MODULES=({' '.join(self._modules)})\n") + mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n") + mkinit.write(f"FILES=({' '.join(self._files)})\n") - if self._disk_encryption and not self._disk_encryption.hsm_device: + if not self._disk_encryption.hsm_device: # For now, if we don't use HSM we revert to the old # way of setting up encryption hooks for mkinitcpio. # This is purely for stability reasons, we're going away from this. # * systemd -> udev # * sd-vconsole -> keymap - self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS] + self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks] - mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") + mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n") try: - SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') + SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}', peek_output=True) return True - except SysCallError: + except SysCallError as error: + if error.worker: + log(error.worker._trace_log.decode()) return False - def minimal_installation( - self, testing: bool = False, multilib: bool = False, - hostname: str = 'archinstall', locales: List[str] = ['en_US.UTF-8 UTF-8']) -> bool: - # Add necessary packages if encrypting the drive - # (encrypted partitions default to btrfs for now, so we need btrfs-progs) - # TODO: Perhaps this should be living in the function which dictates - # the partitioning. Leaving here for now. - - for partition in self.partitions: - if partition.filesystem == 'btrfs': - # if partition.encrypted: - if 'btrfs-progs' not in self.base_packages: - self.base_packages.append('btrfs-progs') - if partition.filesystem == 'xfs': - if 'xfs' not in self.base_packages: - self.base_packages.append('xfsprogs') - if partition.filesystem == 'f2fs': - if 'f2fs' not in self.base_packages: - self.base_packages.append('f2fs-tools') - - # Configure mkinitcpio to handle some specific use cases. - if partition.filesystem == 'btrfs': - if 'btrfs' not in self.MODULES: - self.MODULES.append('btrfs') - if '/usr/bin/btrfs' not in self.BINARIES: - self.BINARIES.append('/usr/bin/btrfs') - # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. - if partition.filesystem == 'ntfs3' and partition.mountpoint == self.target: - if 'fsck' in self.HOOKS: - self.HOOKS.remove('fsck') - - if self.detect_encryption(partition): - if self._disk_encryption and self._disk_encryption.hsm_device: - # Required bby mkinitcpio to add support for fido2-device options - self.pacstrap('libfido2') - - if 'sd-encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') - else: - if 'encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') - - if not has_uefi(): - self.base_packages.append('grub') - - if not is_vm(): - vendor = cpu_vendor() - if vendor == "AuthenticAMD": - self.base_packages.append("amd-ucode") - if (ucode := pathlib.Path(f"{self.target}/boot/amd-ucode.img")).exists(): - ucode.unlink() - elif vendor == "GenuineIntel": - self.base_packages.append("intel-ucode") - if (ucode := pathlib.Path(f"{self.target}/boot/intel-ucode.img")).exists(): - ucode.unlink() + def _get_microcode(self) -> Optional[Path]: + if not SysInfo.is_vm(): + if vendor := SysInfo.cpu_vendor(): + return vendor.get_ucode() + return None + + def _handle_partition_installation(self): + pvs = [] + if self._disk_config.lvm_config: + pvs = self._disk_config.lvm_config.get_all_pvs() + + for mod in self._disk_config.device_modifications: + for part in mod.partitions: + if part in pvs or part.fs_type is None: + continue + + if (pkg := part.fs_type.installation_pkg) is not None: + self._base_packages.append(pkg) + if (module := part.fs_type.installation_module) is not None: + self._modules.append(module) + if (binary := part.fs_type.installation_binary) is not None: + self._binaries.append(binary) + + # https://github.com/archlinux/archinstall/issues/1837 + if part.fs_type.fs_type_mount == 'btrfs': + self._disable_fstrim = True + + # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. + if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: + if 'fsck' in self._hooks: + self._hooks.remove('fsck') + + if part in self._disk_encryption.partitions: + if self._disk_encryption.hsm_device: + # Required by mkinitcpio to add support for fido2-device options + self.pacman.strap('libfido2') + + if 'sd-encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') + else: + if 'encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') + + def _handle_lvm_installation(self): + if not self._disk_config.lvm_config: + return + + self.add_additional_packages('lvm2') + self._hooks.insert(self._hooks.index('filesystems') - 1, 'lvm2') + + for vg in self._disk_config.lvm_config.vol_groups: + for vol in vg.volumes: + if vol.fs_type is not None: + if (pkg := vol.fs_type.installation_pkg) is not None: + self._base_packages.append(pkg) + if (module := vol.fs_type.installation_module) is not None: + self._modules.append(module) + if (binary := vol.fs_type.installation_binary) is not None: + self._binaries.append(binary) + + if vol.fs_type.fs_type_mount == 'btrfs': + self._disable_fstrim = True + + # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. + if vol.fs_type.fs_type_mount == 'ntfs3' and vol.mountpoint == self.target: + if 'fsck' in self._hooks: + self._hooks.remove('fsck') + + if self._disk_encryption.encryption_type in [disk.EncryptionType.LvmOnLuks, disk.EncryptionType.LuksOnLvm]: + if self._disk_encryption.hsm_device: + # Required by mkinitcpio to add support for fido2-device options + self.pacman.strap('libfido2') + + if 'sd-encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('lvm2') - 1, 'sd-encrypt') else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG) + if 'encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('lvm2') - 1, 'encrypt') + + def minimal_installation( + self, + testing: bool = False, + multilib: bool = False, + mkinitcpio: bool = True, + hostname: str = 'archinstall', + locale_config: LocaleConfiguration = LocaleConfiguration.default() + ): + if self._disk_config.lvm_config: + self._handle_lvm_installation() + else: + self._handle_partition_installation() + + if not SysInfo.has_uefi(): + self._base_packages.append('grub') + + if ucode := self._get_microcode(): + (self.target / 'boot' / ucode).unlink(missing_ok=True) + self._base_packages.append(ucode.stem) + else: + debug('Archinstall will not install any ucode.') # Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set. # This action takes place on the host system as pacstrap copies over package repository lists. + pacman_conf = pacman.Config(self.target) if multilib: - self.log("The multilib flag is set. This system will be installed with the multilib repository enabled.") - self.enable_multilib_repository() + info("The multilib flag is set. This system will be installed with the multilib repository enabled.") + pacman_conf.enable(pacman.Repo.Multilib) else: - self.log("The multilib flag is not set. This system will be installed without multilib repositories enabled.") + info("The multilib flag is not set. This system will be installed without multilib repositories enabled.") if testing: - self.log("The testing flag is set. This system will be installed with testing repositories enabled.") - self.enable_testing_repositories(multilib) + info("The testing flag is set. This system will be installed with testing repositories enabled.") + pacman_conf.enable(pacman.Repo.Testing) else: - self.log("The testing flag is not set. This system will be installed without testing repositories enabled.") + info("The testing flag is not set. This system will be installed without testing repositories enabled.") + + pacman_conf.apply() - self.pacstrap(self.base_packages) + self.pacman.strap(self._base_packages) self.helper_flags['base-strapped'] = True - # This handles making sure that the repositories we enabled persist on the installed system - if multilib or testing: - shutil.copy2("/etc/pacman.conf", f"{self.target}/etc/pacman.conf") + pacman_conf.persist() # Periodic TRIM may improve the performance and longevity of SSDs whilst # having no adverse effect on other devices. Most distributions enable # periodic TRIM by default. # # https://github.com/archlinux/archinstall/issues/880 - self.enable_periodic_trim() + # https://github.com/archlinux/archinstall/issues/1837 + # https://github.com/archlinux/archinstall/issues/1841 + if not self._disable_fstrim: + self.enable_periodic_trim() # TODO: Support locale and timezone # os.remove(f'{self.target}/etc/localtime') # sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime') # sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime') self.set_hostname(hostname) - self.set_locale(*locales[0].split()) + self.set_locale(locale_config) + self.set_keyboard_language(locale_config.kb_layout) # TODO: Use python functions for this SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') - self.mkinitcpio('-P') + if mkinitcpio and not self.mkinitcpio(['-P']): + error('Error generating initramfs (continuing anyway)') self.helper_flags['base'] = True # Run registered post-install hooks for function in self.post_base_install: - self.log(f"Running post-installation hook: {function}", level=logging.INFO) + info(f"Running post-installation hook: {function}") function(self) for plugin in plugins.values(): if hasattr(plugin, 'on_install'): plugin.on_install(self) - return True - - def setup_swap(self, kind :str = 'zram') -> bool: + def setup_swap(self, kind: str = 'zram'): if kind == 'zram': - self.log(f"Setting up swap on zram") - self.pacstrap('zram-generator') + info(f"Setting up swap on zram") + self.pacman.strap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' @@ -827,224 +893,532 @@ class Installer: self.enable_service('systemd-zram-setup@zram0.service') self._zram_enabled = True - - return True else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") - def add_systemd_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: - self.pacstrap('efibootmgr') + def _get_efi_partition(self) -> Optional[disk.PartitionModification]: + for layout in self._disk_config.device_modifications: + if partition := layout.get_efi_partition(): + return partition + return None + + def _get_boot_partition(self) -> Optional[disk.PartitionModification]: + for layout in self._disk_config.device_modifications: + if boot := layout.get_boot_partition(): + return boot + return None + + def _get_root(self) -> Optional[disk.PartitionModification | disk.LvmVolume]: + if self._disk_config.lvm_config: + return self._disk_config.lvm_config.get_root_volume() + else: + for mod in self._disk_config.device_modifications: + if root := mod.get_root_partition(): + return root + return None + + def _get_luks_uuid_from_mapper_dev(self, mapper_dev_path: Path) -> str: + lsblk_info = disk.get_lsblk_info(mapper_dev_path, reverse=True, full_dev_path=True) + + if not lsblk_info.children or not lsblk_info.children[0].uuid: + raise ValueError('Unable to determine UUID of luks superblock') + + return lsblk_info.children[0].uuid + + def _get_kernel_params_partition( + self, + root_partition: disk.PartitionModification, + id_root: bool = True, + partuuid: bool = True + ) -> List[str]: + kernel_parameters = [] + + if root_partition in self._disk_encryption.partitions: + # TODO: We need to detect if the encrypted device is a whole disk encryption, + # or simply a partition encryption. Right now we assume it's a partition (and we always have) + + if self._disk_encryption and self._disk_encryption.hsm_device: + debug(f'Root partition is an encrypted device, identifying by UUID: {root_partition.uuid}') + # Note: UUID must be used, not PARTUUID for sd-encrypt to work + kernel_parameters.append(f'rd.luks.name={root_partition.uuid}=root') + # Note: tpm2-device and fido2-device don't play along very well: + # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 + kernel_parameters.append('rd.luks.options=fido2-device=auto,password-echo=no') + elif partuuid: + debug(f'Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}') + kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:root') + else: + debug(f'Root partition is an encrypted device, identifying by UUID: {root_partition.uuid}') + kernel_parameters.append(f'cryptdevice=UUID={root_partition.uuid}:root') + + if id_root: + kernel_parameters.append('root=/dev/mapper/root') + elif id_root: + if partuuid: + debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}') + kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid}') + else: + debug(f'Identifying root partition by UUID: {root_partition.uuid}') + kernel_parameters.append(f'root=UUID={root_partition.uuid}') + + return kernel_parameters + + def _get_kernel_params_lvm( + self, + lvm: disk.LvmVolume + ) -> List[str]: + kernel_parameters = [] + + match self._disk_encryption.encryption_type: + case disk.EncryptionType.LvmOnLuks: + if not lvm.vg_name: + raise ValueError(f'Unable to determine VG name for {lvm.name}') + + pv_seg_info = disk.device_handler.lvm_pvseg_info(lvm.vg_name, lvm.name) - if not has_uefi(): + if not pv_seg_info: + raise ValueError(f'Unable to determine PV segment info for {lvm.vg_name}/{lvm.name}') + + uuid = self._get_luks_uuid_from_mapper_dev(pv_seg_info.pv_name) + + if self._disk_encryption.hsm_device: + debug(f'LvmOnLuks, encrypted root partition, HSM, identifying by UUID: {uuid}') + kernel_parameters.append(f'rd.luks.name={uuid}=cryptlvm root={lvm.safe_dev_path}') + else: + debug(f'LvmOnLuks, encrypted root partition, identifying by UUID: {uuid}') + kernel_parameters.append(f'cryptdevice=UUID={uuid}:cryptlvm root={lvm.safe_dev_path}') + case disk.EncryptionType.LuksOnLvm: + uuid = self._get_luks_uuid_from_mapper_dev(lvm.mapper_path) + + if self._disk_encryption.hsm_device: + debug(f'LuksOnLvm, encrypted root partition, HSM, identifying by UUID: {uuid}') + kernel_parameters.append(f'rd.luks.name={uuid}=root root=/dev/mapper/root') + else: + debug(f'LuksOnLvm, encrypted root partition, identifying by UUID: {uuid}') + kernel_parameters.append(f'cryptdevice=UUID={uuid}:root root=/dev/mapper/root') + case disk.EncryptionType.NoEncryption: + debug(f'Identifying root lvm by mapper device: {lvm.dev_path}') + kernel_parameters.append(f'root={lvm.safe_dev_path}') + + return kernel_parameters + + def _get_kernel_params( + self, + root: disk.PartitionModification | disk.LvmVolume, + id_root: bool = True, + partuuid: bool = True + ) -> List[str]: + kernel_parameters = [] + + if isinstance(root, disk.LvmVolume): + kernel_parameters = self._get_kernel_params_lvm(root) + else: + kernel_parameters = self._get_kernel_params_partition(root, id_root, partuuid) + + # Zswap should be disabled when using zram. + # https://github.com/archlinux/archinstall/issues/881 + if self._zram_enabled: + kernel_parameters.append('zswap.enabled=0') + + if id_root: + for sub_vol in root.btrfs_subvols: + if sub_vol.is_root(): + kernel_parameters.append(f'rootflags=subvol={sub_vol.name}') + break + + kernel_parameters.append('rw') + + kernel_parameters.append(f'rootfstype={root.safe_fs_type.fs_type_mount}') + kernel_parameters.extend(self._kernel_params) + + debug(f'kernel parameters: {" ".join(kernel_parameters)}') + + return kernel_parameters + + def _add_systemd_bootloader( + self, + boot_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, + efi_partition: Optional[disk.PartitionModification], + uki_enabled: bool = False + ): + debug('Installing systemd bootloader') + + self.pacman.strap('efibootmgr') + + if not SysInfo.has_uefi(): raise HardwareIncompatibilityError + # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. + bootctl_options = [] + + if efi_partition and boot_partition != efi_partition: + bootctl_options.append(f'--esp-path={efi_partition.mountpoint}') + bootctl_options.append(f'--boot-path={boot_partition.mountpoint}') # Install the boot loader try: - SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install') + SysCommand(f"/usr/bin/arch-chroot {self.target} bootctl {' '.join(bootctl_options)} install") except SysCallError: # Fallback, try creating the boot loader without touching the EFI variables - SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') + SysCommand(f"/usr/bin/arch-chroot {self.target} bootctl --no-variables {' '.join(bootctl_options)} install") - # Ensure that the /boot/loader directory exists before we try to create files in it - if not os.path.exists(f'{self.target}/boot/loader'): - os.makedirs(f'{self.target}/boot/loader') + # Ensure that the $BOOT/loader/ directory exists before we try to create files in it. + # + # As mentioned in https://github.com/archlinux/archinstall/pull/1859 - we store the + # loader entries in $BOOT/loader/ rather than $ESP/loader/ + # The current reasoning being that $BOOT works in both use cases as well + # as being tied to the current installation. This may change. + loader_dir = self.target / 'boot/loader' + loader_dir.mkdir(parents=True, exist_ok=True) + + default_kernel = self.kernels[0] + if uki_enabled: + default_entry = f'arch-{default_kernel}.efi' + else: + entry_name = self.init_time + '_{kernel}{variant}.conf' + default_entry = entry_name.format(kernel=default_kernel, variant='') + + default = f'default {default_entry}' # Modify or create a loader.conf - if os.path.isfile(f'{self.target}/boot/loader/loader.conf'): - with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader: - loader_data = loader.read().split('\n') - else: + loader_conf = loader_dir / 'loader.conf' + + try: + loader_data = loader_conf.read_text().splitlines() + except FileNotFoundError: loader_data = [ - f"default {self.init_time}", - "timeout 15" + default, + 'timeout 15' ] - - with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader: - for line in loader_data: - if line[:8] == 'default ': - loader.write(f'default {self.init_time}_{self.kernels[0]}\n') - elif line[:8] == '#timeout' and 'timeout 15' not in loader_data: + else: + for index, line in enumerate(loader_data): + if line.startswith('default'): + loader_data[index] = default + elif line.startswith('#timeout'): # We add in the default timeout to support dual-boot - loader.write(f"{line[1:]}\n") - else: - loader.write(f"{line}\n") + loader_data[index] = line.removeprefix('#') + + loader_conf.write_text('\n'.join(loader_data) + '\n') - # Ensure that the /boot/loader/entries directory exists before we try to create files in it - if not os.path.exists(f'{self.target}/boot/loader/entries'): - os.makedirs(f'{self.target}/boot/loader/entries') + if uki_enabled: + return + + # Ensure that the $BOOT/loader/entries/ directory exists before we try to create files in it + entries_dir = loader_dir / 'entries' + entries_dir.mkdir(parents=True, exist_ok=True) + + comments = ( + '# Created by: archinstall', + f'# Created on: {self.init_time}' + ) + + options = 'options ' + ' '.join(self._get_kernel_params(root)) for kernel in self.kernels: for variant in ("", "-fallback"): # Setup the loader entry - with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}{variant}.conf', 'w') as entry: - entry.write('# Created by: archinstall\n') - entry.write(f'# Created on: {self.init_time}\n') - entry.write(f'title Arch Linux ({kernel}{variant})\n') - entry.write(f"linux /vmlinuz-{kernel}\n") - if not is_vm(): - vendor = cpu_vendor() - if vendor == "AuthenticAMD": - entry.write("initrd /amd-ucode.img\n") - elif vendor == "GenuineIntel": - entry.write("initrd /intel-ucode.img\n") - else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG) - entry.write(f"initrd /initramfs-{kernel}{variant}.img\n") - # blkid doesn't trigger on loopback devices really well, - # so we'll use the old manual method until we get that sorted out. - root_fs_type = get_mount_fs_type(root_partition.filesystem) - - if root_fs_type is not None: - options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n' - else: - options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n' - - for subvolume in root_partition.subvolumes: - if subvolume.root is True and subvolume.name != '<FS_TREE>': - options_entry = f"rootflags=subvol={subvolume.name} " + options_entry - - # Zswap should be disabled when using zram. - # - # https://github.com/archlinux/archinstall/issues/881 - if self._zram_enabled: - options_entry = "zswap.enabled=0 " + options_entry - - if real_device := self.detect_encryption(root_partition): - # TODO: We need to detect if the encrypted device is a whole disk encryption, - # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG) - - kernel_options = f"options" - - if self._disk_encryption.hsm_device: - # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work - kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" - # Note: tpm2-device and fido2-device don't play along very well: - # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 - kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no" - else: - kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev" - - entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}') - - if self._disk_encryption and self._disk_encryption.hsm_device: - # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work - kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" - # Note: tpm2-device and fido2-device don't play along very well: - # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 - kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no" - else: - log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) - entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}') + entry = [ + *comments, + f'title Arch Linux ({kernel}{variant})', + f'linux /vmlinuz-{kernel}', + f'initrd /initramfs-{kernel}{variant}.img', + options, + ] - self.helper_flags['bootloader'] = "systemd" + name = entry_name.format(kernel=kernel, variant=variant) + entry_conf = entries_dir / name + entry_conf.write_text('\n'.join(entry) + '\n') - return True + self.helper_flags['bootloader'] = 'systemd' - def add_grub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: - self.pacstrap('grub') # no need? + def _add_grub_bootloader( + self, + boot_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, + efi_partition: Optional[disk.PartitionModification] + ): + debug('Installing grub bootloader') - root_fs_type = get_mount_fs_type(root_partition.filesystem) + self.pacman.strap('grub') # no need? - if real_device := self.detect_encryption(root_partition): - root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip() - _file = "/etc/default/grub" - add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'" - enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'" + grub_default = self.target / 'etc/default/grub' + config = grub_default.read_text() - log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO) - SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") - SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}") - else: - _file = "/etc/default/grub" - add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'" - SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") + kernel_parameters = ' '.join(self._get_kernel_params(root, False, False)) + config = re.sub(r'(GRUB_CMDLINE_LINUX=")("\n)', rf'\1{kernel_parameters}\2', config, 1) + + grub_default.write_text(config) + + info(f"GRUB boot partition: {boot_partition.dev_path}") + + boot_dir = Path('/boot') + + command = [ + '/usr/bin/arch-chroot', + str(self.target), + 'grub-install', + '--debug' + ] + + if SysInfo.has_uefi(): + if not efi_partition: + raise ValueError('Could not detect efi partition') + + info(f"GRUB EFI partition: {efi_partition.dev_path}") + + self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + + boot_dir_arg = [] + if boot_partition.mountpoint and boot_partition.mountpoint != boot_dir: + boot_dir_arg.append(f'--boot-directory={boot_partition.mountpoint}') + boot_dir = boot_partition.mountpoint + + add_options = [ + '--target=x86_64-efi', + f'--efi-directory={efi_partition.mountpoint}', + *boot_dir_arg, + '--bootloader-id=GRUB', + '--removable' + ] + + command.extend(add_options) - log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO) - if has_uefi(): - self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) + SysCommand(command, peek_output=True) except SysCallError: try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) - except SysCallError as error: - raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") + SysCommand(command, peek_output=True) + except SysCallError as err: + raise DiskError(f"Could not install GRUB to {self.target}{efi_partition.mountpoint}: {err}") else: + info(f"GRUB boot partition: {boot_partition.dev_path}") + + parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path) + + add_options = [ + '--target=i386-pc', + '--recheck', + str(parent_dev_path) + ] + try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True) - except SysCallError as error: - raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}") + SysCommand(command + add_options, peek_output=True) + except SysCallError as err: + raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {err}") try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg') - except SysCallError as error: - raise DiskError(f"Could not configure GRUB: {error}") + SysCommand( + f'/usr/bin/arch-chroot {self.target} ' + f'grub-mkconfig -o {boot_dir}/grub/grub.cfg' + ) + except SysCallError as err: + raise DiskError(f"Could not configure GRUB: {err}") self.helper_flags['bootloader'] = "grub" - return True + def _add_limine_bootloader( + self, + boot_partition: disk.PartitionModification, + efi_partition: Optional[disk.PartitionModification], + root: disk.PartitionModification | disk.LvmVolume + ): + debug('Installing limine bootloader') + + self.pacman.strap('limine') + + info(f"Limine boot partition: {boot_partition.dev_path}") + + limine_path = self.target / 'usr' / 'share' / 'limine' + hook_command = None + + if SysInfo.has_uefi(): + if not efi_partition: + raise ValueError('Could not detect efi partition') + elif not efi_partition.mountpoint: + raise ValueError('EFI partition is not mounted') + + info(f"Limine EFI partition: {efi_partition.dev_path}") + + try: + efi_dir_path = self.target / efi_partition.mountpoint.relative_to('/') / 'EFI' / 'BOOT' + efi_dir_path.mkdir(parents=True, exist_ok=True) + + for file in ('BOOTIA32.EFI', 'BOOTX64.EFI'): + shutil.copy(limine_path / file, efi_dir_path) + except Exception as err: + raise DiskError(f'Failed to install Limine in {self.target}{efi_partition.mountpoint}: {err}') + + hook_command = f'/usr/bin/cp /usr/share/limine/BOOTIA32.EFI {efi_partition.mountpoint}/EFI/BOOT/' \ + f' && /usr/bin/cp /usr/share/limine/BOOTX64.EFI {efi_partition.mountpoint}/EFI/BOOT/' + else: + parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path) + + if unique_path := disk.device_handler.get_unique_path_for_device(parent_dev_path): + parent_dev_path = unique_path + + try: + # The `limine-bios.sys` file contains stage 3 code. + shutil.copy(limine_path / 'limine-bios.sys', self.target / 'boot') + + # `limine bios-install` deploys the stage 1 and 2 to the disk. + SysCommand(f'/usr/bin/arch-chroot {self.target} limine bios-install {parent_dev_path}', peek_output=True) + except Exception as err: + raise DiskError(f'Failed to install Limine on {parent_dev_path}: {err}') + + hook_command = f'/usr/bin/limine bios-install {parent_dev_path}' \ + f' && /usr/bin/cp /usr/share/limine/limine-bios.sys /boot/' + + hook_contents = f'''[Trigger] +Operation = Install +Operation = Upgrade +Type = Package +Target = limine + +[Action] +Description = Deploying Limine after upgrade... +When = PostTransaction +Exec = /bin/sh -c "{hook_command}" +''' + + hooks_dir = self.target / 'etc' / 'pacman.d' / 'hooks' + hooks_dir.mkdir(parents=True, exist_ok=True) + + hook_path = hooks_dir / '99-limine.hook' + hook_path.write_text(hook_contents) + + kernel_params = ' '.join(self._get_kernel_params(root)) + config_contents = 'TIMEOUT=5\n' + + for kernel in self.kernels: + for variant in ('', '-fallback'): + entry = [ + f'PROTOCOL=linux', + f'KERNEL_PATH=boot:///vmlinuz-{kernel}', + f'MODULE_PATH=boot:///initramfs-{kernel}{variant}.img', + f'CMDLINE={kernel_params}', + ] + + config_contents += f'\n:Arch Linux ({kernel}{variant})\n' + config_contents += '\n'.join([f' {it}' for it in entry]) + '\n' - def add_efistub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: - self.pacstrap('efibootmgr') + config_path = self.target / 'boot' / 'limine.cfg' + config_path.write_text(config_contents) - if not has_uefi(): + self.helper_flags['bootloader'] = "limine" + + def _add_efistub_bootloader( + self, + boot_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, + uki_enabled: bool = False + ): + debug('Installing efistub bootloader') + + self.pacman.strap('efibootmgr') + + if not SysInfo.has_uefi(): raise HardwareIncompatibilityError + # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. - root_fs_type = get_mount_fs_type(root_partition.filesystem) + if not uki_enabled: + loader = '/vmlinuz-{kernel}' + + entries = ( + 'initrd=/initramfs-{kernel}.img', + *self._get_kernel_params(root) + ) + + cmdline = [' '.join(entries)] + else: + loader = '/EFI/Linux/arch-{kernel}.efi' + cmdline = [] + + parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path) + + cmd_template = ( + 'efibootmgr', + '--create', + '--disk', str(parent_dev_path), + '--part', str(boot_partition.partn), + '--label', 'Arch Linux ({kernel})', + '--loader', loader, + '--unicode', *cmdline, + '--verbose' + ) for kernel in self.kernels: # Setup the firmware entry + cmd = [arg.format(kernel=kernel) for arg in cmd_template] + SysCommand(cmd) + + self.helper_flags['bootloader'] = "efistub" - label = f'Arch Linux ({kernel})' - loader = f"/vmlinuz-{kernel}" + def _config_uki( + self, + root: disk.PartitionModification | disk.LvmVolume, + efi_partition: Optional[disk.PartitionModification] + ): + if not efi_partition or not efi_partition.mountpoint: + raise ValueError(f'Could not detect ESP at mountpoint {self.target}') - kernel_parameters = [] + # Set up kernel command line + with open(self.target / 'etc/kernel/cmdline', 'w') as cmdline: + kernel_parameters = self._get_kernel_params(root) + cmdline.write(' '.join(kernel_parameters) + '\n') - if not is_vm(): - vendor = cpu_vendor() - if vendor == "AuthenticAMD": - kernel_parameters.append("initrd=\\amd-ucode.img") - elif vendor == "GenuineIntel": - kernel_parameters.append("initrd=\\intel-ucode.img") - else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.", level=logging.DEBUG) + diff_mountpoint = None - kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img") + if efi_partition.mountpoint != Path('/efi'): + diff_mountpoint = str(efi_partition.mountpoint) - # blkid doesn't trigger on loopback devices really well, - # so we'll use the old manual method until we get that sorted out. - if real_device := self.detect_encryption(root_partition): - # TODO: We need to detect if the encrypted device is a whole disk encryption, - # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.part_uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.part_uuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') - else: - log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'root=PARTUUID={root_partition.part_uuid} rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + image_re = re.compile('(.+_image="/([^"]+).+\n)') + uki_re = re.compile('#((.+_uki=")/[^/]+(.+\n))') - SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') + # Modify .preset files + for kernel in self.kernels: + preset = self.target / 'etc/mkinitcpio.d' / (kernel + '.preset') + config = preset.read_text().splitlines(True) + + for index, line in enumerate(config): + # Avoid storing redundant image file + if m := image_re.match(line): + image = self.target / m.group(2) + image.unlink(missing_ok=True) + config[index] = '#' + m.group(1) + elif m := uki_re.match(line): + if diff_mountpoint: + config[index] = m.group(2) + diff_mountpoint + m.group(3) + else: + config[index] = m.group(1) + elif line.startswith('#default_options='): + config[index] = line.removeprefix('#') - self.helper_flags['bootloader'] = "efistub" + preset.write_text(''.join(config)) - return True + # Directory for the UKIs + uki_dir = self.target / efi_partition.relative_mountpoint / 'EFI/Linux' + uki_dir.mkdir(parents=True, exist_ok=True) - def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool: + # Build the UKIs + if not self.mkinitcpio(['-P']): + error('Error generating initramfs (continuing anyway)') + + def add_bootloader(self, bootloader: Bootloader, uki_enabled: bool = False): """ Adds a bootloader to the installation instance. Archinstall supports one of three types: * systemd-bootctl * grub + * limine (beta) * efistub (beta) - :param bootloader: Can be one of the three strings - 'systemd-bootctl', 'grub' or 'efistub' (beta) + :param bootloader: Type of bootloader to be added """ for plugin in plugins.values(): @@ -1054,61 +1428,41 @@ class Installer: if plugin.on_add_bootloader(self): return True - if type(self.target) == str: - self.target = pathlib.Path(self.target) + efi_partition = self._get_efi_partition() + boot_partition = self._get_boot_partition() + root = self._get_root() - boot_partition = None - root_partition = None - for partition in self.partitions: - if self.target / 'boot' in partition.mountpoints: - boot_partition = partition - elif self.target in partition.mountpoints: - root_partition = partition + if boot_partition is None: + raise ValueError(f'Could not detect boot at mountpoint {self.target}') - if boot_partition is None or root_partition is None: - raise ValueError(f"Could not detect root ({root_partition}) or boot ({boot_partition}) in {self.target} based on: {self.partitions}") + if root is None: + raise ValueError(f'Could not detect root at mountpoint {self.target}') - self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO) + info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}') - if bootloader == 'systemd-bootctl': - self.add_systemd_bootloader(boot_partition, root_partition) - elif bootloader == "grub-install": - self.add_grub_bootloader(boot_partition, root_partition) - elif bootloader == 'efistub': - self.add_efistub_bootloader(boot_partition, root_partition) - else: - raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}") + if uki_enabled: + self._config_uki(root, efi_partition) - return True - - def add_additional_packages(self, *packages :str) -> bool: - return self.pacstrap(*packages) + match bootloader: + case Bootloader.Systemd: + self._add_systemd_bootloader(boot_partition, root, efi_partition, uki_enabled) + case Bootloader.Grub: + self._add_grub_bootloader(boot_partition, root, efi_partition) + case Bootloader.Efistub: + self._add_efistub_bootloader(boot_partition, root, uki_enabled) + case Bootloader.Limine: + self._add_limine_bootloader(boot_partition, efi_partition, root) - def install_profile(self, profile :str) -> ModuleType: - """ - Installs a archinstall profile script (.py file). - This profile can be either local, remote or part of the library. + def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: + return self.pacman.strap(packages) - :param profile: Can be a local path or a remote path (URL) - :return: Returns the imported script as a module, this way - you can access any remaining functions exposed by the profile. - :rtype: module - """ - storage['installation_session'] = self - - if type(profile) == str: - profile = Profile(self, profile) - - self.log(f'Installing archinstall profile {profile}', level=logging.INFO) - return profile.install() - - def enable_sudo(self, entity: str, group :bool = False): - self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) + def enable_sudo(self, entity: str, group: bool = False): + info(f'Enabling sudo permissions for {entity}') sudoers_dir = f"{self.target}/etc/sudoers.d" # Creates directory if not exists - if not (sudoers_path := pathlib.Path(sudoers_dir)).exists(): + if not (sudoers_path := Path(sudoers_dir)).exists(): sudoers_path.mkdir(parents=True) # Guarantees sudoer confs directory recommended perms os.chmod(sudoers_dir, 0o440) @@ -1118,7 +1472,7 @@ class Installer: # We count how many files are there already so we know which number to prefix the file with num_of_rules_already = len(os.listdir(sudoers_dir)) - file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc + file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc # Guarantees that entity str does not contain invalid characters for a linux file name: # \ / : * ? " < > | @@ -1130,7 +1484,7 @@ class Installer: sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') # Guarantees sudoer conf file recommended perms - os.chmod(pathlib.Path(rule_file_name), 0o440) + os.chmod(Path(rule_file_name), 0o440) def create_users(self, users: Union[User, List[User]]): if not isinstance(users, list): @@ -1139,7 +1493,8 @@ class Installer: for user in users: self.user_create(user.username, user.password, user.groups, user.sudo) - def user_create(self, user :str, password :Optional[str] = None, groups :Optional[List[str]] = None, sudo :bool = False) -> None: + def user_create(self, user: str, password: Optional[str] = None, groups: Optional[List[str]] = None, + sudo: bool = False) -> None: if groups is None: groups = [] @@ -1152,11 +1507,11 @@ class Installer: handled_by_plugin = result if not handled_by_plugin: - self.log(f'Creating user {user}', level=logging.INFO) + info(f'Creating user {user}') try: SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') - except SysCallError as error: - raise SystemError(f"Could not create user inside installation: {error}") + except SysCallError as err: + raise SystemError(f"Could not create user inside installation: {err}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): @@ -1173,8 +1528,8 @@ class Installer: if sudo and self.enable_sudo(user): self.helper_flags['user'] = True - def user_set_pw(self, user :str, password :str) -> bool: - self.log(f'Setting password for {user}', level=logging.INFO) + def user_set_pw(self, user: str, password: str) -> bool: + info(f'Setting password for {user}') if user == 'root': # This means the root account isn't locked/disabled with * in /etc/passwd @@ -1190,8 +1545,8 @@ class Installer: except SysCallError: return False - def user_set_shell(self, user :str, shell :str) -> bool: - self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) + def user_set_shell(self, user: str, shell: str) -> bool: + info(f'Setting shell for {user} to {shell}') try: SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") @@ -1199,7 +1554,7 @@ class Installer: except SysCallError: return False - def chown(self, owner :str, path :str, options :List[str] = []) -> bool: + def chown(self, owner: str, path: str, options: List[str] = []) -> bool: cleaned_path = path.replace('\'', '\\\'') try: SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'") @@ -1207,53 +1562,75 @@ class Installer: except SysCallError: return False - def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: - return InstallationFile(self, filename, owner) - def set_keyboard_language(self, language: str) -> bool: - log(f"Setting keyboard language to {language}", level=logging.INFO) + info(f"Setting keyboard language to {language}") + if len(language.strip()): if not verify_keyboard_layout(language): - self.log(f"Invalid keyboard language specified: {language}", fg="red", level=logging.ERROR) + error(f"Invalid keyboard language specified: {language}") return False # In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968 # Setting an empty keymap first, allows the subsequent call to set layout for both console and x11. - from .systemd import Boot + from .boot import Boot with Boot(self) as session: os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""') try: session.SysCommand(["localectl", "set-keymap", language]) - except SysCallError as error: - raise ServiceException(f"Unable to set locale '{language}' for console: {error}") + except SysCallError as err: + raise ServiceException(f"Unable to set locale '{language}' for console: {err}") - self.log(f"Keyboard language for this installation is now set to: {language}") + info(f"Keyboard language for this installation is now set to: {language}") else: - self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO) + info('Keyboard language was not changed from default (no language specified)') return True def set_x11_keyboard_language(self, language: str) -> bool: - log(f"Setting x11 keyboard language to {language}", level=logging.INFO) """ A fallback function to set x11 layout specifically and separately from console layout. This isn't strictly necessary since .set_keyboard_language() does this as well. """ + info(f"Setting x11 keyboard language to {language}") + if len(language.strip()): if not verify_x11_keyboard_layout(language): - self.log(f"Invalid x11-keyboard language specified: {language}", fg="red", level=logging.ERROR) + error(f"Invalid x11-keyboard language specified: {language}") return False - from .systemd import Boot + from .boot import Boot with Boot(self) as session: session.SysCommand(["localectl", "set-x11-keymap", '""']) try: session.SysCommand(["localectl", "set-x11-keymap", language]) - except SysCallError as error: - raise ServiceException(f"Unable to set locale '{language}' for X11: {error}") + except SysCallError as err: + raise ServiceException(f"Unable to set locale '{language}' for X11: {err}") else: - self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO) + info(f'X11-Keyboard language was not changed from default (no language specified)') return True + + def _service_started(self, service_name: str) -> Optional[str]: + if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): + service_name += '.service' # Just to be safe + + last_execution_time = SysCommand( + f"systemctl show --property=ActiveEnterTimestamp --no-pager {service_name}", + environment_vars={'SYSTEMD_COLORS': '0'} + ).decode().lstrip('ActiveEnterTimestamp=') + + if not last_execution_time: + return None + + return last_execution_time + + def _service_state(self, service_name: str) -> str: + if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): + service_name += '.service' # Just to be safe + + return SysCommand( + f'systemctl show --no-pager -p SubState --value {service_name}', + environment_vars={'SYSTEMD_COLORS': '0'} + ).decode() |