import glob import os import re import shlex import shutil import subprocess import time from pathlib import Path from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable from . import disk from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError from .general import SysCommand from .hardware import SysInfo from .locale import LocaleConfiguration from .locale import verify_keyboard_layout, verify_x11_keyboard_layout from .luks import Luks2 from .mirrors import MirrorConfiguration from .models.bootloader import Bootloader from .models.network_configuration import Nic from .models.users import User from .output import log, error, info, warn, debug from . import pacman from .pacman import Pacman from .plugins import plugins from .storage import storage if TYPE_CHECKING: _: Any # Any package that the Installer() is responsible for (optional and the default ones) __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"] # Additional packages that are installed if the user is running the Live ISO with accessibility tools enabled __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] def accessibility_tools_in_use() -> bool: return os.system('systemctl is-active --quiet espeakup.service') == 0 class Installer: def __init__( self, target: Path, disk_config: disk.DiskLayoutConfiguration, disk_encryption: Optional[disk.DiskEncryption] = None, base_packages: List[str] = [], kernels: Optional[List[str]] = None ): """ `Installer()` is the wrapper for most basic installation steps. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. """ self.base_packages = base_packages or __packages__[:3] self.kernels = kernels or ['linux'] self._disk_config = disk_config self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption) self.target: Path = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} for kernel in self.kernels: self.base_packages.append(kernel) # If using accessibility tools in the live environment, append those to the packages list if accessibility_tools_in_use(): self.base_packages.extend(__accessibility_packages__) self.post_base_install: List[Callable] = [] # TODO: Figure out which one of these two we'll use.. But currently we're mixing them.. storage['session'] = self storage['installation_session'] = self self._modules: List[str] = [] self._binaries: List[str] = [] self._files: List[str] = [] # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. self._hooks: List[str] = [ "base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck" ] self._kernel_params: List[str] = [] self._fstab_entries: List[str] = [] self._zram_enabled = False self.pacman = Pacman(self.target, storage['arguments'].get('silent', False)) def __enter__(self) -> 'Installer': return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: error(exc_val) self.sync_log_to_install_medium() # We avoid printing /mnt/ because that might confuse people if they note it down # and then reboot, and a identical log file will be found in the ISO medium anyway. print(_("[!] A log file has been created here: {}").format( os.path.join(storage['LOG_PATH'], storage['LOG_FILE']))) print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")) raise exc_val if not (missing_steps := self.post_install_check()): log('Installation completed without any errors. You may now reboot.', fg='green') self.sync_log_to_install_medium() return True else: warn('Some required steps were not successfully installed/configured before leaving the installer:') for step in missing_steps: warn(f' - {step}') warn(f"Detailed error logs can be found at: {storage['LOG_PATH']}") warn("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues") self.sync_log_to_install_medium() return False def remove_mod(self, mod: str): if mod in self._modules: self._modules.remove(mod) def append_mod(self, mod: str): if mod not in self._modules: self._modules.append(mod) def _verify_service_stop(self): """ Certain services might be running that affects the system during installation. One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist We need to wait for it before we continue since we opted in to use a custom mirror/region. """ if not storage['arguments'].get('skip_ntp', False): info(_('Waiting for time sync (timedatectl show) to complete.')) _started_wait = time.time() _notified = False while True: if not _notified and time.time() - _started_wait > 5: _notified = True warn( _("Time synchronization not completing, while you wait - check the docs for workarounds: https://archinstall.readthedocs.io/")) time_val = SysCommand('timedatectl show --property=NTPSynchronized --value').decode() if time_val and time_val.strip() == 'yes': break time.sleep(1) else: info( _('Skipping waiting for automatic time sync (this can cause issues if time is out of sync during installation)')) info('Waiting for automatic mirror selection (reflector) to complete.') while self._service_state('reflector') not in ('dead', 'failed', 'exited'): time.sleep(1) # info('Waiting for pacman-init.service to complete.') # while self._service_state('pacman-init') not in ('dead', 'failed', 'exited'): # time.sleep(1) info(_('Waiting for Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.')) # Wait for the timer to kick in while self._service_started('archlinux-keyring-wkd-sync.timer') is None: time.sleep(1) # Wait for the service to enter a finished state while self._service_state('archlinux-keyring-wkd-sync.service') not in ('dead', 'failed', 'exited'): time.sleep(1) def _verify_boot_part(self): """ Check that mounted /boot device has at minimum size for installation The reason this check is here is to catch pre-mounted device configuration and potentially configured one that has not gone through any previous checks (e.g. --silence mode) NOTE: this function should be run AFTER running the mount_ordered_layout function """ boot_mount = self.target / 'boot' lsblk_info = disk.get_lsblk_by_mountpoint(boot_mount) if len(lsblk_info) > 0: if lsblk_info[0].size < disk.Size(200, disk.Unit.MiB, disk.SectorSize.default()): raise DiskError( f'The boot partition mounted at {boot_mount} is not large enough to install a boot loader. ' f'Please resize it to at least 200MiB and re-run the installation.' ) def sanity_check(self): # self._verify_boot_part() self._verify_service_stop() def mount_ordered_layout(self): info('Mounting partitions in order') for mod in self._disk_config.device_modifications: # partitions have to mounted in the right order on btrfs the mountpoint will # be empty as the actual subvolumes are getting mounted instead so we'll use # '/' just for sorting sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/')) enc_partitions = [] if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption: enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions)) # attempt to decrypt all luks partitions luks_handlers = self._prepare_luks_partitions(enc_partitions) for part_mod in sorted_part_mods: if luks_handler := luks_handlers.get(part_mod): # mount encrypted partition self._mount_luks_partition(part_mod, luks_handler) else: # partition is not encrypted self._mount_partition(part_mod) def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[ disk.PartitionModification, Luks2]: return { part_mod: disk.device_handler.unlock_luks2_dev( part_mod.dev_path, part_mod.mapper_name, self._disk_encryption.encryption_password ) for part_mod in partitions if part_mod.mapper_name and part_mod.dev_path } def _mount_partition(self, part_mod: disk.PartitionModification): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined if part_mod.mountpoint and part_mod.dev_path: target = self.target / part_mod.relative_mountpoint disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options) if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path: self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols) def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined if part_mod.mountpoint and luks_handler.mapper_dev: target = self.target / part_mod.relative_mountpoint disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols) def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]): for subvol in subvolumes: mountpoint = self.target / subvol.relative_mountpoint mount_options = subvol.mount_options + [f'subvol={subvol.name}'] disk.device_handler.mount(dev_path, mountpoint, options=mount_options) def generate_key_files(self): for part_mod in self._disk_encryption.partitions: gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod) luks_handler = Luks2( part_mod.safe_dev_path, mapper_name=part_mod.mapper_name, password=self._disk_encryption.encryption_password ) if gen_enc_file and not part_mod.is_root(): info(f'Creating key-file: {part_mod.dev_path}') luks_handler.create_keyfile(self.target) if part_mod.is_root() and not gen_enc_file: if self._disk_encryption.hsm_device: disk.Fido2.fido2_enroll( self._disk_encryption.hsm_device, part_mod, self._disk_encryption.encryption_password ) def sync_log_to_install_medium(self) -> bool: # Copy over the install log (if there is one) to the install medium if # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. if self.helper_flags.get('base-strapped', False) is True: if filename := storage.get('LOG_FILE', None): absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}") shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}") return True def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'): if file[:1] != '/': file = f"/{file}" if len(file.strip()) <= 0 or file == '/': raise ValueError(f"The filename for the swap file has to be a valid path, not: {self.target}{file}") SysCommand(f'dd if=/dev/zero of={self.target}{file} bs={size} count=1') SysCommand(f'chmod 0600 {self.target}{file}') SysCommand(f'mkswap {self.target}{file}') self._fstab_entries.append(f'{file} none swap defaults 0 0') if enable_resume: resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode() resume_offset = SysCommand( f'/usr/bin/filefrag -v {self.target}{file}' ).decode().split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() self._hooks.append('resume') self._kernel_params.append(f'resume=UUID={resume_uuid}') self._kernel_params.append(f'resume_offset={resume_offset}') def post_install_check(self, *args: str, **kwargs: str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] def set_mirrors(self, mirror_config: MirrorConfiguration, on_target: bool = False): """ Set the mirror configuration for the installation. :param mirror_config: The mirror configuration to use. :type mirror_config: MirrorConfiguration :on_target: Whether to set the mirrors on the target system or the live system. :param on_target: bool """ debug('Setting mirrors') for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): if result := plugin.on_mirrors(mirror_config): mirror_config = result if on_target: local_pacman_conf = Path(f'{self.target}/etc/pacman.conf') local_mirrorlist_conf = Path(f'{self.target}/etc/pacman.d/mirrorlist') else: local_pacman_conf = Path('/etc/pacman.conf') local_mirrorlist_conf = Path('/etc/pacman.d/mirrorlist') mirrorlist_config = mirror_config.mirrorlist_config() pacman_config = mirror_config.pacman_config() if pacman_config: debug(f'Pacman config: {pacman_config}') with local_pacman_conf.open('a') as fp: fp.write(pacman_config) if mirrorlist_config: debug(f'Mirrorlist: {mirrorlist_config}') with local_mirrorlist_conf.open('a') as fp: fp.write(mirrorlist_config) def genfstab(self, flags: str = '-pU'): fstab_path = self.target / "etc" / "fstab" info(f"Updating {fstab_path}") try: gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() except SysCallError as err: raise RequirementError( f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {err}') with open(fstab_path, 'a') as fp: fp.write(gen_fstab) if not fstab_path.is_file(): raise RequirementError(f'Could not create fstab file') for plugin in plugins.values(): if hasattr(plugin, 'on_genfstab'): if plugin.on_genfstab(self) is True: break with open(fstab_path, 'a') as fp: for entry in self._fstab_entries: fp.write(f'{entry}\n') for mod in self._disk_config.device_modifications: for part_mod in mod.partitions: if part_mod.fs_type != disk.FilesystemType.Btrfs: continue with fstab_path.open('r') as fp: fstab = fp.readlines() # Replace the {installation}/etc/fstab with entries # using the compress=zstd where the mountpoint has compression set. for index, line in enumerate(fstab): # So first we grab the mount options by using subvol=.*? as a locator. # And we also grab the mountpoint for the entry, for instance /var/log subvoldef = re.findall(',.*?subvol=.*?[\t ]', line) mountpoint = re.findall('[\t ]/.*?[\t ]', line) if not subvoldef or not mountpoint: continue for sub_vol in part_mod.btrfs_subvols: # We then locate the correct subvolume and check if it's compressed, # and skip entries where compression is already defined # We then sneak in the compress=zstd option if it doesn't already exist: if sub_vol.compress and str(sub_vol.mountpoint) == Path( mountpoint[0].strip()) and ',compress=zstd,' not in line: fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}') break with fstab_path.open('w') as fp: fp.writelines(fstab) def set_hostname(self, hostname: str, *args: str, **kwargs: str) -> None: with open(f'{self.target}/etc/hostname', 'w') as fh: fh.write(hostname + '\n') def set_locale(self, locale_config: LocaleConfiguration) -> bool: modifier = '' lang = locale_config.sys_lang encoding = locale_config.sys_enc # This is a temporary patch to fix #1200 if '.' in locale_config.sys_lang: lang, potential_encoding = locale_config.sys_lang.split('.', 1) # Override encoding if encoding is set to the default parameter # and the "found" encoding differs. if locale_config.sys_enc == 'UTF-8' and locale_config.sys_enc != potential_encoding: encoding = potential_encoding # Make sure we extract the modifier, that way we can put it in if needed. if '@' in locale_config.sys_lang: lang, modifier = locale_config.sys_lang.split('@', 1) modifier = f"@{modifier}" # - End patch locale_gen = self.target / 'etc/locale.gen' locale_gen_lines = locale_gen.read_text().splitlines(True) # A locale entry in /etc/locale.gen may or may not contain the encoding # in the first column of the entry; check for both cases. entry_re = re.compile(rf'#{lang}(\.{encoding})?{modifier} {encoding}') for index, line in enumerate(locale_gen_lines): if entry_re.match(line): uncommented_line = line.removeprefix('#') locale_gen_lines[index] = uncommented_line locale_gen.write_text(''.join(locale_gen_lines)) lang_value = uncommented_line.split()[0] break else: error(f"Invalid locale: language '{locale_config.sys_lang}', encoding '{locale_config.sys_enc}'") return False try: SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen') except SysCallError as e: error(f'Failed to run locale-gen on target: {e}') return False (self.target / 'etc/locale.conf').write_text(f'LANG={lang_value}\n') return True def set_timezone(self, zone: str, *args: str, **kwargs: str) -> bool: if not zone: return True if not len(zone): return True # Redundant for plugin in plugins.values(): if hasattr(plugin, 'on_timezone'): if result := plugin.on_timezone(zone): zone = result if (Path("/usr") / "share" / "zoneinfo" / zone).exists(): (Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True else: warn(f'Time zone {zone} does not exist, continuing with system default') return False def activate_time_synchronization(self) -> None: info('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers') self.enable_service('systemd-timesyncd') def enable_espeakup(self) -> None: info('Enabling espeakup.service for speech synthesis (accessibility)') self.enable_service('espeakup') def enable_periodic_trim(self) -> None: info("Enabling periodic TRIM") # fstrim is owned by util-linux, a dependency of both base and systemd. self.enable_service("fstrim.timer") def enable_service(self, services: Union[str, List[str]]) -> None: if isinstance(services, str): services = [services] for service in services: info(f'Enabling service {service}') try: self.arch_chroot(f'systemctl enable {service}') except SysCallError as err: raise ServiceException(f"Unable to start service {service}: {err}") for plugin in plugins.values(): if hasattr(plugin, 'on_service'): plugin.on_service(service) def run_command(self, cmd: str, *args: str, **kwargs: str) -> SysCommand: return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') def arch_chroot(self, cmd: str, run_as: Optional[str] = None) -> SysCommand: if run_as: cmd = f"su - {run_as} -c {shlex.quote(cmd)}" return self.run_command(cmd) def drop_to_shell(self) -> None: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) def configure_nic(self, nic: Nic): conf = nic.as_systemd_config() for plugin in plugins.values(): if hasattr(plugin, 'on_configure_nic'): conf = plugin.on_configure_nic( nic.iface, nic.dhcp, nic.ip, nic.gateway, nic.dns ) or conf with open(f"{self.target}/etc/systemd/network/10-{nic.iface}.network", "a") as netconf: netconf.write(str(conf)) def copy_iso_network_config(self, enable_services: bool = False) -> bool: # Copy (if any) iwd password and config files if os.path.isdir('/var/lib/iwd/'): if psk_files := glob.glob('/var/lib/iwd/*.psk'): if not os.path.isdir(f"{self.target}/var/lib/iwd"): os.makedirs(f"{self.target}/var/lib/iwd") if enable_services: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: self.base_packages.append('iwd') # This function will be called after minimal_installation() # as a hook for post-installs. This hook is only needed if # base is not installed yet. def post_install_enable_iwd_service(*args: str, **kwargs: str): self.enable_service('iwd') self.post_base_install.append(post_install_enable_iwd_service) # Otherwise, we can go ahead and add the required package # and enable it's service: else: self.pacman.strap('iwd') self.enable_service('iwd') for psk in psk_files: shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}") # Copy (if any) systemd-networkd config files if netconfigurations := glob.glob('/etc/systemd/network/*'): if not os.path.isdir(f"{self.target}/etc/systemd/network/"): os.makedirs(f"{self.target}/etc/systemd/network/") for netconf_file in netconfigurations: shutil.copy2(netconf_file, f"{self.target}/etc/systemd/network/{os.path.basename(netconf_file)}") if enable_services: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: def post_install_enable_networkd_resolved(*args: str, **kwargs: str): self.enable_service(['systemd-networkd', 'systemd-resolved']) self.post_base_install.append(post_install_enable_networkd_resolved) # Otherwise, we can go ahead and enable the services else: self.enable_service(['systemd-networkd', 'systemd-resolved']) return True def mkinitcpio(self, flags: List[str]) -> bool: for plugin in plugins.values(): if hasattr(plugin, 'on_mkinitcpio'): # Allow plugins to override the usage of mkinitcpio altogether. if plugin.on_mkinitcpio(self): return True with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit: mkinit.write(f"MODULES=({' '.join(self._modules)})\n") mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n") mkinit.write(f"FILES=({' '.join(self._files)})\n") if not self._disk_encryption.hsm_device: # For now, if we don't use HSM we revert to the old # way of setting up encryption hooks for mkinitcpio. # This is purely for stability reasons, we're going away from this. # * systemd -> udev # * sd-vconsole -> keymap self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks] mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n") try: SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}', peek_output=True) return True except SysCallError as error: if error.worker: log(error.worker._trace_log.decode()) return False def _get_microcode(self) -> Optional[Path]: if not SysInfo.is_vm(): if vendor := SysInfo.cpu_vendor(): return vendor.get_ucode() return None def minimal_installation( self, testing: bool = False, multilib: bool = False, mkinitcpio: bool = True, hostname: str = 'archinstall', locale_config: LocaleConfiguration = LocaleConfiguration.default() ): _disable_fstrim = False for mod in self._disk_config.device_modifications: for part in mod.partitions: if part.fs_type is not None: if (pkg := part.fs_type.installation_pkg) is not None: self.base_packages.append(pkg) if (module := part.fs_type.installation_module) is not None: self._modules.append(module) if (binary := part.fs_type.installation_binary) is not None: self._binaries.append(binary) # https://github.com/archlinux/archinstall/issues/1837 if part.fs_type.fs_type_mount == 'btrfs': _disable_fstrim = True # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: if 'fsck' in self._hooks: self._hooks.remove('fsck') if part in self._disk_encryption.partitions: if self._disk_encryption.hsm_device: # Required by mkinitcpio to add support for fido2-device options self.pacman.strap('libfido2') if 'sd-encrypt' not in self._hooks: self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') else: if 'encrypt' not in self._hooks: self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') if not SysInfo.has_uefi(): self.base_packages.append('grub') if ucode := self._get_microcode(): (self.target / 'boot' / ucode).unlink(missing_ok=True) self.base_packages.append(ucode.stem) else: debug('Archinstall will not install any ucode.') # Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set. # This action takes place on the host system as pacstrap copies over package repository lists. pacman_conf = pacman.Config(self.target) if multilib: info("The multilib flag is set. This system will be installed with the multilib repository enabled.") pacman_conf.enable(pacman.Repo.Multilib) else: info("The multilib flag is not set. This system will be installed without multilib repositories enabled.") if testing: info("The testing flag is set. This system will be installed with testing repositories enabled.") pacman_conf.enable(pacman.Repo.Testing) else: info("The testing flag is not set. This system will be installed without testing repositories enabled.") pacman_conf.apply() self.pacman.strap(self.base_packages) self.helper_flags['base-strapped'] = True pacman_conf.persist() # Periodic TRIM may improve the performance and longevity of SSDs whilst # having no adverse effect on other devices. Most distributions enable # periodic TRIM by default. # # https://github.com/archlinux/archinstall/issues/880 # https://github.com/archlinux/archinstall/issues/1837 # https://github.com/archlinux/archinstall/issues/1841 if not _disable_fstrim: self.enable_periodic_trim() # TODO: Support locale and timezone # os.remove(f'{self.target}/etc/localtime') # sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime') # sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime') self.set_hostname(hostname) self.set_locale(locale_config) # TODO: Use python functions for this SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') if mkinitcpio and not self.mkinitcpio(['-P']): error('Error generating initramfs (continuing anyway)') self.helper_flags['base'] = True # Run registered post-install hooks for function in self.post_base_install: info(f"Running post-installation hook: {function}") function(self) for plugin in plugins.values(): if hasattr(plugin, 'on_install'): plugin.on_install(self) def setup_swap(self, kind: str = 'zram'): if kind == 'zram': info(f"Setting up swap on zram") self.pacman.strap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' # shutil.copy2(f"{self.target}{zram_example_location}", f"{self.target}/usr/lib/systemd/zram-generator.conf") with open(f"{self.target}/etc/systemd/zram-generator.conf", "w") as zram_conf: zram_conf.write("[zram0]\n") self.enable_service('systemd-zram-setup@zram0.service') self._zram_enabled = True else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") def _get_efi_partition(self) -> Optional[disk.PartitionModification]: for layout in self._disk_config.device_modifications: if partition := layout.get_efi_partition(): return partition return None def _get_boot_partition(self) -> Optional[disk.PartitionModification]: for layout in self._disk_config.device_modifications: if boot := layout.get_boot_partition(): return boot return None def _get_root_partition(self) -> Optional[disk.PartitionModification]: for mod in self._disk_config.device_modifications: if root := mod.get_root_partition(): return root return None def _get_kernel_params( self, root_partition: disk.PartitionModification, id_root: bool = True, partuuid: bool = True ) -> List[str]: kernel_parameters = [] if root_partition in self._disk_encryption.partitions: # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) if self._disk_encryption and self._disk_encryption.hsm_device: debug(f'Root partition is an encrypted device, identifying by UUID: {root_partition.uuid}') # Note: UUID must be used, not PARTUUID for sd-encrypt to work kernel_parameters.append(f'rd.luks.name={root_partition.uuid}=root') # Note: tpm2-device and fido2-device don't play along very well: # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 kernel_parameters.append('rd.luks.options=fido2-device=auto,password-echo=no') elif partuuid: debug(f'Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}') kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:root') else: debug(f'Root partition is an encrypted device, identifying by UUID: {root_partition.uuid}') kernel_parameters.append(f'cryptdevice=UUID={root_partition.uuid}:root') if id_root: kernel_parameters.append('root=/dev/mapper/root') elif id_root: if partuuid: debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}') kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid}') else: debug(f'Identifying root partition by UUID: {root_partition.uuid}') kernel_parameters.append(f'root=UUID={root_partition.uuid}') # Zswap should be disabled when using zram. # https://github.com/archlinux/archinstall/issues/881 if self._zram_enabled: kernel_parameters.append('zswap.enabled=0') if id_root: for sub_vol in root_partition.btrfs_subvols: if sub_vol.is_root(): kernel_parameters.append(f'rootflags=subvol={sub_vol.name}') break kernel_parameters.append('rw') kernel_parameters.append(f'rootfstype={root_partition.safe_fs_type.fs_type_mount}') kernel_parameters.extend(self._kernel_params) debug(f'kernel parameters: {" ".join(kernel_parameters)}') return kernel_parameters def _add_systemd_bootloader( self, boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification, efi_partition: Optional[disk.PartitionModification], uki_enabled: bool = False ): self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): raise HardwareIncompatibilityError # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. bootctl_options = [] if efi_partition and boot_partition != efi_partition: bootctl_options.append(f'--esp-path={efi_partition.mountpoint}') bootctl_options.append(f'--boot-path={boot_partition.mountpoint}') # Install the boot loader try: SysCommand(f"/usr/bin/arch-chroot {self.target} bootctl {' '.join(bootctl_options)} install") except SysCallError: # Fallback, try creating the boot loader without touching the EFI variables SysCommand(f"/usr/bin/arch-chroot {self.target} bootctl --no-variables {' '.join(bootctl_options)} install") # Ensure that the $BOOT/loader/ directory exists before we try to create files in it. # # As mentioned in https://github.com/archlinux/archinstall/pull/1859 - we store the # loader entries in $BOOT/loader/ rather than $ESP/loader/ # The current reasoning being that $BOOT works in both use cases as well # as being tied to the current installation. This may change. loader_dir = self.target / 'boot/loader' loader_dir.mkdir(parents=True, exist_ok=True) default_kernel = self.kernels[0] if uki_enabled: default_entry = f'arch-{default_kernel}.efi' else: entry_name = self.init_time + '_{kernel}{variant}.conf' default_entry = entry_name.format(kernel=default_kernel, variant='') default = f'default {default_entry}' # Modify or create a loader.conf loader_conf = loader_dir / 'loader.conf' try: loader_data = loader_conf.read_text().splitlines() except FileNotFoundError: loader_data = [ default, 'timeout 15' ] else: for index, line in enumerate(loader_data): if line.startswith('default'): loader_data[index] = default elif line.startswith('#timeout'): # We add in the default timeout to support dual-boot loader_data[index] = line.removeprefix('#') loader_conf.write_text('\n'.join(loader_data) + '\n') if uki_enabled: return # Ensure that the $BOOT/loader/entries/ directory exists before we try to create files in it entries_dir = loader_dir / 'entries' entries_dir.mkdir(parents=True, exist_ok=True) comments = ( '# Created by: archinstall', f'# Created on: {self.init_time}' ) microcode = [] if ucode := self._get_microcode(): microcode.append(f'initrd /{ucode}') else: debug('Archinstall will not add any ucode to systemd-boot config.') options = 'options ' + ' '.join(self._get_kernel_params(root_partition)) for kernel in self.kernels: for variant in ("", "-fallback"): # Setup the loader entry entry = [ *comments, f'title Arch Linux ({kernel}{variant})', f'linux /vmlinuz-{kernel}', *microcode, f'initrd /initramfs-{kernel}{variant}.img', options, ] name = entry_name.format(kernel=kernel, variant=variant) entry_conf = entries_dir / name entry_conf.write_text('\n'.join(entry) + '\n') self.helper_flags['bootloader'] = 'systemd' def _add_grub_bootloader( self, boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification, efi_partition: Optional[disk.PartitionModification] ): self.pacman.strap('grub') # no need? grub_default = self.target / 'etc/default/grub' config = grub_default.read_text() kernel_parameters = ' '.join(self._get_kernel_params(root_partition, False, False)) config = re.sub(r'(GRUB_CMDLINE_LINUX=")("\n)', rf'\1{kernel_parameters}\2', config, 1) grub_default.write_text(config) info(f"GRUB boot partition: {boot_partition.dev_path}") if boot_partition == root_partition and root_partition.mountpoint: boot_dir = root_partition.mountpoint / 'boot' elif boot_partition.mountpoint: boot_dir = boot_partition.mountpoint else: raise ValueError('Could not detect boot directory') command = [ '/usr/bin/arch-chroot', str(self.target), 'grub-install', '--debug' ] if SysInfo.has_uefi(): if not efi_partition: raise ValueError('Could not detect efi partition') info(f"GRUB EFI partition: {efi_partition.dev_path}") self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? boot_dir_arg = [] if boot_partition != efi_partition: boot_dir_arg.append(f'--boot-directory={boot_dir}') add_options = [ '--target=x86_64-efi', f'--efi-directory={efi_partition.mountpoint}', *boot_dir_arg, '--bootloader-id=GRUB', '--removable' ] command.extend(add_options) try: SysCommand(command, peek_output=True) except SysCallError: try: SysCommand(command, peek_output=True) except SysCallError as err: raise DiskError(f"Could not install GRUB to {self.target}{efi_partition.mountpoint}: {err}") else: info(f"GRUB boot partition: {boot_partition.dev_path}") parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path) add_options = [ '--target=i386-pc', '--recheck', str(parent_dev_path) ] try: SysCommand(command + add_options, peek_output=True) except SysCallError as err: raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {err}") try: SysCommand( f'/usr/bin/arch-chroot {self.target} ' f'grub-mkconfig -o {boot_dir}/grub/grub.cfg' ) except SysCallError as err: raise DiskError(f"Could not configure GRUB: {err}") self.helper_flags['bootloader'] = "grub" def _add_limine_bootloader( self, boot_partition: disk.PartitionModification, efi_partition: Optional[disk.PartitionModification], root_partition: disk.PartitionModification ): self.pacman.strap('limine') info(f"Limine boot partition: {boot_partition.dev_path}") limine_path = self.target / 'usr' / 'share' / 'limine' hook_command = None if SysInfo.has_uefi(): if not efi_partition: raise ValueError('Could not detect efi partition') elif not efi_partition.mountpoint: raise ValueError('EFI partition is not mounted') info(f"Limine EFI partition: {efi_partition.dev_path}") try: efi_dir_path = self.target / efi_partition.mountpoint.relative_to('/') / 'EFI' / 'BOOT' efi_dir_path.mkdir(parents=True, exist_ok=True) for file in ('BOOTIA32.EFI', 'BOOTX64.EFI'): shutil.copy(limine_path / file, efi_dir_path) except Exception as err: raise DiskError(f'Failed to install Limine in {self.target}{efi_partition.mountpoint}: {err}') hook_command = f'/usr/bin/cp /usr/share/limine/BOOTIA32.EFI {efi_partition.mountpoint}/EFI/BOOT/' \ f' && /usr/bin/cp /usr/share/limine/BOOTX64.EFI {efi_partition.mountpoint}/EFI/BOOT/' else: parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path) if unique_path := disk.device_handler.get_unique_path_for_device(parent_dev_path): parent_dev_path = unique_path try: # The `limine-bios.sys` file contains stage 3 code. shutil.copy(limine_path / 'limine-bios.sys', self.target / 'boot') # `limine bios-install` deploys the stage 1 and 2 to the disk. SysCommand(f'/usr/bin/arch-chroot {self.target} limine bios-install {parent_dev_path}', peek_output=True) except Exception as err: raise DiskError(f'Failed to install Limine on {parent_dev_path}: {err}') hook_command = f'/usr/bin/limine bios-install {parent_dev_path}' \ f' && /usr/bin/cp /usr/share/limine/limine-bios.sys /boot/' hook_contents = f'''[Trigger] Operation = Install Operation = Upgrade Type = Package Target = limine [Action] Description = Deploying Limine after upgrade... When = PostTransaction Exec = /bin/sh -c "{hook_command}" ''' hooks_dir = self.target / 'etc' / 'pacman.d' / 'hooks' hooks_dir.mkdir(parents=True, exist_ok=True) hook_path = hooks_dir / '99-limine.hook' hook_path.write_text(hook_contents) microcode = [] if ucode := self._get_microcode(): microcode = [f'MODULE_PATH=boot:///{ucode}'] kernel_params = ' '.join(self._get_kernel_params(root_partition)) config_contents = 'TIMEOUT=5\n' for kernel in self.kernels: for variant in ('', '-fallback'): entry = [ f'PROTOCOL=linux', f'KERNEL_PATH=boot:///vmlinuz-{kernel}', *microcode, f'MODULE_PATH=boot:///initramfs-{kernel}{variant}.img', f'CMDLINE={kernel_params}', ] config_contents += f'\n:Arch Linux ({kernel}{variant})\n' config_contents += '\n'.join([f' {it}' for it in entry]) + '\n' config_path = self.target / 'boot' / 'limine.cfg' config_path.write_text(config_contents) self.helper_flags['bootloader'] = "limine" def _add_efistub_bootloader( self, boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification, uki_enabled: bool = False ): self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): raise HardwareIncompatibilityError # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. if not uki_enabled: loader = '/vmlinuz-{kernel}' microcode = [] if ucode := self._get_microcode(): microcode.append(f'initrd=/{ucode}') else: debug('Archinstall will not add any ucode to firmware boot entry.') entries = ( *microcode, 'initrd=/initramfs-{kernel}.img', *self._get_kernel_params(root_partition) ) cmdline = [' '.join(entries)] else: loader = '/EFI/Linux/arch-{kernel}.efi' cmdline = [] parent_dev_path = disk.device_handler.get_parent_device_path(boot_partition.safe_dev_path) cmd_template = ( 'efibootmgr', '--create', '--disk', str(parent_dev_path), '--part', str(boot_partition.partn), '--label', 'Arch Linux ({kernel})', '--loader', loader, '--unicode', *cmdline, '--verbose' ) for kernel in self.kernels: # Setup the firmware entry cmd = [arg.format(kernel=kernel) for arg in cmd_template] SysCommand(cmd) self.helper_flags['bootloader'] = "efistub" def _config_uki( self, root_partition: disk.PartitionModification, efi_partition: Optional[disk.PartitionModification] ): if not efi_partition or not efi_partition.mountpoint: raise ValueError(f'Could not detect ESP at mountpoint {self.target}') # Set up kernel command line with open(self.target / 'etc/kernel/cmdline', 'w') as cmdline: kernel_parameters = self._get_kernel_params(root_partition) cmdline.write(' '.join(kernel_parameters) + '\n') ucode = self._get_microcode() diff_mountpoint = None if efi_partition.mountpoint != Path('/efi'): diff_mountpoint = str(efi_partition.mountpoint) image_re = re.compile('(.+_image="/([^"]+).+\n)') uki_re = re.compile('#((.+_uki=")/[^/]+(.+\n))') # Modify .preset files for kernel in self.kernels: preset = self.target / 'etc/mkinitcpio.d' / (kernel + '.preset') config = preset.read_text().splitlines(True) for index, line in enumerate(config): if not ucode and line.startswith('ALL_microcode='): config[index] = '#' + line # Avoid storing redundant image file elif m := image_re.match(line): image = self.target / m.group(2) image.unlink(missing_ok=True) config[index] = '#' + m.group(1) elif m := uki_re.match(line): if diff_mountpoint: config[index] = m.group(2) + diff_mountpoint + m.group(3) else: config[index] = m.group(1) elif line.startswith('#default_options='): config[index] = line.removeprefix('#') preset.write_text(''.join(config)) # Directory for the UKIs uki_dir = self.target / efi_partition.relative_mountpoint / 'EFI/Linux' uki_dir.mkdir(parents=True, exist_ok=True) # Build the UKIs if not self.mkinitcpio(['-P']): error('Error generating initramfs (continuing anyway)') def add_bootloader(self, bootloader: Bootloader, uki_enabled: bool = False): """ Adds a bootloader to the installation instance. Archinstall supports one of three types: * systemd-bootctl * grub * limine (beta) * efistub (beta) :param bootloader: Type of bootloader to be added """ for plugin in plugins.values(): if hasattr(plugin, 'on_add_bootloader'): # Allow plugins to override the boot-loader handling. # This allows for bot configuring and installing bootloaders. if plugin.on_add_bootloader(self): return True efi_partition = self._get_efi_partition() boot_partition = self._get_boot_partition() root_partition = self._get_root_partition() if boot_partition is None: raise ValueError(f'Could not detect boot at mountpoint {self.target}') if root_partition is None: raise ValueError(f'Could not detect root at mountpoint {self.target}') info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}') if uki_enabled: self._config_uki(root_partition, efi_partition) match bootloader: case Bootloader.Systemd: self._add_systemd_bootloader(boot_partition, root_partition, efi_partition, uki_enabled) case Bootloader.Grub: self._add_grub_bootloader(boot_partition, root_partition, efi_partition) case Bootloader.Efistub: self._add_efistub_bootloader(boot_partition, root_partition, uki_enabled) case Bootloader.Limine: self._add_limine_bootloader(boot_partition, efi_partition, root_partition) def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: return self.pacman.strap(packages) def _enable_users(self, service: str, users: List[User]): for user in users: self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username) def enable_sudo(self, entity: str, group :bool = False): info(f'Enabling sudo permissions for {entity}') sudoers_dir = f"{self.target}/etc/sudoers.d" # Creates directory if not exists if not (sudoers_path := Path(sudoers_dir)).exists(): sudoers_path.mkdir(parents=True) # Guarantees sudoer confs directory recommended perms os.chmod(sudoers_dir, 0o440) # Appends a reference to the sudoers file, because if we are here sudoers.d did not exist yet with open(f'{self.target}/etc/sudoers', 'a') as sudoers: sudoers.write('@includedir /etc/sudoers.d\n') # We count how many files are there already so we know which number to prefix the file with num_of_rules_already = len(os.listdir(sudoers_dir)) file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc # Guarantees that entity str does not contain invalid characters for a linux file name: # \ / : * ? " < > | safe_entity_file_name = re.sub(r'(\\|\/|:|\*|\?|"|<|>|\|)', '', entity) rule_file_name = f"{sudoers_dir}/{file_num_str}_{safe_entity_file_name}" with open(rule_file_name, 'a') as sudoers: sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') # Guarantees sudoer conf file recommended perms os.chmod(Path(rule_file_name), 0o440) def create_users(self, users: Union[User, List[User]]): if not isinstance(users, list): users = [users] for user in users: self.user_create(user.username, user.password, user.groups, user.sudo) def user_create(self, user: str, password: Optional[str] = None, groups: Optional[List[str]] = None, sudo: bool = False) -> None: if groups is None: groups = [] # This plugin hook allows for the plugin to handle the creation of the user. # Password and Group management is still handled by user_create() handled_by_plugin = False for plugin in plugins.values(): if hasattr(plugin, 'on_user_create'): if result := plugin.on_user_create(self, user): handled_by_plugin = result if not handled_by_plugin: info(f'Creating user {user}') try: SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') except SysCallError as err: raise SystemError(f"Could not create user inside installation: {err}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): if result := plugin.on_user_created(self, user): handled_by_plugin = result if password: self.user_set_pw(user, password) if groups: for group in groups: SysCommand(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}') if sudo and self.enable_sudo(user): self.helper_flags['user'] = True def user_set_pw(self, user :str, password :str) -> bool: info(f'Setting password for {user}') if user == 'root': # This means the root account isn't locked/disabled with * in /etc/passwd self.helper_flags['user'] = True combo = f'{user}:{password}' echo = shlex.join(['echo', combo]) sh = shlex.join(['sh', '-c', echo]) try: SysCommand(f"/usr/bin/arch-chroot {self.target} " + sh[:-1] + " | chpasswd'") return True except SysCallError: return False def user_set_shell(self, user :str, shell :str) -> bool: info(f'Setting shell for {user} to {shell}') try: SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") return True except SysCallError: return False def chown(self, owner :str, path :str, options :List[str] = []) -> bool: cleaned_path = path.replace('\'', '\\\'') try: SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'") return True except SysCallError: return False def set_keyboard_language(self, language: str) -> bool: info(f"Setting keyboard language to {language}") if len(language.strip()): if not verify_keyboard_layout(language): error(f"Invalid keyboard language specified: {language}") return False # In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968 # Setting an empty keymap first, allows the subsequent call to set layout for both console and x11. from .boot import Boot with Boot(self) as session: os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""') try: session.SysCommand(["localectl", "set-keymap", language]) except SysCallError as err: raise ServiceException(f"Unable to set locale '{language}' for console: {err}") info(f"Keyboard language for this installation is now set to: {language}") else: info('Keyboard language was not changed from default (no language specified)') return True def set_x11_keyboard_language(self, language: str) -> bool: """ A fallback function to set x11 layout specifically and separately from console layout. This isn't strictly necessary since .set_keyboard_language() does this as well. """ info(f"Setting x11 keyboard language to {language}") if len(language.strip()): if not verify_x11_keyboard_layout(language): error(f"Invalid x11-keyboard language specified: {language}") return False from .boot import Boot with Boot(self) as session: session.SysCommand(["localectl", "set-x11-keymap", '""']) try: session.SysCommand(["localectl", "set-x11-keymap", language]) except SysCallError as err: raise ServiceException(f"Unable to set locale '{language}' for X11: {err}") else: info(f'X11-Keyboard language was not changed from default (no language specified)') return True def _service_started(self, service_name: str) -> Optional[str]: if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): service_name += '.service' # Just to be safe last_execution_time = SysCommand( f"systemctl show --property=ActiveEnterTimestamp --no-pager {service_name}", environment_vars={'SYSTEMD_COLORS': '0'} ).decode().lstrip('ActiveEnterTimestamp=') if not last_execution_time: return None return last_execution_time def _service_state(self, service_name: str) -> str: if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): service_name += '.service' # Just to be safe return SysCommand( f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'} ).decode()