index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 622 |
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 1083df53..5c11896e 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,301 +1,381 @@ from __future__ import annotations + +import signal +import sys import time -import logging -import json -import pathlib -from typing import Optional, Dict, Any, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -from ..models.disk_encryption import DiskEncryption +from pathlib import Path +from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set + +from .device_handler import device_handler +from .device_model import ( + DiskLayoutConfiguration, DiskLayoutType, PartitionTable, + FilesystemType, DiskEncryption, LvmVolumeGroup, + Size, Unit, SectorSize, PartitionModification, EncryptionType, + LvmVolume, LvmConfiguration +) +from ..hardware import SysInfo +from ..luks import Luks2 +from ..menu import Menu +from ..output import debug, info +from ..general import SysCommand if TYPE_CHECKING: - from .blockdevice import BlockDevice _: Any -from .partition import Partition -from .validators import valid_fs_type -from ..exceptions import DiskError, SysCallError -from ..general import SysCommand -from ..output import log -from ..storage import storage -GPT = 0b00000001 -MBR = 0b00000010 +class FilesystemHandler: + def __init__( + self, + disk_config: DiskLayoutConfiguration, + enc_conf: Optional[DiskEncryption] = None + ): + self._disk_config = disk_config + self._enc_config = enc_conf + + def perform_filesystem_operations(self, show_countdown: bool = True): + if self._disk_config.config_type == DiskLayoutType.Pre_mount: + debug('Disk layout configuration is set to pre-mount, not performing any operations') + return + + device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications)) + + if not device_mods: + debug('No modifications required') + return + + device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods]) + + # Issue a final warning before we continue with something un-revertable. + # We mention the drive one last time, and count from 5 to 0. + print(str(_(' ! Formatting {} in ')).format(device_paths)) + + if show_countdown: + self._do_countdown() + + # Setup the blockdevice, filesystem (and optionally encryption). + # Once that's done, we'll hand over to perform_installation() + partition_table = PartitionTable.GPT + if SysInfo.has_uefi() is False: + partition_table = PartitionTable.MBR + + for mod in device_mods: + device_handler.partition(mod, partition_table=partition_table) + + if self._disk_config.lvm_config: + for mod in device_mods: + if boot_part := mod.get_boot_partition(): + debug(f'Formatting boot partition: {boot_part.dev_path}') + self._format_partitions( + [boot_part], + mod.device_path + ) + + self.perform_lvm_operations() + else: + for mod in device_mods: + self._format_partitions( + mod.partitions, + mod.device_path + ) -# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR -# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues. -# (we've been pestered by disk issues since the start, so please let this be here for a few versions) -DEFAULT_PARTITION_START = '5MiB' + for part_mod in mod.partitions: + if part_mod.fs_type == FilesystemType.Btrfs: + device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config) -class Filesystem: - # TODO: - # When instance of a HDD is selected, check all usages and gracefully unmount them - # as well as close any crypto handles. - def __init__(self, blockdevice :BlockDevice, mode :int): - self.blockdevice = blockdevice - self.mode = mode + def _format_partitions( + self, + partitions: List[PartitionModification], + device_path: Path + ): + """ + Format can be given an overriding path, for instance /dev/null to test + the formatting functionality and in essence the support for the given filesystem. + """ - def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': - return self + # don't touch existing partitions + create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()] - def __repr__(self) -> str: - return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" + self._validate_partitions(create_or_modify_parts) - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - if len(args) >= 2 and args[1]: - raise args[1] + # make sure all devices are unmounted + device_handler.umount_all_existing(device_path) - SysCommand('sync') - return True + for part_mod in create_or_modify_parts: + # partition will be encrypted + if self._enc_config is not None and part_mod in self._enc_config.partitions: + device_handler.format_encrypted( + part_mod.safe_dev_path, + part_mod.mapper_name, + part_mod.safe_fs_type, + self._enc_config + ) + else: + device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path) + + # synchronize with udev before using lsblk + SysCommand('udevadm settle') + + lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path) + + part_mod.partn = lsblk_info.partn + part_mod.partuuid = lsblk_info.partuuid + part_mod.uuid = lsblk_info.uuid + + def _validate_partitions(self, partitions: List[PartitionModification]): + checks = { + # verify that all partitions have a path set (which implies that they have been created) + lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'), + # crypto luks is not a valid file system type + lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError( + 'Crypto luks cannot be set as a filesystem type'), + # file system type must be set + lambda x: x.fs_type is None: ValueError('File system type must be set for modification') + } + + for check, exc in checks.items(): + found = next(filter(check, partitions), None) + if found is not None: + raise exc + + def perform_lvm_operations(self): + info('Setting up LVM config...') + + if not self._disk_config.lvm_config: + return + + if self._enc_config: + self._setup_lvm_encrypted( + self._disk_config.lvm_config, + self._enc_config + ) + else: + self._setup_lvm(self._disk_config.lvm_config) + self._format_lvm_vols(self._disk_config.lvm_config) + + def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption): + if enc_config.encryption_type == EncryptionType.LvmOnLuks: + enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False) + + self._setup_lvm(lvm_config, enc_mods) + self._format_lvm_vols(lvm_config) + + # export the lvm group safely otherwise the Luks cannot be closed + self._safely_close_lvm(lvm_config) + + for luks in enc_mods.values(): + luks.lock() + elif enc_config.encryption_type == EncryptionType.LuksOnLvm: + self._setup_lvm(lvm_config) + enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False) + self._format_lvm_vols(lvm_config, enc_vols) + + for luks in enc_vols.values(): + luks.lock() + + self._safely_close_lvm(lvm_config) + + def _safely_close_lvm(self, lvm_config: LvmConfiguration): + for vg in lvm_config.vol_groups: + for vol in vg.volumes: + device_handler.lvm_vol_change(vol, False) + + device_handler.lvm_export_vg(vg) + + def _setup_lvm( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + self._lvm_create_pvs(lvm_config, enc_mods) + + for vg in lvm_config.vol_groups: + pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods) + + device_handler.lvm_vg_create(pv_dev_paths, vg.name) + + # figure out what the actual available size in the group is + vg_info = device_handler.lvm_group_info(vg.name) + + if not vg_info: + raise ValueError('Unable to fetch VG info') + + # the actual available LVM Group size will be smaller than the + # total PVs size due to reserved metadata storage etc. + # so we'll have a look at the total avail. size, check the delta + # to the desired sizes and subtract some equally from the actually + # created volume + avail_size = vg_info.vg_size + desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default())) + + delta = desired_size - avail_size + max_vol_offset = delta.convert(Unit.B) + + max_vol = max(vg.volumes, key=lambda x: x.length) + + for lv in vg.volumes: + offset = max_vol_offset if lv == max_vol else None + + debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}') + device_handler.lvm_vol_create(vg.name, lv, offset) - def partuuid_to_index(self, uuid :str) -> Optional[int]: - for i in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - # We'll use unreliable lbslk to grab children under the /dev/<device> - output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8')) - - for device in output['blockdevices']: - for index, partition in enumerate(device.get('children', [])): - # But we'll use blkid to reliably grab the PARTUUID for that child device (partition) - partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip() - if partition_uuid.lower() == uuid.lower(): - return index - - raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - - def load_layout(self, layout :Dict[str, Any]) -> None: - from ..luks import luks2 - from .btrfs import BTRFSPartition - - # If the layout tells us to wipe the drive, we do so - if layout.get('wipe', False): - if self.mode == GPT: - if not self.parted_mklabel(self.blockdevice.device, "gpt"): - raise KeyError(f"Could not create a GPT label on {self}") - elif self.mode == MBR: - if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MS-DOS label on {self}") - - self.blockdevice.flush_cache() - time.sleep(3) - - prev_partition = None - # We then iterate the partitions in order - for partition in layout.get('partitions', []): - # We don't want to re-add an existing partition (those containing a UUID already) - if partition.get('wipe', False) and not partition.get('PARTUUID', None): - start = partition.get('start') or ( - prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START) - partition['device_instance'] = self.add_partition(partition.get('type', 'primary'), - start=start, - end=partition.get('size', '100%'), - partition_format=partition.get('filesystem', {}).get('format', 'btrfs'), - skip_mklabel=layout.get('wipe', False) is not False) - - elif (partition_uuid := partition.get('PARTUUID')): - # We try to deal with both UUID and PARTUUID of a partition when it's being re-used. - # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID') - # but for now, lets just attempt to deal with both. - try: - partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid) - except DiskError: - partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid) - - log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray") + while True: + debug('Fetching LVM volume info') + lv_info = device_handler.lvm_vol_info(lv.name) + if lv_info is not None: + break + + time.sleep(1) + + self._lvm_vol_handle_e2scrub(vg) + + def _format_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_vols: Dict[LvmVolume, Luks2] = {} + ): + for vol in lvm_config.get_all_volumes(): + if enc_vol := enc_vols.get(vol, None): + if not enc_vol.mapper_dev: + raise ValueError('No mapper device defined') + path = enc_vol.mapper_dev else: - log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING) - continue - - if partition.get('filesystem', {}).get('format', False): - # needed for backward compatibility with the introduction of the new "format_options" - format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) - disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption') - - if disk_encryption and partition in disk_encryption.all_partitions: - if not partition['device_instance']: - raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") - - if partition.get('mountpoint',None): - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - else: - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" - - partition['device_instance'].encrypt(password=disk_encryption.encryption_password) - # Immediately unlock the encrypted device to format the inner volume - with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device: - if not partition.get('wipe'): - if storage['arguments'] == 'silent': - raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}") - else: - if not partition.get('filesystem'): - partition['filesystem'] = {} - - if not partition['filesystem'].get('format', False): - while True: - partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip() - if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False: - log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")) - continue - break - - unlocked_device.format(partition['filesystem']['format'], options=format_options) - - elif partition.get('wipe', False): - if not partition['device_instance']: - raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") - - partition['device_instance'].format(partition['filesystem']['format'], options=format_options) - - if partition['filesystem']['format'] == 'btrfs': - # We upgrade the device instance to a BTRFSPartition if we format it as such. - # This is so that we can gain access to more features than otherwise available in Partition() - partition['device_instance'] = BTRFSPartition( - partition['device_instance'].path, - block_device=partition['device_instance'].block_device, - encrypted=False, - filesystem='btrfs', - autodetect_filesystem=False - ) - - if partition.get('boot', False): - log(f"Marking partition {partition['device_instance']} as bootable.") - self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on') - - prev_partition = partition - - def find_partition(self, mountpoint :str) -> Partition: - for partition in self.blockdevice: - if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: - return partition - - def partprobe(self) -> bool: - try: - SysCommand(f'partprobe {self.blockdevice.device}') - except SysCallError as error: - log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red") - raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}") + path = vol.safe_dev_path - return True + # wait a bit otherwise the mkfs will fail as it can't + # find the mapper device yet + device_handler.format(vol.fs_type, path) - def raw_parted(self, string: str) -> SysCommand: - try: - cmd_handle = SysCommand(f'/usr/bin/parted -s {string}') - time.sleep(0.5) - return cmd_handle - except SysCallError as error: - log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red") - return error + if vol.fs_type == FilesystemType.Btrfs: + device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options) - def parted(self, string: str) -> bool: - """ - Performs a parted execution of the given string + def _lvm_create_pvs( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + pv_paths: Set[Path] = set() - :param string: A raw string passed to /usr/bin/parted -s <string> - :type string: str - """ - if (parted_handle := self.raw_parted(string)).exit_code == 0: - return self.partprobe() - else: - raise DiskError(f"Parted failed to add a partition: {parted_handle}") + for vg in lvm_config.vol_groups: + pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods) - def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: - # TODO: Implement this with declarative profiles instead. - raise ValueError("Installation().use_entire_disk() has to be re-worked.") + device_handler.lvm_pv_create(pv_paths) - def add_partition( + def _get_all_pv_dev_paths( self, - partition_type :str, - start :str, - end :str, - partition_format :Optional[str] = None, - skip_mklabel :bool = False - ) -> Partition: - log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) - - if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: - # If it's a completely empty drive, and we're about to add partitions to it - # we need to make sure there's a filesystem label. - if self.mode == GPT: - if not self.parted_mklabel(self.blockdevice.device, "gpt"): - raise KeyError(f"Could not create a GPT label on {self}") - elif self.mode == MBR: - if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MS-DOS label on {self}") - - self.blockdevice.flush_cache() - - previous_partuuids = [] - for partition in self.blockdevice.partitions.values(): - try: - previous_partuuids.append(partition.part_uuid) - except DiskError: - pass - - # TODO this check should probably run in the setup process rather than during the installation - if self.mode == MBR: - if len(self.blockdevice.partitions) > 3: - DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") - - if partition_format: - parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}' - else: - parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}' - - log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG) - - if self.parted(parted_string): - for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): - self.blockdevice.flush_cache() - - new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()] - new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) - - if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): - try: - return self.blockdevice.get_partition(partuuid=new_partuuid) - except Exception as err: - log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") - log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") - log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") - log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red") - log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") - raise err - else: - log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) - self.partprobe() - time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - else: - print("Parted did not return True during partition creation") + pvs: List[PartitionModification], + enc_mods: Dict[PartitionModification, Luks2] = {} + ) -> Set[Path]: + pv_paths: Set[Path] = set() + + for pv in pvs: + if enc_pv := enc_mods.get(pv, None): + if mapper := enc_pv.mapper_dev: + pv_paths.add(mapper) + else: + pv_paths.add(pv.safe_dev_path) + + return pv_paths + + def _encrypt_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[LvmVolume, Luks2]: + enc_vols: Dict[LvmVolume, Luks2] = {} + + for vol in lvm_config.get_all_volumes(): + if vol in enc_config.lvm_volumes: + luks_handler = device_handler.encrypt( + vol.safe_dev_path, + vol.mapper_name, + enc_config.encryption_password, + lock_after_create + ) + + enc_vols[vol] = luks_handler + + return enc_vols + + def _encrypt_partitions( + self, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[PartitionModification, Luks2]: + enc_mods: Dict[PartitionModification, Luks2] = {} + + for mod in self._disk_config.device_modifications: + partitions = mod.partitions + + # don't touch existing partitions + filtered_part = [p for p in partitions if not p.exists()] + + self._validate_partitions(filtered_part) + + # make sure all devices are unmounted + device_handler.umount_all_existing(mod.device_path) + + enc_mods = {} + + for part_mod in filtered_part: + if part_mod in enc_config.partitions: + luks_handler = device_handler.encrypt( + part_mod.safe_dev_path, + part_mod.mapper_name, + enc_config.encryption_password, + lock_after_create=lock_after_create + ) + + enc_mods[part_mod] = luks_handler - total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()]) - total_partitions.update(previous_partuuids) + return enc_mods - # TODO: This should never be able to happen - log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") - log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red") + def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup): + # from arch wiki: + # If a logical volume will be formatted with ext4, leave at least 256 MiB + # free space in the volume group to allow using e2scrub + if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]): + largest_vol = max(vol_gp.volumes, key=lambda x: x.length) - raise DiskError(f"Could not add partition using: {parted_string}") + device_handler.lvm_vol_reduce( + largest_vol.safe_dev_path, + Size(256, Unit.MiB, SectorSize.default()) + ) - def set_name(self, partition: int, name: str) -> bool: - return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 + def _do_countdown(self) -> bool: + SIG_TRIGGER = False - def set(self, partition: int, string: str) -> bool: - log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) - return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 + def kill_handler(sig: int, frame: Any) -> None: + print() + exit(0) - def parted_mklabel(self, device: str, disk_label: str) -> bool: - log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") - # Try to unmount devices before attempting to run mklabel - try: - SysCommand(f'bash -c "umount {device}?"') - except: - pass + def sig_handler(sig: int, frame: Any) -> None: + signal.signal(signal.SIGINT, kill_handler) - self.partprobe() - worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 - self.partprobe() + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, sig_handler) - return worked + for i in range(5, 0, -1): + print(f"{i}", end='') + + for x in range(4): + sys.stdout.flush() + time.sleep(0.25) + print(".", end='') + + if SIG_TRIGGER: + prompt = _('Do you really want to abort?') + choice = Menu(prompt, Menu.yes_no(), skip=False).run() + if choice.value == Menu.yes(): + exit(0) + + if SIG_TRIGGER is False: + sys.stdin.read() + + SIG_TRIGGER = False + signal.signal(signal.SIGINT, sig_handler) + + print() + signal.signal(signal.SIGINT, original_sigint_handler) + + return True |