index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 295 |
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 9c6e6d35..5c11896e 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -3,13 +3,21 @@ from __future__ import annotations import signal import sys import time -from typing import Any, Optional, TYPE_CHECKING +from pathlib import Path +from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set -from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption from .device_handler import device_handler +from .device_model import ( + DiskLayoutConfiguration, DiskLayoutType, PartitionTable, + FilesystemType, DiskEncryption, LvmVolumeGroup, + Size, Unit, SectorSize, PartitionModification, EncryptionType, + LvmVolume, LvmConfiguration +) from ..hardware import SysInfo -from ..output import debug +from ..luks import Luks2 from ..menu import Menu +from ..output import debug, info +from ..general import SysCommand if TYPE_CHECKING: _: Any @@ -52,13 +60,288 @@ class FilesystemHandler: for mod in device_mods: device_handler.partition(mod, partition_table=partition_table) - device_handler.format(mod, enc_conf=self._enc_config) - for part_mod in mod.partitions: - if part_mod.is_create_or_modify(): + if self._disk_config.lvm_config: + for mod in device_mods: + if boot_part := mod.get_boot_partition(): + debug(f'Formatting boot partition: {boot_part.dev_path}') + self._format_partitions( + [boot_part], + mod.device_path + ) + + self.perform_lvm_operations() + else: + for mod in device_mods: + self._format_partitions( + mod.partitions, + mod.device_path + ) + + for part_mod in mod.partitions: if part_mod.fs_type == FilesystemType.Btrfs: device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config) + def _format_partitions( + self, + partitions: List[PartitionModification], + device_path: Path + ): + """ + Format can be given an overriding path, for instance /dev/null to test + the formatting functionality and in essence the support for the given filesystem. + """ + + # don't touch existing partitions + create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()] + + self._validate_partitions(create_or_modify_parts) + + # make sure all devices are unmounted + device_handler.umount_all_existing(device_path) + + for part_mod in create_or_modify_parts: + # partition will be encrypted + if self._enc_config is not None and part_mod in self._enc_config.partitions: + device_handler.format_encrypted( + part_mod.safe_dev_path, + part_mod.mapper_name, + part_mod.safe_fs_type, + self._enc_config + ) + else: + device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path) + + # synchronize with udev before using lsblk + SysCommand('udevadm settle') + + lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path) + + part_mod.partn = lsblk_info.partn + part_mod.partuuid = lsblk_info.partuuid + part_mod.uuid = lsblk_info.uuid + + def _validate_partitions(self, partitions: List[PartitionModification]): + checks = { + # verify that all partitions have a path set (which implies that they have been created) + lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'), + # crypto luks is not a valid file system type + lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError( + 'Crypto luks cannot be set as a filesystem type'), + # file system type must be set + lambda x: x.fs_type is None: ValueError('File system type must be set for modification') + } + + for check, exc in checks.items(): + found = next(filter(check, partitions), None) + if found is not None: + raise exc + + def perform_lvm_operations(self): + info('Setting up LVM config...') + + if not self._disk_config.lvm_config: + return + + if self._enc_config: + self._setup_lvm_encrypted( + self._disk_config.lvm_config, + self._enc_config + ) + else: + self._setup_lvm(self._disk_config.lvm_config) + self._format_lvm_vols(self._disk_config.lvm_config) + + def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption): + if enc_config.encryption_type == EncryptionType.LvmOnLuks: + enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False) + + self._setup_lvm(lvm_config, enc_mods) + self._format_lvm_vols(lvm_config) + + # export the lvm group safely otherwise the Luks cannot be closed + self._safely_close_lvm(lvm_config) + + for luks in enc_mods.values(): + luks.lock() + elif enc_config.encryption_type == EncryptionType.LuksOnLvm: + self._setup_lvm(lvm_config) + enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False) + self._format_lvm_vols(lvm_config, enc_vols) + + for luks in enc_vols.values(): + luks.lock() + + self._safely_close_lvm(lvm_config) + + def _safely_close_lvm(self, lvm_config: LvmConfiguration): + for vg in lvm_config.vol_groups: + for vol in vg.volumes: + device_handler.lvm_vol_change(vol, False) + + device_handler.lvm_export_vg(vg) + + def _setup_lvm( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + self._lvm_create_pvs(lvm_config, enc_mods) + + for vg in lvm_config.vol_groups: + pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods) + + device_handler.lvm_vg_create(pv_dev_paths, vg.name) + + # figure out what the actual available size in the group is + vg_info = device_handler.lvm_group_info(vg.name) + + if not vg_info: + raise ValueError('Unable to fetch VG info') + + # the actual available LVM Group size will be smaller than the + # total PVs size due to reserved metadata storage etc. + # so we'll have a look at the total avail. size, check the delta + # to the desired sizes and subtract some equally from the actually + # created volume + avail_size = vg_info.vg_size + desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default())) + + delta = desired_size - avail_size + max_vol_offset = delta.convert(Unit.B) + + max_vol = max(vg.volumes, key=lambda x: x.length) + + for lv in vg.volumes: + offset = max_vol_offset if lv == max_vol else None + + debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}') + device_handler.lvm_vol_create(vg.name, lv, offset) + + while True: + debug('Fetching LVM volume info') + lv_info = device_handler.lvm_vol_info(lv.name) + if lv_info is not None: + break + + time.sleep(1) + + self._lvm_vol_handle_e2scrub(vg) + + def _format_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_vols: Dict[LvmVolume, Luks2] = {} + ): + for vol in lvm_config.get_all_volumes(): + if enc_vol := enc_vols.get(vol, None): + if not enc_vol.mapper_dev: + raise ValueError('No mapper device defined') + path = enc_vol.mapper_dev + else: + path = vol.safe_dev_path + + # wait a bit otherwise the mkfs will fail as it can't + # find the mapper device yet + device_handler.format(vol.fs_type, path) + + if vol.fs_type == FilesystemType.Btrfs: + device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options) + + def _lvm_create_pvs( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + pv_paths: Set[Path] = set() + + for vg in lvm_config.vol_groups: + pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods) + + device_handler.lvm_pv_create(pv_paths) + + def _get_all_pv_dev_paths( + self, + pvs: List[PartitionModification], + enc_mods: Dict[PartitionModification, Luks2] = {} + ) -> Set[Path]: + pv_paths: Set[Path] = set() + + for pv in pvs: + if enc_pv := enc_mods.get(pv, None): + if mapper := enc_pv.mapper_dev: + pv_paths.add(mapper) + else: + pv_paths.add(pv.safe_dev_path) + + return pv_paths + + def _encrypt_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[LvmVolume, Luks2]: + enc_vols: Dict[LvmVolume, Luks2] = {} + + for vol in lvm_config.get_all_volumes(): + if vol in enc_config.lvm_volumes: + luks_handler = device_handler.encrypt( + vol.safe_dev_path, + vol.mapper_name, + enc_config.encryption_password, + lock_after_create + ) + + enc_vols[vol] = luks_handler + + return enc_vols + + def _encrypt_partitions( + self, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[PartitionModification, Luks2]: + enc_mods: Dict[PartitionModification, Luks2] = {} + + for mod in self._disk_config.device_modifications: + partitions = mod.partitions + + # don't touch existing partitions + filtered_part = [p for p in partitions if not p.exists()] + + self._validate_partitions(filtered_part) + + # make sure all devices are unmounted + device_handler.umount_all_existing(mod.device_path) + + enc_mods = {} + + for part_mod in filtered_part: + if part_mod in enc_config.partitions: + luks_handler = device_handler.encrypt( + part_mod.safe_dev_path, + part_mod.mapper_name, + enc_config.encryption_password, + lock_after_create=lock_after_create + ) + + enc_mods[part_mod] = luks_handler + + return enc_mods + + def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup): + # from arch wiki: + # If a logical volume will be formatted with ext4, leave at least 256 MiB + # free space in the volume group to allow using e2scrub + if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]): + largest_vol = max(vol_gp.volumes, key=lambda x: x.length) + + device_handler.lvm_vol_reduce( + largest_vol.safe_dev_path, + Size(256, Unit.MiB, SectorSize.default()) + ) + def _do_countdown(self) -> bool: SIG_TRIGGER = False |