index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Andreas Baumann <mail@andreasbaumann.cc> | 2024-05-10 15:56:28 +0200 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2024-05-10 15:56:28 +0200 |
commit | 683da22298abbd90f51d4dd38a7ec4b0dfb04555 (patch) | |
tree | ec2ac04967f9277df038edc362201937b331abe5 /archinstall/lib/disk | |
parent | af7ab9833c9f9944874f0162ae0975175ddc628d (diff) | |
parent | 3381cd55673e5105697d354cf4a1be9a7bcef062 (diff) |
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py index 352d04b9..7f881273 100644 --- a/archinstall/lib/disk/__init__.py +++ b/archinstall/lib/disk/__init__.py @@ -1,7 +1,48 @@ -from .btrfs import * -from .helpers import * -from .blockdevice import BlockDevice -from .filesystem import Filesystem, MBR, GPT -from .partition import * -from .user_guides import * -from .validators import *
\ No newline at end of file +from .device_handler import device_handler, disk_layouts +from .fido import Fido2 +from .filesystem import FilesystemHandler +from .subvolume_menu import SubvolumeMenu +from .partitioning_menu import ( + manual_partitioning, + PartitioningList +) +from .device_model import ( + _DeviceInfo, + BDevice, + DiskLayoutType, + DiskLayoutConfiguration, + LvmLayoutType, + LvmConfiguration, + LvmVolumeGroup, + LvmVolume, + LvmVolumeStatus, + PartitionTable, + Unit, + Size, + SectorSize, + SubvolumeModification, + DeviceGeometry, + PartitionType, + PartitionFlag, + FilesystemType, + ModificationStatus, + PartitionModification, + DeviceModification, + EncryptionType, + DiskEncryption, + Fido2Device, + LsblkInfo, + CleanType, + get_lsblk_info, + get_all_lsblk_info, + get_lsblk_by_mountpoint, +) +from .encryption_menu import ( + select_encryption_type, + select_encrypted_password, + select_hsm, + select_partitions_to_encrypt, + DiskEncryptionMenu, +) + +from .disk_menu import DiskLayoutConfigurationMenu diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py deleted file mode 100644 index 178b786a..00000000 --- a/archinstall/lib/disk/blockdevice.py +++ /dev/null @@ -1,301 +0,0 @@ -from __future__ import annotations -import json -import logging -import time - -from collections import OrderedDict -from dataclasses import dataclass -from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING - -from ..exceptions import DiskError, SysCallError -from ..output import log -from ..general import SysCommand -from ..storage import storage - - -if TYPE_CHECKING: - from .partition import Partition - _: Any - - -@dataclass -class BlockSizeInfo: - start: str - end: str - size: str - - -@dataclass -class BlockInfo: - pttype: str - ptuuid: str - size: int - tran: Optional[str] - rota: bool - free_space: Optional[List[BlockSizeInfo]] - - -class BlockDevice: - def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): - if not info: - from .helpers import all_blockdevices - # If we don't give any information, we need to auto-fill it. - # Otherwise any subsequent usage will break. - self.info = all_blockdevices(partitions=False)[path].info - else: - self.info = info - - self._path = path - self.keep_partitions = True - self._block_info = self._fetch_information() - self._partitions: Dict[str, 'Partition'] = {} - - self._load_partitions() - - # TODO: Currently disk encryption is a BIT misleading. - # It's actually partition-encryption, but for future-proofing this - # I'm placing the encryption password on a BlockDevice level. - - def __repr__(self, *args :str, **kwargs :str) -> str: - return self._str_repr - - @property - def path(self) -> str: - return self._path - - @property - def _str_repr(self) -> str: - return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})" - - def as_json(self) -> Dict[str, Any]: - return { - str(_('Device')): self._device_or_backfile, - str(_('Size')): f'{self.size}GB', - str(_('Free space')): f'{self._safe_free_space()}', - str(_('Bus-type')): f'{self.bus_type}' - } - - def __iter__(self) -> Iterator['Partition']: - for partition in self.partitions: - yield self.partitions[partition] - - def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: - if hasattr(self, key): - return getattr(self, key) - - if self.info and key in self.info: - return self.info[key] - - raise KeyError(f'{self.info} does not contain information: "{key}"') - - def __lt__(self, left_comparitor :'BlockDevice') -> bool: - return self._path < left_comparitor.path - - def json(self) -> str: - """ - json() has precedence over __dump__, so this is a way - to give less/partial information for user readability. - """ - return self._path - - def __dump__(self) -> Dict[str, Dict[str, Any]]: - return { - self._path: { - 'partuuid': self.uuid, - 'wipe': self.info.get('wipe', None), - 'partitions': [part.__dump__() for part in self.partitions.values()] - } - } - - def _call_lsblk(self, path: str) -> Dict[str, Any]: - output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8') - if output: - lsblk_info = json.loads(output) - return lsblk_info - - raise DiskError(f'Failed to read disk "{self.path}" with lsblk') - - def _load_partitions(self): - from .partition import Partition - - self._partitions.clear() - - lsblk_info = self._call_lsblk(self._path) - device = lsblk_info['blockdevices'][0] - self._partitions.clear() - - if children := device.get('children', None): - root = f'/dev/{device["name"]}' - for child in children: - part_id = child['name'].removeprefix(device['name']) - self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id) - - def _get_free_space(self) -> Optional[List[BlockSizeInfo]]: - # NOTE: parted -s will default to `cancel` on prompt, skipping any partition - # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, - # so the free will ignore the ESP partition and just give the "free" space. - # Doesn't harm us, but worth noting in case something weird happens. - try: - output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8') - if output: - free_lines = [line for line in output.split('\n') if 'free' in line] - sizes = [] - for free_space in free_lines: - _, start, end, size, *_ = free_space.strip('\r\n;').split(':') - sizes.append(BlockSizeInfo(start, end, size)) - - return sizes - except SysCallError as error: - log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG) - - return None - - def _fetch_information(self) -> BlockInfo: - lsblk_info = self._call_lsblk(self._path) - device = lsblk_info['blockdevices'][0] - free_space = self._get_free_space() - - return BlockInfo( - pttype=device['pttype'], - ptuuid=device['ptuuid'], - size=device['size'], - tran=device['tran'], - rota=device['rota'], - free_space=free_space - ) - - @property - def _device_or_backfile(self) -> Optional[str]: - """ - Returns the actual device-endpoint of the BlockDevice. - If it's a loop-back-device it returns the back-file, - For other types it return self.device - """ - if self.info.get('type') == 'loop': - return self.info['back-file'] - else: - return self.device - - @property - def mountpoint(self) -> None: - """ - A dummy function to enable transparent comparisons of mountpoints. - As blockdevices can't be mounted directly, this will always be None - """ - return None - - @property - def device(self) -> Optional[str]: - """ - Returns the device file of the BlockDevice. - If it's a loop-back-device it returns the /dev/X device, - If it's a ATA-drive it returns the /dev/X device - And if it's a crypto-device it returns the parent device - """ - if "DEVTYPE" not in self.info: - raise DiskError(f'Could not locate backplane info for "{self._path}"') - - if self.info['DEVTYPE'] in ['disk','loop']: - return self._path - elif self.info['DEVTYPE'][:4] == 'raid': - # This should catch /dev/md## raid devices - return self._path - elif self.info['DEVTYPE'] == 'crypt': - if 'pkname' not in self.info: - raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.') - return f"/dev/{self.info['pkname']}" - else: - log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - - return None - - @property - def partition_type(self) -> str: - return self._block_info.pttype - - @property - def uuid(self) -> str: - return self._block_info.ptuuid - - @property - def size(self) -> float: - from .helpers import convert_size_to_gb - return convert_size_to_gb(self._block_info.size) - - @property - def bus_type(self) -> Optional[str]: - return self._block_info.tran - - @property - def spinning(self) -> bool: - return self._block_info.rota - - @property - def partitions(self) -> Dict[str, 'Partition']: - return OrderedDict(sorted(self._partitions.items())) - - @property - def partition(self) -> List['Partition']: - return list(self.partitions.values()) - - @property - def first_free_sector(self) -> str: - if block_size := self._largest_free_space(): - return block_size.start - else: - return '512MB' - - @property - def first_end_sector(self) -> str: - if block_size := self._largest_free_space(): - return block_size.end - else: - return f"{self.size}GB" - - def _safe_free_space(self) -> str: - if self._block_info.free_space: - sizes = [free_space.size for free_space in self._block_info.free_space] - return '+'.join(sizes) - return '?' - - def _largest_free_space(self) -> Optional[BlockSizeInfo]: - if self._block_info.free_space: - sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True) - return sorted_sizes[0] - return None - - def _partprobe(self) -> bool: - return SysCommand(['partprobe', self._path]).exit_code == 0 - - def flush_cache(self) -> None: - self._load_partitions() - - def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: - if not uuid and not partuuid: - raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.") - - log(f"Retrieving partition PARTUUID={partuuid} or UUID={uuid}", level=logging.DEBUG, fg="gray") - - for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): - for partition_index, partition in self.partitions.items(): - try: - if uuid and partition.uuid and partition.uuid.lower() == uuid.lower(): - log(f"Matched UUID={uuid} against {partition.uuid}", level=logging.DEBUG, fg="gray") - return partition - elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower(): - log(f"Matched PARTUUID={partuuid} against {partition.part_uuid}", level=logging.DEBUG, fg="gray") - return partition - except DiskError as error: - # Most likely a blockdevice that doesn't support or use UUID's - # (like Microsoft recovery partition) - log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray") - pass - - log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) - self.flush_cache() - time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) - - log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) - log(f"Cache: {self._partitions}") - log(f"Partitions: {self.partitions.items()}") - raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py deleted file mode 100644 index a26e0160..00000000 --- a/archinstall/lib/disk/btrfs/__init__.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations -import pathlib -import glob -import logging -from typing import Union, Dict, TYPE_CHECKING - -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from ...installer import Installer - -from .btrfs_helpers import ( - subvolume_info_from_path as subvolume_info_from_path, - find_parent_subvolume as find_parent_subvolume, - setup_subvolumes as setup_subvolumes, - mount_subvolume as mount_subvolume -) -from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume -from .btrfspartition import BTRFSPartition as BTRFSPartition - -from ...exceptions import DiskError, Deprecated -from ...general import SysCommand -from ...output import log - - -def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: - """ - This function uses btrfs to create a subvolume. - - @installation: archinstall.Installer instance - @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot - """ - - installation_mountpoint = installation.target - if type(installation_mountpoint) == str: - installation_mountpoint = pathlib.Path(installation_mountpoint) - # Set up the required physical structure - if type(subvolume_location) == str: - subvolume_location = pathlib.Path(subvolume_location) - - target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor) - - # Difference from mount_subvolume: - # We only check if the parent exists, since we'll run in to "target path already exists" otherwise - if not target.parent.exists(): - target.parent.mkdir(parents=True) - - if glob.glob(str(target / '*')): - raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)") - - # Remove the target if it exists - if target.exists(): - target.rmdir() - - log(f"Creating a subvolume on {target}", level=logging.INFO) - if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: - raise DiskError(f"Could not create a subvolume at {target}: {cmd}") diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py deleted file mode 100644 index f6d2734a..00000000 --- a/archinstall/lib/disk/btrfs/btrfs_helpers.py +++ /dev/null @@ -1,136 +0,0 @@ -import logging -import re -from pathlib import Path -from typing import Optional, Dict, Any, TYPE_CHECKING - -from ...models.subvolume import Subvolume -from ...exceptions import SysCallError, DiskError -from ...general import SysCommand -from ...output import log -from ...plugins import plugins -from ..helpers import get_mount_info -from .btrfssubvolumeinfo import BtrfsSubvolumeInfo - -if TYPE_CHECKING: - from .btrfspartition import BTRFSPartition - from ...installer import Installer - - -class fstab_btrfs_compression_plugin(): - def __init__(self, partition_dict): - self.partition_dict = partition_dict - - def on_genfstab(self, installation): - with open(f"{installation.target}/etc/fstab", 'r') as fh: - fstab = fh.read() - - # Replace the {installation}/etc/fstab with entries - # using the compress=zstd where the mountpoint has compression set. - with open(f"{installation.target}/etc/fstab", 'w') as fh: - for line in fstab.split('\n'): - # So first we grab the mount options by using subvol=.*? as a locator. - # And we also grab the mountpoint for the entry, for instance /var/log - if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)): - for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []): - # We then locate the correct subvolume and check if it's compressed - if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip(): - # We then sneak in the compress=zstd option if it doesn't already exist: - # We skip entries where compression is already defined - if ',compress=zstd,' not in line: - line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}") - break - - fh.write(f"{line}\n") - - return True - - -def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume): - # we normalize the subvolume name (getting rid of slash at the start if exists. - # In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = subvolume.name.lstrip('/') - mountpoint = Path(subvolume.mountpoint) - installation_target = Path(installation.target) - - mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) - mountpoint.mkdir(parents=True, exist_ok=True) - mount_options = subvolume.options + [f'subvol={name}'] - - log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") - SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") - - -def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]): - log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") - - for subvolume in partition_dict['btrfs']['subvolumes']: - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = subvolume.name.lstrip('/') - - # We create the subvolume using the BTRFSPartition instance. - # That way we ensure not only easy access, but also accurate mount locations etc. - partition_dict['device_instance'].create_subvolume(name, installation=installation) - - # Make the nodatacow processing now - # It will be the main cause of creation of subvolumes which are not to be mounted - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if subvolume.nodatacow: - if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - - # Make the compress processing now - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - # in this way only zstd compression is activaded - # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - - if subvolume.compress: - if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - - if 'fstab_btrfs_compression_plugin' not in plugins: - plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict) - - -def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]: - try: - subvolume_name = '' - result = {} - for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): - if index == 0: - subvolume_name = line.strip().decode('UTF-8') - continue - - if b':' in line: - key, value = line.strip().decode('UTF-8').split(':', 1) - - # A bit of a hack, until I figure out how @dataclass - # allows for hooking in a pre-processor to do this we have to do it here: - result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() - - return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore - except SysCallError as error: - log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") - - return None - - -def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]: - # A root path cannot have a parent - if str(path) == '/': - return None - - if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters): - if not (subvolume := subvolume_info_from_path(found_mount['target'])): - if found_mount['target'] == '/': - return None - - return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']]) - - return subvolume - - return None diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py deleted file mode 100644 index d04c9b98..00000000 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ /dev/null @@ -1,109 +0,0 @@ -import glob -import pathlib -import logging -from typing import Optional, TYPE_CHECKING - -from ...exceptions import DiskError -from ...storage import storage -from ...output import log -from ...general import SysCommand -from ..partition import Partition -from ..helpers import findmnt -from .btrfs_helpers import ( - subvolume_info_from_path -) - -if TYPE_CHECKING: - from ...installer import Installer - from .btrfssubvolumeinfo import BtrfsSubvolumeInfo - - -class BTRFSPartition(Partition): - def __init__(self, *args, **kwargs): - Partition.__init__(self, *args, **kwargs) - - @property - def subvolumes(self): - for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): - if '[' in filesystem.get('source', ''): - yield subvolume_info_from_path(filesystem['target']) - - def iterate_children(struct): - for c in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(c['target']) - - for sub_child in iterate_children(c): - yield sub_child - - for child in iterate_children(filesystem): - yield child - - def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo': - """ - Subvolumes have to be created within a mountpoint. - This means we need to get the current installation target. - After we get it, we need to verify it is a btrfs subvolume filesystem. - Finally, the destination must be empty. - """ - - # Allow users to override the installation session - if not installation: - installation = storage.get('installation_session') - - # Determain if the path given, is an absolute path or a relative path. - # We do this by checking if the path contains a known mountpoint. - if str(subvolume)[0] == '/': - if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): - if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): - # Path starts with a known mountpoint which isn't / - # Which means it's an absolute path to a mounted location. - pass - else: - # Since it's not an absolute position with a known start. - # We omit the anchor ('/' basically) and make sure it's appendable - # to the installation.target later - subvolume = subvolume.relative_to(subvolume.anchor) - # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is. - - # If the subvolume is not absolute, then we do two checks: - # 1. Check if the partition itself is mounted somewhere, and use that as a root - # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs - # If both above fail, we need to warn the user that such setup is not supported. - if str(subvolume)[0] != '/': - if self.mountpoint is None and installation is None: - raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.") - elif self.mountpoint: - subvolume = self.mountpoint / subvolume - elif installation: - ongoing_installation_destination = installation.target - if type(ongoing_installation_destination) == str: - ongoing_installation_destination = pathlib.Path(ongoing_installation_destination) - - subvolume = ongoing_installation_destination / subvolume - - subvolume.parent.mkdir(parents=True, exist_ok=True) - - # <!-- - # We perform one more check from the given absolute position. - # And we traverse backwards in order to locate any if possible subvolumes above - # our new btrfs subvolume. This is because it needs to be mounted under it to properly - # function. - # if btrfs_parent := find_parent_subvolume(subvolume): - # print('Found parent:', btrfs_parent) - # --> - - log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey") - - if glob.glob(str(subvolume / '*')): - raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") - # Ideally we would like to check if the destination is already a subvolume. - # But then we would need the mount-point at this stage as well. - # So we'll comment out this check: - # elif subvolinfo := subvolume_info_from_path(subvolume): - # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") - - # And deal with it here: - SysCommand(f"btrfs subvolume create {subvolume}") - - return subvolume_info_from_path(subvolume) diff --git a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py deleted file mode 100644 index 5f5bdea6..00000000 --- a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py +++ /dev/null @@ -1,192 +0,0 @@ -import pathlib -import datetime -import logging -import string -import random -import shutil -from dataclasses import dataclass -from typing import Optional, List# , TYPE_CHECKING -from functools import cached_property - -# if TYPE_CHECKING: -# from ..blockdevice import BlockDevice - -from ...exceptions import DiskError -from ...general import SysCommand -from ...output import log -from ...storage import storage - - -@dataclass -class BtrfsSubvolumeInfo: - full_path :pathlib.Path - name :str - uuid :str - parent_uuid :str - creation_time :datetime.datetime - subvolume_id :int - generation :int - gen_at_creation :int - parent_id :int - top_level_id :int - send_transid :int - send_time :datetime.datetime - receive_transid :int - received_uuid :Optional[str] = None - flags :Optional[str] = None - receive_time :Optional[datetime.datetime] = None - snapshots :Optional[List] = None - - def __post_init__(self): - self.full_path = pathlib.Path(self.full_path) - - # Convert "-" entries to `None` - if self.parent_uuid == "-": - self.parent_uuid = None - if self.received_uuid == "-": - self.received_uuid = None - if self.flags == "-": - self.flags = None - if self.receive_time == "-": - self.receive_time = None - if self.snapshots == "": - self.snapshots = [] - - # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) - self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) - self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) - if self.receive_time: - self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) - - @property - def parent_subvolume(self): - from .btrfs_helpers import find_parent_subvolume - - return find_parent_subvolume(self.full_path) - - @property - def root(self) -> bool: - from .btrfs_helpers import subvolume_info_from_path - - # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first - # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. - # It would also be nice if it could use findmnt(self.full_path) and traverse backwards - # finding the last occurrence of a subvolume which 'self' belongs to. - if volume := subvolume_info_from_path(storage['MOUNT_POINT']): - return self.full_path == volume.full_path - - return False - - @cached_property - def partition(self): - from ..helpers import findmnt, get_parent_of_partition, all_blockdevices - from ..partition import Partition - from ..blockdevice import BlockDevice - from ..mapperdev import MapperDev - from .btrfspartition import BTRFSPartition - from .btrfs_helpers import subvolume_info_from_path - - try: - # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. - if filesystem := findmnt(self.full_path).get('filesystems', []): - if source := filesystem[0].get('source', None): - # Strip away subvolume definitions from findmnt - if '[' in source: - source = source[:source.find('[')] - - if filesystem[0].get('fstype', '') == 'btrfs': - return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - elif filesystem[0].get('source', '').startswith('/dev/mapper'): - return MapperDev(source) - else: - return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - except DiskError: - # Subvolume has never been mounted, we have no reliable way of finding where it is. - # But we have the UUID of the partition, and can begin looking for it by mounting - # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. - - log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) - for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): - if type(instance) in (Partition, MapperDev): - we_mounted_it = False - detection_mountpoint = instance.mountpoint - if not detection_mountpoint: - if type(instance) == Partition and instance.encrypted: - # TODO: Perhaps support unlocking encrypted volumes? - # This will cause a lot of potential user interactions tho. - log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) - continue - - detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") - detection_mountpoint.mkdir(parents=True, exist_ok=True) - - instance.mount(str(detection_mountpoint)) - we_mounted_it = True - - if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): - if subvolume := subvolume_info_from_path(filesystem[0]['target']): - if subvolume.uuid == self.uuid: - # The top level subvolume matched of ourselves, - # which means the instance we're iterating has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - def iterate_children(struct): - for child in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) - - for sub_child in iterate_children(child): - yield sub_child - - for child in iterate_children(filesystem[0]): - if child.uuid == self.uuid: - # We found a child within the instance that has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - if we_mounted_it: - instance.unmount() - shutil.rmtree(detection_mountpoint) - - @cached_property - def mount_options(self) -> Optional[List[str]]: - from ..helpers import findmnt - - if filesystem := findmnt(self.full_path).get('filesystems', []): - return filesystem[0].get('options').split(',') - - def convert_to_ISO_format(self, time_string): - time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') - iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" - return iso_string - - def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): - from ..helpers import findmnt - - try: - if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): - log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") - SysCommand(f"umount {mountpoint}") - except DiskError: - # No previously mounted device at the mountpoint - pass - - if not options: - options = [] - - try: - if include_previously_known_options and (cached_options := self.mount_options): - options += cached_options - except DiskError: - pass - - if not any('subvol=' in x for x in options): - options += f'subvol={self.name}' - - SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") - log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") - - def unmount(self, recurse :bool = True): - SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") - log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py new file mode 100644 index 00000000..7ba70382 --- /dev/null +++ b/archinstall/lib/disk/device_handler.py @@ -0,0 +1,809 @@ +from __future__ import annotations + +import json +import os +import logging +import time +from pathlib import Path +from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal, Iterable + +from parted import ( # type: ignore + Disk, Geometry, FileSystem, + PartitionException, DiskLabelException, + getDevice, getAllDevices, freshDisk, Partition, Device +) + +from .device_model import ( + DeviceModification, PartitionModification, + BDevice, _DeviceInfo, _PartitionInfo, + FilesystemType, Unit, PartitionTable, + ModificationStatus, get_lsblk_info, LsblkInfo, + _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption, LvmVolumeGroup, LvmVolume, Size, LvmGroupInfo, + SectorSize, LvmVolumeInfo, LvmPVInfo, SubvolumeModification, BtrfsMountOption +) + +from ..exceptions import DiskError, UnknownFilesystemFormat +from ..general import SysCommand, SysCallError, JSON, SysCommandWorker +from ..luks import Luks2 +from ..output import debug, error, info, warn, log +from ..utils.util import is_subpath + +if TYPE_CHECKING: + _: Any + + +class DeviceHandler(object): + _TMP_BTRFS_MOUNT = Path('/mnt/arch_btrfs') + + def __init__(self): + self._devices: Dict[Path, BDevice] = {} + self.load_devices() + + @property + def devices(self) -> List[BDevice]: + return list(self._devices.values()) + + def load_devices(self): + block_devices = {} + + devices = getAllDevices() + + try: + loop_devices = SysCommand(['losetup', '-a']) + for ld_info in str(loop_devices).splitlines(): + loop_device = getDevice(ld_info.split(':', maxsplit=1)[0]) + devices.append(loop_device) + except Exception as err: + debug(f'Failed to get loop devices: {err}') + + for device in devices: + if get_lsblk_info(device.path).type == 'rom': + continue + + try: + disk = Disk(device) + except DiskLabelException as err: + if 'unrecognised disk label' in getattr(error, 'message', str(err)): + disk = freshDisk(device, PartitionTable.GPT.value) + else: + debug(f'Unable to get disk from device: {device}') + continue + + device_info = _DeviceInfo.from_disk(disk) + partition_infos = [] + + for partition in disk.partitions: + lsblk_info = get_lsblk_info(partition.path) + fs_type = self._determine_fs_type(partition, lsblk_info) + subvol_infos = [] + + if fs_type == FilesystemType.Btrfs: + subvol_infos = self.get_btrfs_info(partition.path) + + partition_infos.append( + _PartitionInfo.from_partition( + partition, + fs_type, + lsblk_info.partn, + lsblk_info.partuuid, + lsblk_info.uuid, + lsblk_info.mountpoints, + subvol_infos + ) + ) + + block_device = BDevice(disk, device_info, partition_infos) + block_devices[block_device.device_info.path] = block_device + + self._devices = block_devices + + def _determine_fs_type( + self, + partition: Partition, + lsblk_info: Optional[LsblkInfo] = None + ) -> Optional[FilesystemType]: + try: + if partition.fileSystem: + return FilesystemType(partition.fileSystem.type) + elif lsblk_info is not None: + return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None + return None + except ValueError: + debug(f'Could not determine the filesystem: {partition.fileSystem}') + + return None + + def get_device(self, path: Path) -> Optional[BDevice]: + return self._devices.get(path, None) + + def get_device_by_partition_path(self, partition_path: Path) -> Optional[BDevice]: + partition = self.find_partition(partition_path) + if partition: + device: Device = partition.disk.device + return self.get_device(Path(device.path)) + return None + + def find_partition(self, path: Path) -> Optional[_PartitionInfo]: + for device in self._devices.values(): + part = next(filter(lambda x: str(x.path) == str(path), device.partition_infos), None) + if part is not None: + return part + return None + + def get_parent_device_path(self, dev_path: Path) -> Path: + lsblk = get_lsblk_info(dev_path) + return Path(f'/dev/{lsblk.pkname}') + + def get_unique_path_for_device(self, dev_path: Path) -> Optional[Path]: + paths = Path('/dev/disk/by-id').glob('*') + linked_targets = {p.resolve(): p for p in paths} + linked_wwn_targets = {p: linked_targets[p] for p in linked_targets + if p.name.startswith('wwn-') or p.name.startswith('nvme-eui.')} + + if dev_path in linked_wwn_targets: + return linked_wwn_targets[dev_path] + + if dev_path in linked_targets: + return linked_targets[dev_path] + + return None + + def get_uuid_for_path(self, path: Path) -> Optional[str]: + partition = self.find_partition(path) + return partition.partuuid if partition else None + + def get_btrfs_info(self, dev_path: Path) -> List[_BtrfsSubvolumeInfo]: + lsblk_info = get_lsblk_info(dev_path) + subvol_infos: List[_BtrfsSubvolumeInfo] = [] + + if not lsblk_info.mountpoint: + self.mount(dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + mountpoint = self._TMP_BTRFS_MOUNT + else: + # when multiple subvolumes are mounted then the lsblk output may look like + # "mountpoint": "/mnt/archinstall/.snapshots" + # "mountpoints": ["/mnt/archinstall/.snapshots", "/mnt/archinstall/home", ..] + # so we'll determine the minimum common path and assume that's the root + path_strings = [str(m) for m in lsblk_info.mountpoints] + common_prefix = os.path.commonprefix(path_strings) + mountpoint = Path(common_prefix) + + try: + result = SysCommand(f'btrfs subvolume list {mountpoint}').decode() + except SysCallError as err: + debug(f'Failed to read btrfs subvolume information: {err}') + return subvol_infos + + try: + # ID 256 gen 16 top level 5 path @ + for line in result.splitlines(): + # expected output format: + # ID 257 gen 8 top level 5 path @home + name = Path(line.split(' ')[-1]) + sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None) + subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint)) + except json.decoder.JSONDecodeError as err: + error(f"Could not decode lsblk JSON: {result}") + raise err + + if not lsblk_info.mountpoint: + self.umount(dev_path) + + return subvol_infos + + def format( + self, + fs_type: FilesystemType, + path: Path, + additional_parted_options: List[str] = [] + ): + options = [] + command = '' + + match fs_type: + case FilesystemType.Btrfs: + options += ['-f'] + command += 'mkfs.btrfs' + case FilesystemType.Fat16: + options += ['-F16'] + command += 'mkfs.fat' + case FilesystemType.Fat32: + options += ['-F32'] + command += 'mkfs.fat' + case FilesystemType.Ext2: + options += ['-F'] + command += 'mkfs.ext2' + case FilesystemType.Ext3: + options += ['-F'] + command += 'mkfs.ext3' + case FilesystemType.Ext4: + options += ['-F'] + command += 'mkfs.ext4' + case FilesystemType.Xfs: + options += ['-f'] + command += 'mkfs.xfs' + case FilesystemType.F2fs: + options += ['-f'] + command += 'mkfs.f2fs' + case FilesystemType.Ntfs: + options += ['-f', '-Q'] + command += 'mkfs.ntfs' + case FilesystemType.Reiserfs: + command += 'mkfs.reiserfs' + case _: + raise UnknownFilesystemFormat(f'Filetype "{fs_type.value}" is not supported') + + options += additional_parted_options + options_str = ' '.join(options) + + debug(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') + + try: + SysCommand(f"/usr/bin/{command} {options_str} {path}") + except SysCallError as err: + msg = f'Could not format {path} with {fs_type.value}: {err.message}' + error(msg) + raise DiskError(msg) from err + + def encrypt( + self, + dev_path: Path, + mapper_name: Optional[str], + enc_password: str, + lock_after_create: bool = True + ) -> Luks2: + luks_handler = Luks2( + dev_path, + mapper_name=mapper_name, + password=enc_password + ) + + key_file = luks_handler.encrypt() + + luks_handler.unlock(key_file=key_file) + + if not luks_handler.mapper_dev: + raise DiskError('Failed to unlock luks device') + + if lock_after_create: + debug(f'luks2 locking device: {dev_path}') + luks_handler.lock() + + return luks_handler + + def format_encrypted( + self, + dev_path: Path, + mapper_name: Optional[str], + fs_type: FilesystemType, + enc_conf: DiskEncryption + ): + luks_handler = Luks2( + dev_path, + mapper_name=mapper_name, + password=enc_conf.encryption_password + ) + + key_file = luks_handler.encrypt() + + luks_handler.unlock(key_file=key_file) + + if not luks_handler.mapper_dev: + raise DiskError('Failed to unlock luks device') + + info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}') + self.format(fs_type, luks_handler.mapper_dev) + + info(f'luks2 locking device: {dev_path}') + luks_handler.lock() + + def _lvm_info( + self, + cmd: str, + info_type: Literal['lv', 'vg', 'pvseg'] + ) -> Optional[Any]: + raw_info = SysCommand(cmd).decode().split('\n') + + # for whatever reason the output sometimes contains + # "File descriptor X leaked leaked on vgs invocation + data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw]) + + debug(f'LVM info: {data}') + + reports = json.loads(data) + + for report in reports['report']: + if len(report[info_type]) != 1: + raise ValueError(f'Report does not contain any entry') + + entry = report[info_type][0] + + match info_type: + case 'pvseg': + return LvmPVInfo( + pv_name=Path(entry['pv_name']), + lv_name=entry['lv_name'], + vg_name=entry['vg_name'], + ) + case 'lv': + return LvmVolumeInfo( + lv_name=entry['lv_name'], + vg_name=entry['vg_name'], + lv_size=Size(int(entry[f'lv_size'][:-1]), Unit.B, SectorSize.default()) + ) + case 'vg': + return LvmGroupInfo( + vg_uuid=entry['vg_uuid'], + vg_size=Size(int(entry[f'vg_size'][:-1]), Unit.B, SectorSize.default()) + ) + + return None + + def _lvm_info_with_retry(self, cmd: str, info_type: Literal['lv', 'vg', 'pvseg']) -> Optional[Any]: + attempts = 3 + + for attempt_nr in range(attempts): + try: + return self._lvm_info(cmd, info_type) + except ValueError: + time.sleep(attempt_nr + 1) + + raise ValueError(f'Failed to fetch {info_type} information') + + def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]: + cmd = ( + 'lvs --reportformat json ' + '--unit B ' + f'-S lv_name={lv_name}' + ) + + return self._lvm_info_with_retry(cmd, 'lv') + + def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]: + cmd = ( + 'vgs --reportformat json ' + '--unit B ' + '-o vg_name,vg_uuid,vg_size ' + f'-S vg_name={vg_name}' + ) + + return self._lvm_info_with_retry(cmd, 'vg') + + def lvm_pvseg_info(self, vg_name: str, lv_name: str) -> Optional[LvmPVInfo]: + cmd = ( + 'pvs ' + '--segments -o+lv_name,vg_name ' + f'-S vg_name={vg_name},lv_name={lv_name} ' + '--reportformat json ' + ) + + return self._lvm_info_with_retry(cmd, 'pvseg') + + def lvm_vol_change(self, vol: LvmVolume, activate: bool): + active_flag = 'y' if activate else 'n' + cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}' + + debug(f'lvchange volume: {cmd}') + SysCommand(cmd) + + def lvm_export_vg(self, vg: LvmVolumeGroup): + cmd = f'vgexport {vg.name}' + + debug(f'vgexport: {cmd}') + SysCommand(cmd) + + def lvm_import_vg(self, vg: LvmVolumeGroup): + cmd = f'vgimport {vg.name}' + + debug(f'vgimport: {cmd}') + SysCommand(cmd) + + def lvm_vol_reduce(self, vol_path: Path, amount: Size): + val = amount.format_size(Unit.B, include_unit=False) + cmd = f'lvreduce -L -{val}B {vol_path}' + + debug(f'Reducing LVM volume size: {cmd}') + SysCommand(cmd) + + def lvm_pv_create(self, pvs: Iterable[Path]): + cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs]) + debug(f'Creating LVM PVS: {cmd}') + + worker = SysCommandWorker(cmd) + worker.poll() + worker.write(b'y\n', line_ending=False) + + def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str): + pvs_str = ' '.join([str(pv) for pv in pvs]) + cmd = f'vgcreate --yes {vg_name} {pvs_str}' + + debug(f'Creating LVM group: {cmd}') + + worker = SysCommandWorker(cmd) + worker.poll() + worker.write(b'y\n', line_ending=False) + + def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None): + if offset is not None: + length = volume.length - offset + else: + length = volume.length + + length_str = length.format_size(Unit.B, include_unit=False) + cmd = f'lvcreate --yes -L {length_str}B {vg_name} -n {volume.name}' + + debug(f'Creating volume: {cmd}') + + worker = SysCommandWorker(cmd) + worker.poll() + worker.write(b'y\n', line_ending=False) + + volume.vg_name = vg_name + volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}') + + def _setup_partition( + self, + part_mod: PartitionModification, + block_device: BDevice, + disk: Disk, + requires_delete: bool + ): + # when we require a delete and the partition to be (re)created + # already exists then we have to delete it first + if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]: + info(f'Delete existing partition: {part_mod.safe_dev_path}') + part_info = self.find_partition(part_mod.safe_dev_path) + + if not part_info: + raise DiskError(f'No partition for dev path found: {part_mod.safe_dev_path}') + + disk.deletePartition(part_info.partition) + + if part_mod.status == ModificationStatus.Delete: + return + + start_sector = part_mod.start.convert( + Unit.sectors, + block_device.device_info.sector_size + ) + + length_sector = part_mod.length.convert( + Unit.sectors, + block_device.device_info.sector_size + ) + + geometry = Geometry( + device=block_device.disk.device, + start=start_sector.value, + length=length_sector.value + ) + + filesystem = FileSystem(type=part_mod.safe_fs_type.value, geometry=geometry) + + partition = Partition( + disk=disk, + type=part_mod.type.get_partition_code(), + fs=filesystem, + geometry=geometry + ) + + for flag in part_mod.flags: + partition.setFlag(flag.value) + + debug(f'\tType: {part_mod.type.value}') + debug(f'\tFilesystem: {part_mod.safe_fs_type.value}') + debug(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length') + + try: + disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint) + except PartitionException as ex: + raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex + + # the partition has a path now that it has been added + part_mod.dev_path = Path(partition.path) + + def fetch_part_info(self, path: Path) -> LsblkInfo: + lsblk_info = get_lsblk_info(path) + + if not lsblk_info.partn: + debug(f'Unable to determine new partition number: {path}\n{lsblk_info}') + raise DiskError(f'Unable to determine new partition number: {path}') + + if not lsblk_info.partuuid: + debug(f'Unable to determine new partition uuid: {path}\n{lsblk_info}') + raise DiskError(f'Unable to determine new partition uuid: {path}') + + if not lsblk_info.uuid: + debug(f'Unable to determine new uuid: {path}\n{lsblk_info}') + raise DiskError(f'Unable to determine new uuid: {path}') + + debug(f'partition information found: {lsblk_info.json()}') + + return lsblk_info + + def create_lvm_btrfs_subvolumes( + self, + path: Path, + btrfs_subvols: List[SubvolumeModification], + mount_options: List[str] + ): + info(f'Creating subvolumes: {path}') + + self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + + for sub_vol in btrfs_subvols: + debug(f'Creating subvolume: {sub_vol.name}') + + subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name + + SysCommand(f"btrfs subvolume create {subvol_path}") + + if BtrfsMountOption.nodatacow.value in mount_options: + try: + SysCommand(f'chattr +C {subvol_path}') + except SysCallError as err: + raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}') + + if BtrfsMountOption.compress.value in mount_options: + try: + SysCommand(f'chattr +c {subvol_path}') + except SysCallError as err: + raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}') + + self.umount(path) + + def create_btrfs_volumes( + self, + part_mod: PartitionModification, + enc_conf: Optional['DiskEncryption'] = None + ): + info(f'Creating subvolumes: {part_mod.safe_dev_path}') + + luks_handler = None + + # unlock the partition first if it's encrypted + if enc_conf is not None and part_mod in enc_conf.partitions: + if not part_mod.mapper_name: + raise ValueError('No device path specified for modification') + + luks_handler = self.unlock_luks2_dev( + part_mod.safe_dev_path, + part_mod.mapper_name, + enc_conf.encryption_password + ) + + if not luks_handler.mapper_dev: + raise DiskError('Failed to unlock luks device') + + self.mount( + luks_handler.mapper_dev, + self._TMP_BTRFS_MOUNT, + create_target_mountpoint=True, + options=part_mod.mount_options + ) + else: + self.mount( + part_mod.safe_dev_path, + self._TMP_BTRFS_MOUNT, + create_target_mountpoint=True, + options=part_mod.mount_options + ) + + for sub_vol in part_mod.btrfs_subvols: + debug(f'Creating subvolume: {sub_vol.name}') + + if luks_handler is not None: + subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name + else: + subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name + + SysCommand(f"btrfs subvolume create {subvol_path}") + + if luks_handler is not None and luks_handler.mapper_dev is not None: + self.umount(luks_handler.mapper_dev) + luks_handler.lock() + else: + self.umount(part_mod.safe_dev_path) + + def unlock_luks2_dev(self, dev_path: Path, mapper_name: str, enc_password: str) -> Luks2: + luks_handler = Luks2(dev_path, mapper_name=mapper_name, password=enc_password) + + if not luks_handler.is_unlocked(): + luks_handler.unlock() + + if not luks_handler.is_unlocked(): + raise DiskError(f'Failed to unlock luks2 device: {dev_path}') + + return luks_handler + + def umount_all_existing(self, device_path: Path): + debug(f'Unmounting all existing partitions: {device_path}') + + existing_partitions = self._devices[device_path].partition_infos + + for partition in existing_partitions: + debug(f'Unmounting: {partition.path}') + + # un-mount for existing encrypted partitions + if partition.fs_type == FilesystemType.Crypto_luks: + Luks2(partition.path).lock() + else: + self.umount(partition.path, recursive=True) + + def partition( + self, + modification: DeviceModification, + partition_table: Optional[PartitionTable] = None + ): + """ + Create a partition table on the block device and create all partitions. + """ + if modification.wipe: + if partition_table is None: + raise ValueError('Modification is marked as wipe but no partitioning table was provided') + + if partition_table.MBR and len(modification.partitions) > 3: + raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions') + + # make sure all devices are unmounted + self.umount_all_existing(modification.device_path) + + # WARNING: the entire device will be wiped and all data lost + if modification.wipe: + self.wipe_dev(modification.device) + part_table = partition_table.value if partition_table else None + disk = freshDisk(modification.device.disk.device, part_table) + else: + info(f'Use existing device: {modification.device_path}') + disk = modification.device.disk + + info(f'Creating partitions: {modification.device_path}') + + # don't touch existing partitions + filtered_part = [p for p in modification.partitions if not p.exists()] + + for part_mod in filtered_part: + # if the entire disk got nuked then we don't have to delete + # any existing partitions anymore because they're all gone already + requires_delete = modification.wipe is False + self._setup_partition(part_mod, modification.device, disk, requires_delete=requires_delete) + + disk.commit() + + def mount( + self, + dev_path: Path, + target_mountpoint: Path, + mount_fs: Optional[str] = None, + create_target_mountpoint: bool = True, + options: List[str] = [] + ): + if create_target_mountpoint and not target_mountpoint.exists(): + target_mountpoint.mkdir(parents=True, exist_ok=True) + + if not target_mountpoint.exists(): + raise ValueError('Target mountpoint does not exist') + + lsblk_info = get_lsblk_info(dev_path) + if target_mountpoint in lsblk_info.mountpoints: + info(f'Device already mounted at {target_mountpoint}') + return + + cmd = ['mount'] + + if len(options): + cmd.extend(('-o', ','.join(options))) + if mount_fs: + cmd.extend(('-t', mount_fs)) + + cmd.extend((str(dev_path), str(target_mountpoint))) + + command = ' '.join(cmd) + + debug(f'Mounting {dev_path}: {command}') + + try: + SysCommand(command) + except SysCallError as err: + raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}') + + def umount(self, mountpoint: Path, recursive: bool = False): + try: + lsblk_info = get_lsblk_info(mountpoint) + except SysCallError as ex: + # this could happen if before partitioning the device contained 3 partitions + # and after partitioning only 2 partitions were created, then the modifications object + # will have a reference to /dev/sX3 which is being tried to umount here now + if 'not a block device' in ex.message: + return + raise ex + + if len(lsblk_info.mountpoints) > 0: + debug(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}') + + for mountpoint in lsblk_info.mountpoints: + debug(f'Unmounting mountpoint: {mountpoint}') + + command = 'umount' + + if recursive: + command += ' -R' + + SysCommand(f'{command} {mountpoint}') + + def detect_pre_mounted_mods(self, base_mountpoint: Path) -> List[DeviceModification]: + part_mods: Dict[Path, List[PartitionModification]] = {} + + for device in self.devices: + for part_info in device.partition_infos: + for mountpoint in part_info.mountpoints: + if is_subpath(mountpoint, base_mountpoint): + path = Path(part_info.disk.device.path) + part_mods.setdefault(path, []) + part_mod = PartitionModification.from_existing_partition(part_info) + if part_mod.mountpoint: + part_mod.mountpoint = mountpoint.root / mountpoint.relative_to(base_mountpoint) + else: + for subvol in part_mod.btrfs_subvols: + if sm := subvol.mountpoint: + subvol.mountpoint = sm.root / sm.relative_to(base_mountpoint) + part_mods[path].append(part_mod) + break + + device_mods: List[DeviceModification] = [] + for device_path, mods in part_mods.items(): + device_mod = DeviceModification(self._devices[device_path], False, mods) + device_mods.append(device_mod) + + return device_mods + + def partprobe(self, path: Optional[Path] = None): + if path is not None: + command = f'partprobe {path}' + else: + command = 'partprobe' + + try: + debug(f'Calling partprobe: {command}') + SysCommand(command) + except SysCallError as err: + if 'have been written, but we have been unable to inform the kernel of the change' in str(err): + log(f"Partprobe was not able to inform the kernel of the new disk state (ignoring error): {err}", fg="gray", level=logging.INFO) + else: + error(f'"{command}" failed to run (continuing anyway): {err}') + + def _wipe(self, dev_path: Path): + """ + Wipe a device (partition or otherwise) of meta-data, be it file system, LVM, etc. + @param dev_path: Device path of the partition to be wiped. + @type dev_path: str + """ + with open(dev_path, 'wb') as p: + p.write(bytearray(1024)) + + def wipe_dev(self, block_device: BDevice): + """ + Wipe the block device of meta-data, be it file system, LVM, etc. + This is not intended to be secure, but rather to ensure that + auto-discovery tools don't recognize anything here. + """ + info(f'Wiping partitions and metadata: {block_device.device_info.path}') + for partition in block_device.partition_infos: + self._wipe(partition.path) + + self._wipe(block_device.device_info.path) + + +device_handler = DeviceHandler() + + +def disk_layouts() -> str: + try: + lsblk_info = get_all_lsblk_info() + return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON) + except SysCallError as err: + warn(f"Could not return disk layouts: {err}") + return '' + except json.decoder.JSONDecodeError as err: + warn(f"Could not return disk layouts: {err}") + return '' diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py new file mode 100644 index 00000000..f98d05fb --- /dev/null +++ b/archinstall/lib/disk/device_model.py @@ -0,0 +1,1499 @@ +from __future__ import annotations + +import dataclasses +import json +import math +import uuid +from dataclasses import dataclass, field +from enum import Enum +from enum import auto +from pathlib import Path +from typing import Optional, List, Dict, TYPE_CHECKING, Any +from typing import Union + +import parted # type: ignore +import _ped # type: ignore +from parted import Disk, Geometry, Partition + +from ..exceptions import DiskError, SysCallError +from ..general import SysCommand +from ..output import debug, error +from ..storage import storage + +if TYPE_CHECKING: + _: Any + + +class DiskLayoutType(Enum): + Default = 'default_layout' + Manual = 'manual_partitioning' + Pre_mount = 'pre_mounted_config' + + def display_msg(self) -> str: + match self: + case DiskLayoutType.Default: return str(_('Use a best-effort default partition layout')) + case DiskLayoutType.Manual: return str(_('Manual Partitioning')) + case DiskLayoutType.Pre_mount: return str(_('Pre-mounted configuration')) + + +@dataclass +class DiskLayoutConfiguration: + config_type: DiskLayoutType + device_modifications: List[DeviceModification] = field(default_factory=list) + lvm_config: Optional[LvmConfiguration] = None + + # used for pre-mounted config + mountpoint: Optional[Path] = None + + def json(self) -> Dict[str, Any]: + if self.config_type == DiskLayoutType.Pre_mount: + return { + 'config_type': self.config_type.value, + 'mountpoint': str(self.mountpoint) + } + else: + config: Dict[str, Any] = { + 'config_type': self.config_type.value, + 'device_modifications': [mod.json() for mod in self.device_modifications], + } + + if self.lvm_config: + config['lvm_config'] = self.lvm_config.json() + + return config + + @classmethod + def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]: + from .device_handler import device_handler + + device_modifications: List[DeviceModification] = [] + config_type = disk_config.get('config_type', None) + + if not config_type: + raise ValueError('Missing disk layout configuration: config_type') + + config = DiskLayoutConfiguration( + config_type=DiskLayoutType(config_type), + device_modifications=device_modifications + ) + + if config_type == DiskLayoutType.Pre_mount.value: + if not (mountpoint := disk_config.get('mountpoint')): + raise ValueError('Must set a mountpoint when layout type is pre-mount') + + path = Path(str(mountpoint)) + + mods = device_handler.detect_pre_mounted_mods(path) + device_modifications.extend(mods) + + storage['MOUNT_POINT'] = path + + config.mountpoint = path + + return config + + for entry in disk_config.get('device_modifications', []): + device_path = Path(entry.get('device', None)) if entry.get('device', None) else None + + if not device_path: + continue + + device = device_handler.get_device(device_path) + + if not device: + continue + + device_modification = DeviceModification( + wipe=entry.get('wipe', False), + device=device + ) + + device_partitions: List[PartitionModification] = [] + + for partition in entry.get('partitions', []): + device_partition = PartitionModification( + status=ModificationStatus(partition['status']), + fs_type=FilesystemType(partition['fs_type']) if partition.get('fs_type') else None, + start=Size.parse_args(partition['start']), + length=Size.parse_args(partition['size']), + mount_options=partition['mount_options'], + mountpoint=Path(partition['mountpoint']) if partition['mountpoint'] else None, + dev_path=Path(partition['dev_path']) if partition['dev_path'] else None, + type=PartitionType(partition['type']), + flags=[PartitionFlag[f] for f in partition.get('flags', [])], + btrfs_subvols=SubvolumeModification.parse_args(partition.get('btrfs', [])), + ) + # special 'invisible attr to internally identify the part mod + setattr(device_partition, '_obj_id', partition['obj_id']) + device_partitions.append(device_partition) + + device_modification.partitions = device_partitions + device_modifications.append(device_modification) + + # Parse LVM configuration from settings + if (lvm_arg := disk_config.get('lvm_config', None)) is not None: + config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config) + + return config + + +class PartitionTable(Enum): + GPT = 'gpt' + MBR = 'msdos' + + +class Unit(Enum): + B = 1 # byte + kB = 1000 ** 1 # kilobyte + MB = 1000 ** 2 # megabyte + GB = 1000 ** 3 # gigabyte + TB = 1000 ** 4 # terabyte + PB = 1000 ** 5 # petabyte + EB = 1000 ** 6 # exabyte + ZB = 1000 ** 7 # zettabyte + YB = 1000 ** 8 # yottabyte + + KiB = 1024 ** 1 # kibibyte + MiB = 1024 ** 2 # mebibyte + GiB = 1024 ** 3 # gibibyte + TiB = 1024 ** 4 # tebibyte + PiB = 1024 ** 5 # pebibyte + EiB = 1024 ** 6 # exbibyte + ZiB = 1024 ** 7 # zebibyte + YiB = 1024 ** 8 # yobibyte + + sectors = 'sectors' # size in sector + + @staticmethod + def get_all_units() -> List[str]: + return [u.name for u in Unit] + + @staticmethod + def get_si_units() -> List[Unit]: + return [u for u in Unit if 'i' not in u.name and u.name != 'sectors'] + + +@dataclass +class SectorSize: + value: int + unit: Unit + + def __post_init__(self): + match self.unit: + case Unit.sectors: + raise ValueError('Unit type sector not allowed for SectorSize') + + @staticmethod + def default() -> SectorSize: + return SectorSize(512, Unit.B) + + def json(self) -> Dict[str, Any]: + return { + 'value': self.value, + 'unit': self.unit.name, + } + + @classmethod + def parse_args(cls, arg: Dict[str, Any]) -> SectorSize: + return SectorSize( + arg['value'], + Unit[arg['unit']] + ) + + def normalize(self) -> int: + """ + will normalize the value of the unit to Byte + """ + return int(self.value * self.unit.value) # type: ignore + + +@dataclass +class Size: + value: int + unit: Unit + sector_size: SectorSize + + def __post_init__(self): + if not isinstance(self.sector_size, SectorSize): + raise ValueError('sector size must be of type SectorSize') + + def json(self) -> Dict[str, Any]: + return { + 'value': self.value, + 'unit': self.unit.name, + 'sector_size': self.sector_size.json() if self.sector_size else None + } + + @classmethod + def parse_args(cls, size_arg: Dict[str, Any]) -> Size: + sector_size = size_arg['sector_size'] + + return Size( + size_arg['value'], + Unit[size_arg['unit']], + SectorSize.parse_args(sector_size), + ) + + def convert( + self, + target_unit: Unit, + sector_size: Optional[SectorSize] = None + ) -> Size: + if target_unit == Unit.sectors and sector_size is None: + raise ValueError('If target has unit sector, a sector size must be provided') + + if self.unit == target_unit: + return self + elif self.unit == Unit.sectors: + norm = self._normalize() + return Size(norm, Unit.B, self.sector_size).convert(target_unit, sector_size) + else: + if target_unit == Unit.sectors and sector_size is not None: + norm = self._normalize() + sectors = math.ceil(norm / sector_size.value) + return Size(sectors, Unit.sectors, sector_size) + else: + value = int(self._normalize() / target_unit.value) # type: ignore + return Size(value, target_unit, self.sector_size) + + def as_text(self) -> str: + return self.format_size( + self.unit, + self.sector_size + ) + + def format_size( + self, + target_unit: Unit, + sector_size: Optional[SectorSize] = None, + include_unit: bool = True + ) -> str: + target_size = self.convert(target_unit, sector_size) + + if include_unit: + return f'{target_size.value} {target_unit.name}' + return f'{target_size.value}' + + def format_highest(self, include_unit: bool = True) -> str: + si_units = Unit.get_si_units() + all_si_values = [self.convert(si) for si in si_units] + filtered = filter(lambda x: x.value >= 1, all_si_values) + + # we have to get the max by the unit value as we're interested + # in getting the value in the highest possible unit without floats + si_value = max(filtered, key=lambda x: x.unit.value) + + if include_unit: + return f'{si_value.value} {si_value.unit.name}' + return f'{si_value.value}' + + def _normalize(self) -> int: + """ + will normalize the value of the unit to Byte + """ + if self.unit == Unit.sectors and self.sector_size is not None: + return self.value * self.sector_size.normalize() + return int(self.value * self.unit.value) # type: ignore + + def __sub__(self, other: Size) -> Size: + src_norm = self._normalize() + dest_norm = other._normalize() + return Size(abs(src_norm - dest_norm), Unit.B, self.sector_size) + + def __add__(self, other: Size) -> Size: + src_norm = self._normalize() + dest_norm = other._normalize() + return Size(abs(src_norm + dest_norm), Unit.B, self.sector_size) + + def __lt__(self, other): + return self._normalize() < other._normalize() + + def __le__(self, other): + return self._normalize() <= other._normalize() + + def __eq__(self, other): + return self._normalize() == other._normalize() + + def __ne__(self, other): + return self._normalize() != other._normalize() + + def __gt__(self, other): + return self._normalize() > other._normalize() + + def __ge__(self, other): + return self._normalize() >= other._normalize() + + +class BtrfsMountOption(Enum): + compress = 'compress=zstd' + nodatacow = 'nodatacow' + + +@dataclass +class _BtrfsSubvolumeInfo: + name: Path + mountpoint: Optional[Path] + + +@dataclass +class _PartitionInfo: + partition: Partition + name: str + type: PartitionType + fs_type: Optional[FilesystemType] + path: Path + start: Size + length: Size + flags: List[PartitionFlag] + partn: Optional[int] + partuuid: Optional[str] + uuid: Optional[str] + disk: Disk + mountpoints: List[Path] + btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list) + + @property + def sector_size(self) -> SectorSize: + sector_size = self.partition.geometry.device.sectorSize + return SectorSize(sector_size, Unit.B) + + def table_data(self) -> Dict[str, Any]: + end = self.start + self.length + + part_info = { + 'Name': self.name, + 'Type': self.type.value, + 'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')), + 'Path': str(self.path), + 'Start': self.start.format_size(Unit.sectors, self.sector_size, include_unit=False), + 'End': end.format_size(Unit.sectors, self.sector_size, include_unit=False), + 'Size': self.length.format_highest(), + 'Flags': ', '.join([f.name for f in self.flags]) + } + + if self.btrfs_subvol_infos: + part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes' + + return part_info + + @classmethod + def from_partition( + cls, + partition: Partition, + fs_type: Optional[FilesystemType], + partn: Optional[int], + partuuid: Optional[str], + uuid: Optional[str], + mountpoints: List[Path], + btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = [] + ) -> _PartitionInfo: + partition_type = PartitionType.get_type_from_code(partition.type) + flags = [f for f in PartitionFlag if partition.getFlag(f.value)] + + start = Size( + partition.geometry.start, + Unit.sectors, + SectorSize(partition.disk.device.sectorSize, Unit.B) + ) + + length = Size( + int(partition.getLength(unit='B')), + Unit.B, + SectorSize(partition.disk.device.sectorSize, Unit.B) + ) + + return _PartitionInfo( + partition=partition, + name=partition.get_name(), + type=partition_type, + fs_type=fs_type, + path=partition.path, + start=start, + length=length, + flags=flags, + partn=partn, + partuuid=partuuid, + uuid=uuid, + disk=partition.disk, + mountpoints=mountpoints, + btrfs_subvol_infos=btrfs_subvol_infos + ) + + +@dataclass +class _DeviceInfo: + model: str + path: Path + type: str + total_size: Size + free_space_regions: List[DeviceGeometry] + sector_size: SectorSize + read_only: bool + dirty: bool + + def table_data(self) -> Dict[str, Any]: + total_free_space = sum([region.get_length(unit=Unit.MiB) for region in self.free_space_regions]) + return { + 'Model': self.model, + 'Path': str(self.path), + 'Type': self.type, + 'Size': self.total_size.format_highest(), + 'Free space': int(total_free_space), + 'Sector size': self.sector_size.value, + 'Read only': self.read_only + } + + @classmethod + def from_disk(cls, disk: Disk) -> _DeviceInfo: + device = disk.device + if device.type == 18: + device_type = 'loop' + elif device.type in parted.devices: + device_type = parted.devices[device.type] + else: + debug(f'Device code unknown: {device.type}') + device_type = parted.devices[parted.DEVICE_UNKNOWN] + + sector_size = SectorSize(device.sectorSize, Unit.B) + free_space = [DeviceGeometry(g, sector_size) for g in disk.getFreeSpaceRegions()] + + sector_size = SectorSize(device.sectorSize, Unit.B) + + return _DeviceInfo( + model=device.model.strip(), + path=Path(device.path), + type=device_type, + sector_size=sector_size, + total_size=Size(int(device.getLength(unit='B')), Unit.B, sector_size), + free_space_regions=free_space, + read_only=device.readOnly, + dirty=device.dirty + ) + + +@dataclass +class SubvolumeModification: + name: Path + mountpoint: Optional[Path] = None + + @classmethod + def from_existing_subvol_info(cls, info: _BtrfsSubvolumeInfo) -> SubvolumeModification: + return SubvolumeModification(info.name, mountpoint=info.mountpoint) + + @classmethod + def parse_args(cls, subvol_args: List[Dict[str, Any]]) -> List[SubvolumeModification]: + mods = [] + for entry in subvol_args: + if not entry.get('name', None) or not entry.get('mountpoint', None): + debug(f'Subvolume arg is missing name: {entry}') + continue + + mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None + + mods.append(SubvolumeModification(entry['name'], mountpoint)) + + return mods + + @property + def relative_mountpoint(self) -> Path: + """ + Will return the relative path based on the anchor + e.g. Path('/mnt/test') -> Path('mnt/test') + """ + if self.mountpoint is not None: + return self.mountpoint.relative_to(self.mountpoint.anchor) + + raise ValueError('Mountpoint is not specified') + + def is_root(self) -> bool: + if self.mountpoint: + return self.mountpoint == Path('/') + return False + + def json(self) -> Dict[str, Any]: + return {'name': str(self.name), 'mountpoint': str(self.mountpoint)} + + def table_data(self) -> Dict[str, Any]: + return self.json() + + +class DeviceGeometry: + def __init__(self, geometry: Geometry, sector_size: SectorSize): + self._geometry = geometry + self._sector_size = sector_size + + @property + def start(self) -> int: + return self._geometry.start + + @property + def end(self) -> int: + return self._geometry.end + + def get_length(self, unit: Unit = Unit.sectors) -> int: + return self._geometry.getLength(unit.name) + + def table_data(self) -> Dict[str, Any]: + start = Size(self._geometry.start, Unit.sectors, self._sector_size) + end = Size(self._geometry.end, Unit.sectors, self._sector_size) + length = Size(self._geometry.getLength(), Unit.sectors, self._sector_size) + + start_str = f'{self._geometry.start} / {start.format_size(Unit.B, include_unit=False)}' + end_str = f'{self._geometry.end} / {end.format_size(Unit.B, include_unit=False)}' + length_str = f'{self._geometry.getLength()} / {length.format_size(Unit.B, include_unit=False)}' + + return { + 'Sector size': self._sector_size.value, + 'Start (sector/B)': start_str, + 'End (sector/B)': end_str, + 'Size (sectors/B)': length_str + } + + +@dataclass +class BDevice: + disk: Disk + device_info: _DeviceInfo + partition_infos: List[_PartitionInfo] + + def __hash__(self): + return hash(self.disk.device.path) + + +class PartitionType(Enum): + Boot = 'boot' + Primary = 'primary' + _Unknown = 'unknown' + + @classmethod + def get_type_from_code(cls, code: int) -> PartitionType: + if code == parted.PARTITION_NORMAL: + return PartitionType.Primary + else: + debug(f'Partition code not supported: {code}') + return PartitionType._Unknown + + def get_partition_code(self) -> Optional[int]: + if self == PartitionType.Primary: + return parted.PARTITION_NORMAL + elif self == PartitionType.Boot: + return parted.PARTITION_BOOT + return None + + +class PartitionFlag(Enum): + """ + Flags are taken from _ped because pyparted uses this to look + up their flag definitions: https://github.com/dcantrell/pyparted/blob/c4e0186dad45c8efbe67c52b02c8c4319df8aa9b/src/parted/__init__.py#L200-L202 + Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183 + """ + Boot = _ped.PARTITION_BOOT + XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot + ESP = _ped.PARTITION_ESP + + +# class PartitionGUIDs(Enum): +# """ +# A list of Partition type GUIDs (lsblk -o+PARTTYPE) can be found here: https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs +# """ +# XBOOTLDR = 'bc13c2ff-59e6-4262-a352-b275fd6f7172' + + +class FilesystemType(Enum): + Btrfs = 'btrfs' + Ext2 = 'ext2' + Ext3 = 'ext3' + Ext4 = 'ext4' + F2fs = 'f2fs' + Fat16 = 'fat16' + Fat32 = 'fat32' + Ntfs = 'ntfs' + Reiserfs = 'reiserfs' + Xfs = 'xfs' + + # this is not a FS known to parted, so be careful + # with the usage from this enum + Crypto_luks = 'crypto_LUKS' + + def is_crypto(self) -> bool: + return self == FilesystemType.Crypto_luks + + @property + def fs_type_mount(self) -> str: + match self: + case FilesystemType.Ntfs: return 'ntfs3' + case FilesystemType.Fat32: return 'vfat' + case _: return self.value # type: ignore + + @property + def installation_pkg(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return 'btrfs-progs' + case FilesystemType.Xfs: return 'xfsprogs' + case FilesystemType.F2fs: return 'f2fs-tools' + case _: return None + + @property + def installation_module(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return 'btrfs' + case _: return None + + @property + def installation_binary(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return '/usr/bin/btrfs' + case _: return None + + @property + def installation_hooks(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return 'btrfs' + case _: return None + + +class ModificationStatus(Enum): + Exist = 'existing' + Modify = 'modify' + Delete = 'delete' + Create = 'create' + + +@dataclass +class PartitionModification: + status: ModificationStatus + type: PartitionType + start: Size + length: Size + fs_type: Optional[FilesystemType] = None + mountpoint: Optional[Path] = None + mount_options: List[str] = field(default_factory=list) + flags: List[PartitionFlag] = field(default_factory=list) + btrfs_subvols: List[SubvolumeModification] = field(default_factory=list) + + # only set if the device was created or exists + dev_path: Optional[Path] = None + partn: Optional[int] = None + partuuid: Optional[str] = None + uuid: Optional[str] = None + + _efi_indicator_flags = (PartitionFlag.Boot, PartitionFlag.ESP) + _boot_indicator_flags = (PartitionFlag.Boot, PartitionFlag.XBOOTLDR) + + def __post_init__(self): + # needed to use the object as a dictionary key due to hash func + if not hasattr(self, '_obj_id'): + self._obj_id = uuid.uuid4() + + if self.is_exists_or_modify() and not self.dev_path: + raise ValueError('If partition marked as existing a path must be set') + + if self.fs_type is None and self.status == ModificationStatus.Modify: + raise ValueError('FS type must not be empty on modifications with status type modify') + + def __hash__(self): + return hash(self._obj_id) + + @property + def end(self) -> Size: + return self.start + self.length + + @property + def obj_id(self) -> str: + if hasattr(self, '_obj_id'): + return str(self._obj_id) + return '' + + @property + def safe_dev_path(self) -> Path: + if self.dev_path is None: + raise ValueError('Device path was not set') + return self.dev_path + + @property + def safe_fs_type(self) -> FilesystemType: + if self.fs_type is None: + raise ValueError('File system type is not set') + return self.fs_type + + @classmethod + def from_existing_partition(cls, partition_info: _PartitionInfo) -> PartitionModification: + if partition_info.btrfs_subvol_infos: + mountpoint = None + subvol_mods = [] + for i in partition_info.btrfs_subvol_infos: + subvol_mods.append( + SubvolumeModification.from_existing_subvol_info(i) + ) + else: + mountpoint = partition_info.mountpoints[0] if partition_info.mountpoints else None + subvol_mods = [] + + return PartitionModification( + status=ModificationStatus.Exist, + type=partition_info.type, + start=partition_info.start, + length=partition_info.length, + fs_type=partition_info.fs_type, + dev_path=partition_info.path, + partn=partition_info.partn, + partuuid=partition_info.partuuid, + uuid=partition_info.uuid, + flags=partition_info.flags, + mountpoint=mountpoint, + btrfs_subvols=subvol_mods + ) + + @property + def relative_mountpoint(self) -> Path: + """ + Will return the relative path based on the anchor + e.g. Path('/mnt/test') -> Path('mnt/test') + """ + if self.mountpoint: + return self.mountpoint.relative_to(self.mountpoint.anchor) + + raise ValueError('Mountpoint is not specified') + + def is_efi(self) -> bool: + return ( + any(set(self.flags) & set(self._efi_indicator_flags)) + and self.fs_type == FilesystemType.Fat32 + and PartitionFlag.XBOOTLDR not in self.flags + ) + + def is_boot(self) -> bool: + """ + Returns True if any of the boot indicator flags are found in self.flags + """ + return any(set(self.flags) & set(self._boot_indicator_flags)) + + def is_root(self) -> bool: + if self.mountpoint is not None: + return Path('/') == self.mountpoint + else: + for subvol in self.btrfs_subvols: + if subvol.is_root(): + return True + + return False + + def is_modify(self) -> bool: + return self.status == ModificationStatus.Modify + + def exists(self) -> bool: + return self.status == ModificationStatus.Exist + + def is_exists_or_modify(self) -> bool: + return self.status in [ModificationStatus.Exist, ModificationStatus.Modify] + + def is_create_or_modify(self) -> bool: + return self.status in [ModificationStatus.Create, ModificationStatus.Modify] + + @property + def mapper_name(self) -> Optional[str]: + if self.dev_path: + return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.dev_path.name}' + return None + + def set_flag(self, flag: PartitionFlag): + if flag not in self.flags: + self.flags.append(flag) + + def invert_flag(self, flag: PartitionFlag): + if flag in self.flags: + self.flags = [f for f in self.flags if f != flag] + else: + self.set_flag(flag) + + def json(self) -> Dict[str, Any]: + """ + Called for configuration settings + """ + return { + 'obj_id': self.obj_id, + 'status': self.status.value, + 'type': self.type.value, + 'start': self.start.json(), + 'size': self.length.json(), + 'fs_type': self.fs_type.value if self.fs_type else None, + 'mountpoint': str(self.mountpoint) if self.mountpoint else None, + 'mount_options': self.mount_options, + 'flags': [f.name for f in self.flags], + 'dev_path': str(self.dev_path) if self.dev_path else None, + 'btrfs': [vol.json() for vol in self.btrfs_subvols] + } + + def table_data(self) -> Dict[str, Any]: + """ + Called for displaying data in table format + """ + part_mod = { + 'Status': self.status.value, + 'Device': str(self.dev_path) if self.dev_path else '', + 'Type': self.type.value, + 'Start': self.start.format_size(Unit.sectors, self.start.sector_size, include_unit=False), + 'End': self.end.format_size(Unit.sectors, self.start.sector_size, include_unit=False), + 'Size': self.length.format_highest(), + 'FS type': self.fs_type.value if self.fs_type else 'Unknown', + 'Mountpoint': self.mountpoint if self.mountpoint else '', + 'Mount options': ', '.join(self.mount_options), + 'Flags': ', '.join([f.name for f in self.flags]), + } + + if self.btrfs_subvols: + part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes' + + return part_mod + + +class LvmLayoutType(Enum): + Default = 'default' + + # Manual = 'manual_lvm' + + def display_msg(self) -> str: + match self: + case LvmLayoutType.Default: + return str(_('Default layout')) + # case LvmLayoutType.Manual: + # return str(_('Manual configuration')) + + raise ValueError(f'Unknown type: {self}') + + +@dataclass +class LvmVolumeGroup: + name: str + pvs: List[PartitionModification] + volumes: List[LvmVolume] = field(default_factory=list) + + def json(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'lvm_pvs': [p.obj_id for p in self.pvs], + 'volumes': [vol.json() for vol in self.volumes] + } + + @staticmethod + def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup: + lvm_pvs = [] + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.obj_id in arg.get('lvm_pvs', []): + lvm_pvs.append(part) + + return LvmVolumeGroup( + arg['name'], + lvm_pvs, + [LvmVolume.parse_arg(vol) for vol in arg['volumes']] + ) + + def contains_lv(self, lv: LvmVolume) -> bool: + return lv in self.volumes + + +class LvmVolumeStatus(Enum): + Exist = 'existing' + Modify = 'modify' + Delete = 'delete' + Create = 'create' + + +@dataclass +class LvmVolume: + status: LvmVolumeStatus + name: str + fs_type: FilesystemType + length: Size + mountpoint: Optional[Path] + mount_options: List[str] = field(default_factory=list) + btrfs_subvols: List[SubvolumeModification] = field(default_factory=list) + + # volume group name + vg_name: Optional[str] = None + # mapper device path /dev/<vg>/<vol> + dev_path: Optional[Path] = None + + def __post_init__(self): + # needed to use the object as a dictionary key due to hash func + if not hasattr(self, '_obj_id'): + self._obj_id = uuid.uuid4() + + def __hash__(self): + return hash(self._obj_id) + + @property + def obj_id(self) -> str: + if hasattr(self, '_obj_id'): + return str(self._obj_id) + return '' + + @property + def mapper_name(self) -> Optional[str]: + if self.dev_path: + return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}' + return None + + @property + def mapper_path(self) -> Path: + if self.mapper_name: + return Path(f'/dev/mapper/{self.mapper_name}') + + raise ValueError('No mapper path set') + + @property + def safe_dev_path(self) -> Path: + if self.dev_path: + return self.dev_path + raise ValueError('No device path for volume defined') + + @property + def safe_fs_type(self) -> FilesystemType: + if self.fs_type is None: + raise ValueError('File system type is not set') + return self.fs_type + + @property + def relative_mountpoint(self) -> Path: + """ + Will return the relative path based on the anchor + e.g. Path('/mnt/test') -> Path('mnt/test') + """ + if self.mountpoint is not None: + return self.mountpoint.relative_to(self.mountpoint.anchor) + + raise ValueError('Mountpoint is not specified') + + @staticmethod + def parse_arg(arg: Dict[str, Any]) -> LvmVolume: + volume = LvmVolume( + status=LvmVolumeStatus(arg['status']), + name=arg['name'], + fs_type=FilesystemType(arg['fs_type']), + length=Size.parse_args(arg['length']), + mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None, + mount_options=arg.get('mount_options', []), + btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', [])) + ) + + setattr(volume, '_obj_id', arg['obj_id']) + + return volume + + def json(self) -> Dict[str, Any]: + return { + 'obj_id': self.obj_id, + 'status': self.status.value, + 'name': self.name, + 'fs_type': self.fs_type.value, + 'length': self.length.json(), + 'mountpoint': str(self.mountpoint) if self.mountpoint else None, + 'mount_options': self.mount_options, + 'btrfs': [vol.json() for vol in self.btrfs_subvols] + } + + def table_data(self) -> Dict[str, Any]: + part_mod = { + 'Type': self.status.value, + 'Name': self.name, + 'Size': self.length.format_highest(), + 'FS type': self.fs_type.value, + 'Mountpoint': str(self.mountpoint) if self.mountpoint else '', + 'Mount options': ', '.join(self.mount_options), + 'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol') + } + return part_mod + + def is_modify(self) -> bool: + return self.status == LvmVolumeStatus.Modify + + def exists(self) -> bool: + return self.status == LvmVolumeStatus.Exist + + def is_exists_or_modify(self) -> bool: + return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify] + + def is_root(self) -> bool: + if self.mountpoint is not None: + return Path('/') == self.mountpoint + else: + for subvol in self.btrfs_subvols: + if subvol.is_root(): + return True + + return False + + +@dataclass +class LvmGroupInfo: + vg_size: Size + vg_uuid: str + + +@dataclass +class LvmVolumeInfo: + lv_name: str + vg_name: str + lv_size: Size + + +@dataclass +class LvmPVInfo: + pv_name: Path + lv_name: str + vg_name: str + + +@dataclass +class LvmConfiguration: + config_type: LvmLayoutType + vol_groups: List[LvmVolumeGroup] + + def __post_init__(self): + # make sure all volume groups have unique PVs + pvs = [] + for group in self.vol_groups: + for pv in group.pvs: + if pv in pvs: + raise ValueError('A PV cannot be used in multiple volume groups') + pvs.append(pv) + + def json(self) -> Dict[str, Any]: + return { + 'config_type': self.config_type.value, + 'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups] + } + + @staticmethod + def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration: + lvm_pvs = [] + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.obj_id in arg.get('lvm_pvs', []): + lvm_pvs.append(part) + + return LvmConfiguration( + config_type=LvmLayoutType(arg['config_type']), + vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']], + ) + + def get_all_pvs(self) -> List[PartitionModification]: + pvs = [] + for vg in self.vol_groups: + pvs += vg.pvs + + return pvs + + def get_all_volumes(self) -> List[LvmVolume]: + volumes = [] + + for vg in self.vol_groups: + volumes += vg.volumes + + return volumes + + def get_root_volume(self) -> Optional[LvmVolume]: + for vg in self.vol_groups: + filtered = next(filter(lambda x: x.is_root(), vg.volumes), None) + if filtered: + return filtered + + return None + + +# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str: +# """ +# Find the LUKS superblock UUID for the device that +# contains the given logical volume +# """ +# for vg in self.vol_groups: +# if vg.contains_lv(lv): + + +@dataclass +class DeviceModification: + device: BDevice + wipe: bool + partitions: List[PartitionModification] = field(default_factory=list) + + @property + def device_path(self) -> Path: + return self.device.device_info.path + + def add_partition(self, partition: PartitionModification): + self.partitions.append(partition) + + def get_efi_partition(self) -> Optional[PartitionModification]: + """ + Similar to get_boot_partition() but excludes XBOOTLDR partitions from it's candidates. + """ + filtered = filter(lambda x: x.is_efi() and x.mountpoint, self.partitions) + return next(filtered, None) + + def get_boot_partition(self) -> Optional[PartitionModification]: + """ + Returns the first partition marked as XBOOTLDR (PARTTYPE id of bc13c2ff-...) or Boot and has a mountpoint. + Only returns XBOOTLDR if separate EFI is detected using self.get_efi_partition() + Will return None if no suitable partition is found. + """ + if efi_partition := self.get_efi_partition(): + filtered = filter(lambda x: x.is_boot() and x != efi_partition and x.mountpoint, self.partitions) + if boot_partition := next(filtered, None): + return boot_partition + return efi_partition + else: + filtered = filter(lambda x: x.is_boot() and x.mountpoint, self.partitions) + return next(filtered, None) + + def get_root_partition(self) -> Optional[PartitionModification]: + filtered = filter(lambda x: x.is_root(), self.partitions) + return next(filtered, None) + + def json(self) -> Dict[str, Any]: + """ + Called when generating configuration files + """ + return { + 'device': str(self.device.device_info.path), + 'wipe': self.wipe, + 'partitions': [p.json() for p in self.partitions] + } + + +class EncryptionType(Enum): + NoEncryption = "no_encryption" + Luks = "luks" + LvmOnLuks = 'lvm_on_luks' + LuksOnLvm = 'luks_on_lvm' + + @classmethod + def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']: + return { + str(_('No Encryption')): EncryptionType.NoEncryption, + str(_('LUKS')): EncryptionType.Luks, + str(_('LVM on LUKS')): EncryptionType.LvmOnLuks, + str(_('LUKS on LVM')): EncryptionType.LuksOnLvm + } + + @classmethod + def text_to_type(cls, text: str) -> 'EncryptionType': + mapping = cls._encryption_type_mapper() + return mapping[text] + + @classmethod + def type_to_text(cls, type_: 'EncryptionType') -> str: + mapping = cls._encryption_type_mapper() + type_to_text = {type_: text for text, type_ in mapping.items()} + return type_to_text[type_] + + +@dataclass +class DiskEncryption: + encryption_type: EncryptionType = EncryptionType.NoEncryption + encryption_password: str = '' + partitions: List[PartitionModification] = field(default_factory=list) + lvm_volumes: List[LvmVolume] = field(default_factory=list) + hsm_device: Optional[Fido2Device] = None + + def __post_init__(self): + if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions: + raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined') + + if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes: + raise ValueError('LuksOnLvm encryption require LMV volumes to be defined') + + def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool: + if isinstance(dev, PartitionModification): + return dev in self.partitions and dev.mountpoint != Path('/') + elif isinstance(dev, LvmVolume): + return dev in self.lvm_volumes and dev.mountpoint != Path('/') + return False + + def json(self) -> Dict[str, Any]: + obj: Dict[str, Any] = { + 'encryption_type': self.encryption_type.value, + 'partitions': [p.obj_id for p in self.partitions], + 'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes] + } + + if self.hsm_device: + obj['hsm_device'] = self.hsm_device.json() + + return obj + + @classmethod + def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool: + partitions = [] + + for mod in disk_config.device_modifications: + for part in mod.partitions: + partitions.append(part) + + if len(partitions) > 2: # assume one boot and at least 2 additional + if disk_config.lvm_config: + return False + + return True + + @classmethod + def parse_arg( + cls, + disk_config: DiskLayoutConfiguration, + arg: Dict[str, Any], + password: str = '' + ) -> Optional['DiskEncryption']: + if not cls.validate_enc(disk_config): + return None + + enc_partitions = [] + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.obj_id in arg.get('partitions', []): + enc_partitions.append(part) + + volumes = [] + if disk_config.lvm_config: + for vol in disk_config.lvm_config.get_all_volumes(): + if vol.obj_id in arg.get('lvm_volumes', []): + volumes.append(vol) + + enc = DiskEncryption( + EncryptionType(arg['encryption_type']), + password, + enc_partitions, + volumes + ) + + if hsm := arg.get('hsm_device', None): + enc.hsm_device = Fido2Device.parse_arg(hsm) + + return enc + + +@dataclass +class Fido2Device: + path: Path + manufacturer: str + product: str + + def json(self) -> Dict[str, str]: + return { + 'path': str(self.path), + 'manufacturer': self.manufacturer, + 'product': self.product + } + + def table_data(self) -> Dict[str, str]: + return { + 'Path': str(self.path), + 'Manufacturer': self.manufacturer, + 'Product': self.product + } + + @classmethod + def parse_arg(cls, arg: Dict[str, str]) -> 'Fido2Device': + return Fido2Device( + Path(arg['path']), + arg['manufacturer'], + arg['product'] + ) + + +@dataclass +class LsblkInfo: + name: str = '' + path: Path = Path() + pkname: str = '' + size: Size = field(default_factory=lambda: Size(0, Unit.B, SectorSize.default())) + log_sec: int = 0 + pttype: str = '' + ptuuid: str = '' + rota: bool = False + tran: Optional[str] = None + partn: Optional[int] = None + partuuid: Optional[str] = None + parttype: Optional[str] = None + uuid: Optional[str] = None + fstype: Optional[str] = None + fsver: Optional[str] = None + fsavail: Optional[str] = None + fsuse_percentage: Optional[str] = None + type: Optional[str] = None + mountpoint: Optional[Path] = None + mountpoints: List[Path] = field(default_factory=list) + fsroots: List[Path] = field(default_factory=list) + children: List[LsblkInfo] = field(default_factory=list) + + def json(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'path': str(self.path), + 'pkname': self.pkname, + 'size': self.size.format_size(Unit.MiB), + 'log_sec': self.log_sec, + 'pttype': self.pttype, + 'ptuuid': self.ptuuid, + 'rota': self.rota, + 'tran': self.tran, + 'partn': self.partn, + 'partuuid': self.partuuid, + 'parttype': self.parttype, + 'uuid': self.uuid, + 'fstype': self.fstype, + 'fsver': self.fsver, + 'fsavail': self.fsavail, + 'fsuse_percentage': self.fsuse_percentage, + 'type': self.type, + 'mountpoint': self.mountpoint, + 'mountpoints': [str(m) for m in self.mountpoints], + 'fsroots': [str(r) for r in self.fsroots], + 'children': [c.json() for c in self.children] + } + + @property + def btrfs_subvol_info(self) -> Dict[Path, Path]: + """ + It is assumed that lsblk will contain the fields as + + "mountpoints": ["/mnt/archinstall/log", "/mnt/archinstall/home", "/mnt/archinstall", ...] + "fsroots": ["/@log", "/@home", "/@"...] + + we'll thereby map the fsroot, which are the mounted filesystem roots + to the corresponding mountpoints + """ + return dict(zip(self.fsroots, self.mountpoints)) + + @classmethod + def exclude(cls) -> List[str]: + return ['children'] + + @classmethod + def fields(cls) -> List[str]: + return [f.name for f in dataclasses.fields(LsblkInfo) if f.name not in cls.exclude()] + + @classmethod + def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo: + lsblk_info = cls() + + for f in cls.fields(): + lsblk_field = _clean_field(f, CleanType.Blockdevice) + data_field = _clean_field(f, CleanType.Dataclass) + + val: Any = None + if isinstance(getattr(lsblk_info, data_field), Path): + val = Path(blockdevice[lsblk_field]) + elif isinstance(getattr(lsblk_info, data_field), Size): + sector_size = SectorSize(blockdevice['log-sec'], Unit.B) + val = Size(blockdevice[lsblk_field], Unit.B, sector_size) + else: + val = blockdevice[lsblk_field] + + setattr(lsblk_info, data_field, val) + + lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])] + + # sometimes lsblk returns 'mountpoints': [null] + lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt] + + fs_roots = [] + for r in lsblk_info.fsroots: + if r: + path = Path(r) + # store the fsroot entries without the leading / + fs_roots.append(path.relative_to(path.anchor)) + lsblk_info.fsroots = fs_roots + + return lsblk_info + + +class CleanType(Enum): + Blockdevice = auto() + Dataclass = auto() + Lsblk = auto() + + +def _clean_field(name: str, clean_type: CleanType) -> str: + match clean_type: + case CleanType.Blockdevice: + return name.replace('_percentage', '%').replace('_', '-') + case CleanType.Dataclass: + return name.lower().replace('-', '_').replace('%', '_percentage') + case CleanType.Lsblk: + return name.replace('_percentage', '%').replace('_', '-') + + +def _fetch_lsblk_info( + dev_path: Optional[Union[Path, str]] = None, + reverse: bool = False, + full_dev_path: bool = False +) -> List[LsblkInfo]: + fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()] + cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)] + + if dev_path: + cmd.append(str(dev_path)) + + if reverse: + cmd.append('--inverse') + + if full_dev_path: + cmd.append('--paths') + + try: + result = SysCommand(cmd).decode() + except SysCallError as err: + # Get the output minus the message/info from lsblk if it returns a non-zero exit code. + if err.worker: + err_str = err.worker.decode() + debug(f'Error calling lsblk: {err_str}') + + if dev_path: + raise DiskError(f'Failed to read disk "{dev_path}" with lsblk') + + raise err + + try: + block_devices = json.loads(result) + except json.decoder.JSONDecodeError as err: + error(f"Could not decode lsblk JSON: {result}") + raise err + + blockdevices = block_devices['blockdevices'] + return [LsblkInfo.from_json(device) for device in blockdevices] + + +def get_lsblk_info( + dev_path: Union[Path, str], + reverse: bool = False, + full_dev_path: bool = False +) -> LsblkInfo: + if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path): + return infos[0] + + raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"') + + +def get_all_lsblk_info() -> List[LsblkInfo]: + return _fetch_lsblk_info() + + +def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]: + def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]: + devices = [] + for entry in infos: + if as_prefix: + matches = [m for m in entry.mountpoints if str(m).startswith(str(mountpoint))] + if matches: + devices += [entry] + elif mountpoint in entry.mountpoints: + devices += [entry] + + if len(entry.children) > 0: + if len(match := _check(entry.children)) > 0: + devices += match + + return devices + + all_info = get_all_lsblk_info() + return _check(all_info) diff --git a/archinstall/lib/disk/disk_menu.py b/archinstall/lib/disk/disk_menu.py new file mode 100644 index 00000000..a7d9ccc3 --- /dev/null +++ b/archinstall/lib/disk/disk_menu.py @@ -0,0 +1,140 @@ +from typing import Dict, Optional, Any, TYPE_CHECKING, List + +from . import DiskLayoutConfiguration, DiskLayoutType +from .device_model import LvmConfiguration +from ..disk import ( + DeviceModification +) +from ..interactions import select_disk_config +from ..interactions.disk_conf import select_lvm_config +from ..menu import ( + Selector, + AbstractSubMenu +) +from ..output import FormattedOutput + +if TYPE_CHECKING: + _: Any + + +class DiskLayoutConfigurationMenu(AbstractSubMenu): + def __init__( + self, + disk_layout_config: Optional[DiskLayoutConfiguration], + data_store: Dict[str, Any], + advanced: bool = False + ): + self._disk_layout_config = disk_layout_config + self._advanced = advanced + + super().__init__(data_store=data_store, preview_size=0.5) + + def setup_selection_menu_options(self): + self._menu_options['disk_config'] = \ + Selector( + _('Partitioning'), + lambda x: self._select_disk_layout_config(x), + display_func=lambda x: self._display_disk_layout(x), + preview_func=self._prev_disk_layouts, + default=self._disk_layout_config, + enabled=True + ) + self._menu_options['lvm_config'] = \ + Selector( + _('Logical Volume Management (LVM)'), + lambda x: self._select_lvm_config(x), + display_func=lambda x: self.defined_text if x else '', + preview_func=self._prev_lvm_config, + default=self._disk_layout_config.lvm_config if self._disk_layout_config else None, + dependencies=[self._check_dep_lvm], + enabled=True + ) + + def run(self, allow_reset: bool = True) -> Optional[DiskLayoutConfiguration]: + super().run(allow_reset=allow_reset) + + disk_layout_config: Optional[DiskLayoutConfiguration] = self._data_store.get('disk_config', None) + + if disk_layout_config: + disk_layout_config.lvm_config = self._data_store.get('lvm_config', None) + + return disk_layout_config + + def _check_dep_lvm(self) -> bool: + disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + + if disk_layout_conf and disk_layout_conf.config_type == DiskLayoutType.Default: + return True + + return False + + def _select_disk_layout_config( + self, + preset: Optional[DiskLayoutConfiguration] + ) -> Optional[DiskLayoutConfiguration]: + disk_config = select_disk_config(preset, advanced_option=self._advanced) + + if disk_config != preset: + self._menu_options['lvm_config'].set_current_selection(None) + + return disk_config + + def _select_lvm_config(self, preset: Optional[LvmConfiguration]) -> Optional[LvmConfiguration]: + disk_config: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + if disk_config: + return select_lvm_config(disk_config, preset=preset) + return preset + + def _display_disk_layout(self, current_value: Optional[DiskLayoutConfiguration] = None) -> str: + if current_value: + return current_value.config_type.display_msg() + return '' + + def _prev_disk_layouts(self) -> Optional[str]: + disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + + if disk_layout_conf: + device_mods: List[DeviceModification] = \ + list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications)) + + if device_mods: + output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg()) + output_btrfs = '' + + for mod in device_mods: + # create partition table + partition_table = FormattedOutput.as_table(mod.partitions) + + output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n' + output_partition += partition_table + '\n' + + # create btrfs table + btrfs_partitions = list( + filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions) + ) + for partition in btrfs_partitions: + output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n' + + output = output_partition + output_btrfs + return output.rstrip() + + return None + + def _prev_lvm_config(self) -> Optional[str]: + lvm_config: Optional[LvmConfiguration] = self._menu_options['lvm_config'].current_selection + + if lvm_config: + output = '{}: {}\n'.format(str(_('Configuration')), lvm_config.config_type.display_msg()) + + for vol_gp in lvm_config.vol_groups: + pv_table = FormattedOutput.as_table(vol_gp.pvs) + output += '{}:\n{}'.format(str(_('Physical volumes')), pv_table) + + output += f'\nVolume Group: {vol_gp.name}' + + lvm_volumes = FormattedOutput.as_table(vol_gp.volumes) + output += '\n\n{}:\n{}'.format(str(_('Volumes')), lvm_volumes) + + return output + + return None diff --git a/archinstall/lib/disk/diskinfo.py b/archinstall/lib/disk/diskinfo.py deleted file mode 100644 index b56ba282..00000000 --- a/archinstall/lib/disk/diskinfo.py +++ /dev/null @@ -1,40 +0,0 @@ -import dataclasses -import json -from dataclasses import dataclass, field -from typing import Optional, List - -from ..general import SysCommand -from ..exceptions import DiskError - -@dataclass -class LsblkInfo: - size: int = 0 - log_sec: int = 0 - pttype: Optional[str] = None - rota: bool = False - tran: Optional[str] = None - ptuuid: Optional[str] = None - partuuid: Optional[str] = None - uuid: Optional[str] = None - fstype: Optional[str] = None - type: Optional[str] = None - mountpoints: List[str] = field(default_factory=list) - - -def get_lsblk_info(dev_path: str) -> LsblkInfo: - fields = [f.name for f in dataclasses.fields(LsblkInfo)] - lsblk_fields = ','.join([f.upper().replace('_', '-') for f in fields]) - - output = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}').decode('UTF-8') - - if output: - block_devices = json.loads(output) - info = block_devices['blockdevices'][0] - lsblk_info = LsblkInfo() - - for f in fields: - setattr(lsblk_info, f, info[f.replace('_', '-')]) - - return lsblk_info - - raise DiskError(f'Failed to read disk "{dev_path}" with lsblk') diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py deleted file mode 100644 index 63392ffb..00000000 --- a/archinstall/lib/disk/dmcryptdev.py +++ /dev/null @@ -1,48 +0,0 @@ -import pathlib -import logging -import json -from dataclasses import dataclass -from typing import Optional -from ..exceptions import SysCallError -from ..general import SysCommand -from ..output import log -from .mapperdev import MapperDev - -@dataclass -class DMCryptDev: - dev_path :pathlib.Path - - @property - def name(self): - with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh: - return fh.read().strip() - - @property - def path(self): - return f"/dev/mapper/{self.dev_path}" - - @property - def blockdev(self): - pass - - @property - def MapperDev(self): - return MapperDev(mappername=self.name) - - @property - def mountpoint(self) -> Optional[str]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode()) - for filesystem in data['filesystems']: - return filesystem.get('target') - - except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow") - pass - - return None - - @property - def filesystem(self) -> Optional[str]: - return self.MapperDev.filesystem
\ No newline at end of file diff --git a/archinstall/lib/disk/encryption.py b/archinstall/lib/disk/encryption.py deleted file mode 100644 index c7496bfa..00000000 --- a/archinstall/lib/disk/encryption.py +++ /dev/null @@ -1,174 +0,0 @@ -from typing import Dict, Optional, Any, TYPE_CHECKING, List - -from ..menu.abstract_menu import Selector, AbstractSubMenu -from ..menu.menu import MenuSelectionType -from ..menu.table_selection_menu import TableMenu -from ..models.disk_encryption import EncryptionType, DiskEncryption -from ..user_interaction.partitioning_conf import current_partition_layout -from ..user_interaction.utils import get_password -from ..menu import Menu -from ..general import secret -from ..hsm.fido import Fido2Device, Fido2 - -if TYPE_CHECKING: - _: Any - - -class DiskEncryptionMenu(AbstractSubMenu): - def __init__(self, data_store: Dict[str, Any], preset: Optional[DiskEncryption], disk_layouts: Dict[str, Any]): - if preset: - self._preset = preset - else: - self._preset = DiskEncryption() - - self._disk_layouts = disk_layouts - super().__init__(data_store=data_store) - - def _setup_selection_menu_options(self): - self._menu_options['encryption_password'] = \ - Selector( - _('Encryption password'), - lambda x: select_encrypted_password(), - display_func=lambda x: secret(x) if x else '', - default=self._preset.encryption_password, - enabled=True - ) - self._menu_options['encryption_type'] = \ - Selector( - _('Encryption type'), - func=lambda preset: select_encryption_type(preset), - display_func=lambda x: EncryptionType.type_to_text(x) if x else None, - dependencies=['encryption_password'], - default=self._preset.encryption_type, - enabled=True - ) - self._menu_options['partitions'] = \ - Selector( - _('Partitions'), - func=lambda preset: select_partitions_to_encrypt(self._disk_layouts, preset), - display_func=lambda x: f'{sum([len(y) for y in x.values()])} {_("Partitions")}' if x else None, - dependencies=['encryption_password'], - default=self._preset.partitions, - preview_func=self._prev_disk_layouts, - enabled=True - ) - self._menu_options['HSM'] = \ - Selector( - description=_('Use HSM to unlock encrypted drive'), - func=lambda preset: select_hsm(preset), - display_func=lambda x: self._display_hsm(x), - dependencies=['encryption_password'], - default=self._preset.hsm_device, - enabled=True - ) - - def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]: - super().run(allow_reset=allow_reset) - - if self._data_store.get('encryption_password', None): - return DiskEncryption( - encryption_password=self._data_store.get('encryption_password', None), - encryption_type=self._data_store['encryption_type'], - partitions=self._data_store.get('partitions', None), - hsm_device=self._data_store.get('HSM', None) - ) - - return None - - def _display_hsm(self, device: Optional[Fido2Device]) -> Optional[str]: - if device: - return device.manufacturer - - if not Fido2.get_fido2_devices(): - return str(_('No HSM devices available')) - return None - - def _prev_disk_layouts(self) -> Optional[str]: - selector = self._menu_options['partitions'] - if selector.has_selection(): - partitions: Dict[str, Any] = selector.current_selection - - all_partitions = [] - for parts in partitions.values(): - all_partitions += parts - - output = str(_('Partitions to be encrypted')) + '\n' - output += current_partition_layout(all_partitions, with_title=False) - return output.rstrip() - return None - - -def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]: - title = str(_('Select disk encryption option')) - options = [ - # _type_to_text(EncryptionType.FullDiskEncryption), - EncryptionType.type_to_text(EncryptionType.Partition) - ] - - preset_value = EncryptionType.type_to_text(preset) - choice = Menu(title, options, preset_values=preset_value).run() - - match choice.type_: - case MenuSelectionType.Reset: return None - case MenuSelectionType.Skip: return preset - case MenuSelectionType.Selection: return EncryptionType.text_to_type(choice.value) # type: ignore - - -def select_encrypted_password() -> Optional[str]: - if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))): - return passwd - return None - - -def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]: - title = _('Select a FIDO2 device to use for HSM') - fido_devices = Fido2.get_fido2_devices() - - if fido_devices: - choice = TableMenu(title, data=fido_devices).run() - match choice.type_: - case MenuSelectionType.Reset: - return None - case MenuSelectionType.Skip: - return preset - case MenuSelectionType.Selection: - return choice.value # type: ignore - - return None - - -def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: Dict[str, Any]) -> Dict[str, Any]: - # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format.. - # Then we need to identify which partitions to encrypt. This will default to / (root). - all_partitions = [] - for blockdevice in disk_layouts.values(): - if partitions := blockdevice.get('partitions'): - partitions = [p for p in partitions if p['mountpoint'] != '/boot'] - all_partitions += partitions - - if all_partitions: - title = str(_('Select which partitions to encrypt')) - partition_table = current_partition_layout(all_partitions, with_title=False).strip() - - choice = TableMenu( - title, - table_data=(all_partitions, partition_table), - multi=True - ).run() - - match choice.type_: - case MenuSelectionType.Reset: - return {} - case MenuSelectionType.Skip: - return preset - case MenuSelectionType.Selection: - selections: List[Any] = choice.value # type: ignore - partitions = {} - - for path, device in disk_layouts.items(): - for part in selections: - if part in device.get('partitions', []): - partitions.setdefault(path, []).append(part) - - return partitions - return {} diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py new file mode 100644 index 00000000..b0e292ce --- /dev/null +++ b/archinstall/lib/disk/encryption_menu.py @@ -0,0 +1,288 @@ +from pathlib import Path +from typing import Dict, Optional, Any, TYPE_CHECKING, List + +from . import LvmConfiguration, LvmVolume +from ..disk import ( + DeviceModification, + DiskLayoutConfiguration, + PartitionModification, + DiskEncryption, + EncryptionType +) +from ..menu import ( + Selector, + AbstractSubMenu, + MenuSelectionType, + TableMenu +) +from ..interactions.utils import get_password +from ..menu import Menu +from ..general import secret +from .fido import Fido2Device, Fido2 +from ..output import FormattedOutput + +if TYPE_CHECKING: + _: Any + + +class DiskEncryptionMenu(AbstractSubMenu): + def __init__( + self, + disk_config: DiskLayoutConfiguration, + data_store: Dict[str, Any], + preset: Optional[DiskEncryption] = None + ): + if preset: + self._preset = preset + else: + self._preset = DiskEncryption() + + self._disk_config = disk_config + super().__init__(data_store=data_store) + + def setup_selection_menu_options(self): + self._menu_options['encryption_type'] = \ + Selector( + _('Encryption type'), + func=lambda preset: select_encryption_type(self._disk_config, preset), + display_func=lambda x: EncryptionType.type_to_text(x) if x else None, + default=self._preset.encryption_type, + enabled=True, + ) + self._menu_options['encryption_password'] = \ + Selector( + _('Encryption password'), + lambda x: select_encrypted_password(), + dependencies=[self._check_dep_enc_type], + display_func=lambda x: secret(x) if x else '', + default=self._preset.encryption_password, + enabled=True + ) + self._menu_options['partitions'] = \ + Selector( + _('Partitions'), + func=lambda preset: select_partitions_to_encrypt(self._disk_config.device_modifications, preset), + display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None, + dependencies=[self._check_dep_partitions], + default=self._preset.partitions, + preview_func=self._prev_partitions, + enabled=True + ) + self._menu_options['lvm_vols'] = \ + Selector( + _('LVM volumes'), + func=lambda preset: self._select_lvm_vols(preset), + display_func=lambda x: f'{len(x)} {_("LVM volumes")}' if x else None, + dependencies=[self._check_dep_lvm_vols], + default=self._preset.lvm_volumes, + preview_func=self._prev_lvm_vols, + enabled=True + ) + self._menu_options['HSM'] = \ + Selector( + description=_('Use HSM to unlock encrypted drive'), + func=lambda preset: select_hsm(preset), + display_func=lambda x: self._display_hsm(x), + preview_func=self._prev_hsm, + dependencies=[self._check_dep_enc_type], + default=self._preset.hsm_device, + enabled=True + ) + + def _select_lvm_vols(self, preset: List[LvmVolume]) -> List[LvmVolume]: + if self._disk_config.lvm_config: + return select_lvm_vols_to_encrypt(self._disk_config.lvm_config, preset=preset) + return [] + + def _check_dep_enc_type(self) -> bool: + enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection + if enc_type and enc_type != EncryptionType.NoEncryption: + return True + return False + + def _check_dep_partitions(self) -> bool: + enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection + if enc_type and enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks]: + return True + return False + + def _check_dep_lvm_vols(self) -> bool: + enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection + if enc_type and enc_type == EncryptionType.LuksOnLvm: + return True + return False + + def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]: + super().run(allow_reset=allow_reset) + + enc_type = self._data_store.get('encryption_type', None) + enc_password = self._data_store.get('encryption_password', None) + enc_partitions = self._data_store.get('partitions', None) + enc_lvm_vols = self._data_store.get('lvm_vols', None) + + if enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and enc_partitions: + enc_lvm_vols = [] + + if enc_type == EncryptionType.LuksOnLvm: + enc_partitions = [] + + if enc_type != EncryptionType.NoEncryption and enc_password and (enc_partitions or enc_lvm_vols): + return DiskEncryption( + encryption_password=enc_password, + encryption_type=enc_type, + partitions=enc_partitions, + lvm_volumes=enc_lvm_vols, + hsm_device=self._data_store.get('HSM', None) + ) + + return None + + def _display_hsm(self, device: Optional[Fido2Device]) -> Optional[str]: + if device: + return device.manufacturer + + return None + + def _prev_partitions(self) -> Optional[str]: + partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection + if partitions: + output = str(_('Partitions to be encrypted')) + '\n' + output += FormattedOutput.as_table(partitions) + return output.rstrip() + + return None + + def _prev_lvm_vols(self) -> Optional[str]: + volumes: Optional[List[PartitionModification]] = self._menu_options['lvm_vols'].current_selection + if volumes: + output = str(_('LVM volumes to be encrypted')) + '\n' + output += FormattedOutput.as_table(volumes) + return output.rstrip() + + return None + + def _prev_hsm(self) -> Optional[str]: + try: + Fido2.get_fido2_devices() + except ValueError: + return str(_('Unable to determine fido2 devices. Is libfido2 installed?')) + + fido_device: Optional[Fido2Device] = self._menu_options['HSM'].current_selection + + if fido_device: + output = '{}: {}'.format(str(_('Path')), fido_device.path) + output += '{}: {}'.format(str(_('Manufacturer')), fido_device.manufacturer) + output += '{}: {}'.format(str(_('Product')), fido_device.product) + return output + + return None + + +def select_encryption_type(disk_config: DiskLayoutConfiguration, preset: EncryptionType) -> Optional[EncryptionType]: + title = str(_('Select disk encryption option')) + + if disk_config.lvm_config: + options = [ + EncryptionType.type_to_text(EncryptionType.LvmOnLuks), + EncryptionType.type_to_text(EncryptionType.LuksOnLvm) + ] + else: + options = [EncryptionType.type_to_text(EncryptionType.Luks)] + + preset_value = EncryptionType.type_to_text(preset) + + choice = Menu(title, options, preset_values=preset_value).run() + + match choice.type_: + case MenuSelectionType.Reset: return None + case MenuSelectionType.Skip: return preset + case MenuSelectionType.Selection: return EncryptionType.text_to_type(choice.value) # type: ignore + + +def select_encrypted_password() -> Optional[str]: + if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))): + return passwd + return None + + +def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]: + title = _('Select a FIDO2 device to use for HSM') + + try: + fido_devices = Fido2.get_fido2_devices() + except ValueError: + return None + + if fido_devices: + choice = TableMenu(title, data=fido_devices).run() + match choice.type_: + case MenuSelectionType.Reset: + return None + case MenuSelectionType.Skip: + return preset + case MenuSelectionType.Selection: + return choice.value # type: ignore + + return None + + +def select_partitions_to_encrypt( + modification: List[DeviceModification], + preset: List[PartitionModification] +) -> List[PartitionModification]: + partitions: List[PartitionModification] = [] + + # do not allow encrypting the boot partition + for mod in modification: + partitions += list(filter(lambda x: x.mountpoint != Path('/boot'), mod.partitions)) + + # do not allow encrypting existing partitions that are not marked as wipe + avail_partitions = list(filter(lambda x: not x.exists(), partitions)) + + if avail_partitions: + title = str(_('Select which partitions to encrypt')) + partition_table = FormattedOutput.as_table(avail_partitions) + + choice = TableMenu( + title, + table_data=(avail_partitions, partition_table), + preset=preset, + multi=True + ).run() + + match choice.type_: + case MenuSelectionType.Reset: + return [] + case MenuSelectionType.Skip: + return preset + case MenuSelectionType.Selection: + return choice.multi_value + return [] + + +def select_lvm_vols_to_encrypt( + lvm_config: LvmConfiguration, + preset: List[LvmVolume] +) -> List[LvmVolume]: + volumes: List[LvmVolume] = lvm_config.get_all_volumes() + + if volumes: + title = str(_('Select which LVM volumes to encrypt')) + partition_table = FormattedOutput.as_table(volumes) + + choice = TableMenu( + title, + table_data=(volumes, partition_table), + preset=preset, + multi=True + ).run() + + match choice.type_: + case MenuSelectionType.Reset: + return [] + case MenuSelectionType.Skip: + return preset + case MenuSelectionType.Selection: + return choice.multi_value + + return [] diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py new file mode 100644 index 00000000..5a139534 --- /dev/null +++ b/archinstall/lib/disk/fido.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import getpass +from pathlib import Path +from typing import List + +from .device_model import Fido2Device +from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes +from ..output import error, info +from ..exceptions import SysCallError + + +class Fido2: + _loaded: bool = False + _fido2_devices: List[Fido2Device] = [] + + @classmethod + def get_fido2_devices(cls, reload: bool = False) -> List[Fido2Device]: + """ + Uses systemd-cryptenroll to list the FIDO2 devices + connected that supports FIDO2. + Some devices might show up in udevadm as FIDO2 compliant + when they are in fact not. + + The drawback of systemd-cryptenroll is that it uses human readable format. + That means we get this weird table like structure that is of no use. + + So we'll look for `MANUFACTURER` and `PRODUCT`, we take their index + and we split each line based on those positions. + + Output example: + + PATH MANUFACTURER PRODUCT + /dev/hidraw1 Yubico YubiKey OTP+FIDO+CCID + """ + + # to prevent continuous reloading which will slow + # down moving the cursor in the menu + if not cls._loaded or reload: + try: + ret = SysCommand("systemd-cryptenroll --fido2-device=list").decode() + except SysCallError: + error('fido2 support is most likely not installed') + raise ValueError('HSM devices can not be detected, is libfido2 installed?') + + fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore + + manufacturer_pos = 0 + product_pos = 0 + devices = [] + + for line in fido_devices.split('\r\n'): + if '/dev' not in line: + manufacturer_pos = line.find('MANUFACTURER') + product_pos = line.find('PRODUCT') + continue + + path = line[:manufacturer_pos].rstrip() + manufacturer = line[manufacturer_pos:product_pos].rstrip() + product = line[product_pos:] + + devices.append( + Fido2Device(Path(path), manufacturer, product) + ) + + cls._loaded = True + cls._fido2_devices = devices + + return cls._fido2_devices + + @classmethod + def fido2_enroll( + cls, + hsm_device: Fido2Device, + dev_path: Path, + password: str + ): + worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True) + pw_inputted = False + pin_inputted = False + + while worker.is_alive(): + if pw_inputted is False: + if bytes(f"please enter current passphrase for disk {dev_path}", 'UTF-8') in worker._trace_log.lower(): + worker.write(bytes(password, 'UTF-8')) + pw_inputted = True + elif pin_inputted is False: + if bytes(f"please enter security token pin", 'UTF-8') in worker._trace_log.lower(): + worker.write(bytes(getpass.getpass(" "), 'UTF-8')) + pin_inputted = True + + info('You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds') diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 1083df53..5c11896e 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,301 +1,381 @@ from __future__ import annotations + +import signal +import sys import time -import logging -import json -import pathlib -from typing import Optional, Dict, Any, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -from ..models.disk_encryption import DiskEncryption +from pathlib import Path +from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set + +from .device_handler import device_handler +from .device_model import ( + DiskLayoutConfiguration, DiskLayoutType, PartitionTable, + FilesystemType, DiskEncryption, LvmVolumeGroup, + Size, Unit, SectorSize, PartitionModification, EncryptionType, + LvmVolume, LvmConfiguration +) +from ..hardware import SysInfo +from ..luks import Luks2 +from ..menu import Menu +from ..output import debug, info +from ..general import SysCommand if TYPE_CHECKING: - from .blockdevice import BlockDevice _: Any -from .partition import Partition -from .validators import valid_fs_type -from ..exceptions import DiskError, SysCallError -from ..general import SysCommand -from ..output import log -from ..storage import storage -GPT = 0b00000001 -MBR = 0b00000010 +class FilesystemHandler: + def __init__( + self, + disk_config: DiskLayoutConfiguration, + enc_conf: Optional[DiskEncryption] = None + ): + self._disk_config = disk_config + self._enc_config = enc_conf + + def perform_filesystem_operations(self, show_countdown: bool = True): + if self._disk_config.config_type == DiskLayoutType.Pre_mount: + debug('Disk layout configuration is set to pre-mount, not performing any operations') + return + + device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications)) + + if not device_mods: + debug('No modifications required') + return + + device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods]) + + # Issue a final warning before we continue with something un-revertable. + # We mention the drive one last time, and count from 5 to 0. + print(str(_(' ! Formatting {} in ')).format(device_paths)) + + if show_countdown: + self._do_countdown() + + # Setup the blockdevice, filesystem (and optionally encryption). + # Once that's done, we'll hand over to perform_installation() + partition_table = PartitionTable.GPT + if SysInfo.has_uefi() is False: + partition_table = PartitionTable.MBR + + for mod in device_mods: + device_handler.partition(mod, partition_table=partition_table) + + if self._disk_config.lvm_config: + for mod in device_mods: + if boot_part := mod.get_boot_partition(): + debug(f'Formatting boot partition: {boot_part.dev_path}') + self._format_partitions( + [boot_part], + mod.device_path + ) + + self.perform_lvm_operations() + else: + for mod in device_mods: + self._format_partitions( + mod.partitions, + mod.device_path + ) -# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR -# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues. -# (we've been pestered by disk issues since the start, so please let this be here for a few versions) -DEFAULT_PARTITION_START = '5MiB' + for part_mod in mod.partitions: + if part_mod.fs_type == FilesystemType.Btrfs: + device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config) -class Filesystem: - # TODO: - # When instance of a HDD is selected, check all usages and gracefully unmount them - # as well as close any crypto handles. - def __init__(self, blockdevice :BlockDevice, mode :int): - self.blockdevice = blockdevice - self.mode = mode + def _format_partitions( + self, + partitions: List[PartitionModification], + device_path: Path + ): + """ + Format can be given an overriding path, for instance /dev/null to test + the formatting functionality and in essence the support for the given filesystem. + """ - def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': - return self + # don't touch existing partitions + create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()] - def __repr__(self) -> str: - return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" + self._validate_partitions(create_or_modify_parts) - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - if len(args) >= 2 and args[1]: - raise args[1] + # make sure all devices are unmounted + device_handler.umount_all_existing(device_path) - SysCommand('sync') - return True + for part_mod in create_or_modify_parts: + # partition will be encrypted + if self._enc_config is not None and part_mod in self._enc_config.partitions: + device_handler.format_encrypted( + part_mod.safe_dev_path, + part_mod.mapper_name, + part_mod.safe_fs_type, + self._enc_config + ) + else: + device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path) + + # synchronize with udev before using lsblk + SysCommand('udevadm settle') + + lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path) + + part_mod.partn = lsblk_info.partn + part_mod.partuuid = lsblk_info.partuuid + part_mod.uuid = lsblk_info.uuid + + def _validate_partitions(self, partitions: List[PartitionModification]): + checks = { + # verify that all partitions have a path set (which implies that they have been created) + lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'), + # crypto luks is not a valid file system type + lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError( + 'Crypto luks cannot be set as a filesystem type'), + # file system type must be set + lambda x: x.fs_type is None: ValueError('File system type must be set for modification') + } + + for check, exc in checks.items(): + found = next(filter(check, partitions), None) + if found is not None: + raise exc + + def perform_lvm_operations(self): + info('Setting up LVM config...') + + if not self._disk_config.lvm_config: + return + + if self._enc_config: + self._setup_lvm_encrypted( + self._disk_config.lvm_config, + self._enc_config + ) + else: + self._setup_lvm(self._disk_config.lvm_config) + self._format_lvm_vols(self._disk_config.lvm_config) + + def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption): + if enc_config.encryption_type == EncryptionType.LvmOnLuks: + enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False) + + self._setup_lvm(lvm_config, enc_mods) + self._format_lvm_vols(lvm_config) + + # export the lvm group safely otherwise the Luks cannot be closed + self._safely_close_lvm(lvm_config) + + for luks in enc_mods.values(): + luks.lock() + elif enc_config.encryption_type == EncryptionType.LuksOnLvm: + self._setup_lvm(lvm_config) + enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False) + self._format_lvm_vols(lvm_config, enc_vols) + + for luks in enc_vols.values(): + luks.lock() + + self._safely_close_lvm(lvm_config) + + def _safely_close_lvm(self, lvm_config: LvmConfiguration): + for vg in lvm_config.vol_groups: + for vol in vg.volumes: + device_handler.lvm_vol_change(vol, False) + + device_handler.lvm_export_vg(vg) + + def _setup_lvm( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + self._lvm_create_pvs(lvm_config, enc_mods) + + for vg in lvm_config.vol_groups: + pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods) + + device_handler.lvm_vg_create(pv_dev_paths, vg.name) + + # figure out what the actual available size in the group is + vg_info = device_handler.lvm_group_info(vg.name) + + if not vg_info: + raise ValueError('Unable to fetch VG info') + + # the actual available LVM Group size will be smaller than the + # total PVs size due to reserved metadata storage etc. + # so we'll have a look at the total avail. size, check the delta + # to the desired sizes and subtract some equally from the actually + # created volume + avail_size = vg_info.vg_size + desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default())) + + delta = desired_size - avail_size + max_vol_offset = delta.convert(Unit.B) + + max_vol = max(vg.volumes, key=lambda x: x.length) + + for lv in vg.volumes: + offset = max_vol_offset if lv == max_vol else None + + debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}') + device_handler.lvm_vol_create(vg.name, lv, offset) - def partuuid_to_index(self, uuid :str) -> Optional[int]: - for i in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - # We'll use unreliable lbslk to grab children under the /dev/<device> - output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8')) - - for device in output['blockdevices']: - for index, partition in enumerate(device.get('children', [])): - # But we'll use blkid to reliably grab the PARTUUID for that child device (partition) - partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip() - if partition_uuid.lower() == uuid.lower(): - return index - - raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - - def load_layout(self, layout :Dict[str, Any]) -> None: - from ..luks import luks2 - from .btrfs import BTRFSPartition - - # If the layout tells us to wipe the drive, we do so - if layout.get('wipe', False): - if self.mode == GPT: - if not self.parted_mklabel(self.blockdevice.device, "gpt"): - raise KeyError(f"Could not create a GPT label on {self}") - elif self.mode == MBR: - if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MS-DOS label on {self}") - - self.blockdevice.flush_cache() - time.sleep(3) - - prev_partition = None - # We then iterate the partitions in order - for partition in layout.get('partitions', []): - # We don't want to re-add an existing partition (those containing a UUID already) - if partition.get('wipe', False) and not partition.get('PARTUUID', None): - start = partition.get('start') or ( - prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START) - partition['device_instance'] = self.add_partition(partition.get('type', 'primary'), - start=start, - end=partition.get('size', '100%'), - partition_format=partition.get('filesystem', {}).get('format', 'btrfs'), - skip_mklabel=layout.get('wipe', False) is not False) - - elif (partition_uuid := partition.get('PARTUUID')): - # We try to deal with both UUID and PARTUUID of a partition when it's being re-used. - # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID') - # but for now, lets just attempt to deal with both. - try: - partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid) - except DiskError: - partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid) - - log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray") + while True: + debug('Fetching LVM volume info') + lv_info = device_handler.lvm_vol_info(lv.name) + if lv_info is not None: + break + + time.sleep(1) + + self._lvm_vol_handle_e2scrub(vg) + + def _format_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_vols: Dict[LvmVolume, Luks2] = {} + ): + for vol in lvm_config.get_all_volumes(): + if enc_vol := enc_vols.get(vol, None): + if not enc_vol.mapper_dev: + raise ValueError('No mapper device defined') + path = enc_vol.mapper_dev else: - log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING) - continue - - if partition.get('filesystem', {}).get('format', False): - # needed for backward compatibility with the introduction of the new "format_options" - format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) - disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption') - - if disk_encryption and partition in disk_encryption.all_partitions: - if not partition['device_instance']: - raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") - - if partition.get('mountpoint',None): - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - else: - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" - - partition['device_instance'].encrypt(password=disk_encryption.encryption_password) - # Immediately unlock the encrypted device to format the inner volume - with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device: - if not partition.get('wipe'): - if storage['arguments'] == 'silent': - raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}") - else: - if not partition.get('filesystem'): - partition['filesystem'] = {} - - if not partition['filesystem'].get('format', False): - while True: - partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip() - if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False: - log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")) - continue - break - - unlocked_device.format(partition['filesystem']['format'], options=format_options) - - elif partition.get('wipe', False): - if not partition['device_instance']: - raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") - - partition['device_instance'].format(partition['filesystem']['format'], options=format_options) - - if partition['filesystem']['format'] == 'btrfs': - # We upgrade the device instance to a BTRFSPartition if we format it as such. - # This is so that we can gain access to more features than otherwise available in Partition() - partition['device_instance'] = BTRFSPartition( - partition['device_instance'].path, - block_device=partition['device_instance'].block_device, - encrypted=False, - filesystem='btrfs', - autodetect_filesystem=False - ) - - if partition.get('boot', False): - log(f"Marking partition {partition['device_instance']} as bootable.") - self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on') - - prev_partition = partition - - def find_partition(self, mountpoint :str) -> Partition: - for partition in self.blockdevice: - if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: - return partition - - def partprobe(self) -> bool: - try: - SysCommand(f'partprobe {self.blockdevice.device}') - except SysCallError as error: - log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red") - raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}") + path = vol.safe_dev_path - return True + # wait a bit otherwise the mkfs will fail as it can't + # find the mapper device yet + device_handler.format(vol.fs_type, path) - def raw_parted(self, string: str) -> SysCommand: - try: - cmd_handle = SysCommand(f'/usr/bin/parted -s {string}') - time.sleep(0.5) - return cmd_handle - except SysCallError as error: - log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red") - return error + if vol.fs_type == FilesystemType.Btrfs: + device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options) - def parted(self, string: str) -> bool: - """ - Performs a parted execution of the given string + def _lvm_create_pvs( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + pv_paths: Set[Path] = set() - :param string: A raw string passed to /usr/bin/parted -s <string> - :type string: str - """ - if (parted_handle := self.raw_parted(string)).exit_code == 0: - return self.partprobe() - else: - raise DiskError(f"Parted failed to add a partition: {parted_handle}") + for vg in lvm_config.vol_groups: + pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods) - def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: - # TODO: Implement this with declarative profiles instead. - raise ValueError("Installation().use_entire_disk() has to be re-worked.") + device_handler.lvm_pv_create(pv_paths) - def add_partition( + def _get_all_pv_dev_paths( self, - partition_type :str, - start :str, - end :str, - partition_format :Optional[str] = None, - skip_mklabel :bool = False - ) -> Partition: - log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) - - if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: - # If it's a completely empty drive, and we're about to add partitions to it - # we need to make sure there's a filesystem label. - if self.mode == GPT: - if not self.parted_mklabel(self.blockdevice.device, "gpt"): - raise KeyError(f"Could not create a GPT label on {self}") - elif self.mode == MBR: - if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MS-DOS label on {self}") - - self.blockdevice.flush_cache() - - previous_partuuids = [] - for partition in self.blockdevice.partitions.values(): - try: - previous_partuuids.append(partition.part_uuid) - except DiskError: - pass - - # TODO this check should probably run in the setup process rather than during the installation - if self.mode == MBR: - if len(self.blockdevice.partitions) > 3: - DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") - - if partition_format: - parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}' - else: - parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}' - - log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG) - - if self.parted(parted_string): - for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): - self.blockdevice.flush_cache() - - new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()] - new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) - - if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): - try: - return self.blockdevice.get_partition(partuuid=new_partuuid) - except Exception as err: - log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") - log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") - log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") - log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red") - log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") - raise err - else: - log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) - self.partprobe() - time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - else: - print("Parted did not return True during partition creation") + pvs: List[PartitionModification], + enc_mods: Dict[PartitionModification, Luks2] = {} + ) -> Set[Path]: + pv_paths: Set[Path] = set() + + for pv in pvs: + if enc_pv := enc_mods.get(pv, None): + if mapper := enc_pv.mapper_dev: + pv_paths.add(mapper) + else: + pv_paths.add(pv.safe_dev_path) + + return pv_paths + + def _encrypt_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[LvmVolume, Luks2]: + enc_vols: Dict[LvmVolume, Luks2] = {} + + for vol in lvm_config.get_all_volumes(): + if vol in enc_config.lvm_volumes: + luks_handler = device_handler.encrypt( + vol.safe_dev_path, + vol.mapper_name, + enc_config.encryption_password, + lock_after_create + ) + + enc_vols[vol] = luks_handler + + return enc_vols + + def _encrypt_partitions( + self, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[PartitionModification, Luks2]: + enc_mods: Dict[PartitionModification, Luks2] = {} + + for mod in self._disk_config.device_modifications: + partitions = mod.partitions + + # don't touch existing partitions + filtered_part = [p for p in partitions if not p.exists()] + + self._validate_partitions(filtered_part) + + # make sure all devices are unmounted + device_handler.umount_all_existing(mod.device_path) + + enc_mods = {} + + for part_mod in filtered_part: + if part_mod in enc_config.partitions: + luks_handler = device_handler.encrypt( + part_mod.safe_dev_path, + part_mod.mapper_name, + enc_config.encryption_password, + lock_after_create=lock_after_create + ) + + enc_mods[part_mod] = luks_handler - total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()]) - total_partitions.update(previous_partuuids) + return enc_mods - # TODO: This should never be able to happen - log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") - log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red") + def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup): + # from arch wiki: + # If a logical volume will be formatted with ext4, leave at least 256 MiB + # free space in the volume group to allow using e2scrub + if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]): + largest_vol = max(vol_gp.volumes, key=lambda x: x.length) - raise DiskError(f"Could not add partition using: {parted_string}") + device_handler.lvm_vol_reduce( + largest_vol.safe_dev_path, + Size(256, Unit.MiB, SectorSize.default()) + ) - def set_name(self, partition: int, name: str) -> bool: - return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 + def _do_countdown(self) -> bool: + SIG_TRIGGER = False - def set(self, partition: int, string: str) -> bool: - log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) - return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 + def kill_handler(sig: int, frame: Any) -> None: + print() + exit(0) - def parted_mklabel(self, device: str, disk_label: str) -> bool: - log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") - # Try to unmount devices before attempting to run mklabel - try: - SysCommand(f'bash -c "umount {device}?"') - except: - pass + def sig_handler(sig: int, frame: Any) -> None: + signal.signal(signal.SIGINT, kill_handler) - self.partprobe() - worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 - self.partprobe() + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, sig_handler) - return worked + for i in range(5, 0, -1): + print(f"{i}", end='') + + for x in range(4): + sys.stdout.flush() + time.sleep(0.25) + print(".", end='') + + if SIG_TRIGGER: + prompt = _('Do you really want to abort?') + choice = Menu(prompt, Menu.yes_no(), skip=False).run() + if choice.value == Menu.yes(): + exit(0) + + if SIG_TRIGGER is False: + sys.stdin.read() + + SIG_TRIGGER = False + signal.signal(signal.SIGINT, sig_handler) + + print() + signal.signal(signal.SIGINT, original_sigint_handler) + + return True diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py deleted file mode 100644 index 80d0cb53..00000000 --- a/archinstall/lib/disk/helpers.py +++ /dev/null @@ -1,556 +0,0 @@ -from __future__ import annotations -import json -import logging -import os # type: ignore -import pathlib -import re -import time -import glob - -from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -from .diskinfo import get_lsblk_info -from ..models.subvolume import Subvolume - -from .blockdevice import BlockDevice -from .dmcryptdev import DMCryptDev -from .mapperdev import MapperDev -from ..exceptions import SysCallError, DiskError -from ..general import SysCommand -from ..output import log -from ..storage import storage - -if TYPE_CHECKING: - from .partition import Partition - - -ROOT_DIR_PATTERN = re.compile('^.*?/devices') -GIGA = 2 ** 30 - -def convert_size_to_gb(size :Union[int, float]) -> float: - return round(size / GIGA,1) - -def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: - result = {device: 0 for device in block_devices} - - for device, weight in result.items(): - if device.spinning: - weight -= 10 - else: - weight += 5 - - if device.bus_type == 'nvme': - weight += 20 - elif device.bus_type == 'sata': - weight += 10 - - result[device] = weight - - return result - -def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: - for disk in devices: - if disk.size >= gigabytes: - yield disk - -def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: - if not filter_out: - filter_out = [] - - copy_devices = [*devices] - for filter_device in filter_out: - if filter_device in copy_devices: - copy_devices.pop(copy_devices.index(filter_device)) - - copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes)) - - if not len(copy_devices): - return None - - return max(copy_devices, key=(lambda device : device.size)) - -def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: - if not filter_out: - filter_out = [] - - copy_devices = [*devices] - for filter_device in filter_out: - if filter_device in copy_devices: - copy_devices.pop(copy_devices.index(filter_device)) - - if not len(copy_devices): - return None - - return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) - -def convert_to_gigabytes(string :str) -> float: - unit = string.strip()[-1] - size = float(string.strip()[:-1]) - - if unit == 'M': - size = size / 1024 - elif unit == 'T': - size = size * 1024 - - return size - -def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: - # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 - if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): - with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: - if f.read(1) == '1': - return - - path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name))) - hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire") - for bus in hotplug_buses: - if os.path.exists('/sys/bus/{}'.format(bus)): - for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)): - device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus))) - if re.search(device_link, path): - return - return True - - -def cleanup_bash_escapes(data :str) -> str: - return data.replace(r'\ ', ' ') - -def blkid(cmd :str) -> Dict[str, Any]: - if '-o' in cmd and '-o export' not in cmd: - raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.") - elif '-o' not in cmd: - cmd += ' -o export' - - try: - raw_data = SysCommand(cmd).decode() - except SysCallError as error: - log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG) - raise error - - result = {} - # Process the raw result - devname = None - for line in raw_data.split('\r\n'): - if not len(line): - devname = None - continue - - key, val = line.split('=', 1) - if key.lower() == 'devname': - devname = val - # Lowercase for backwards compatibility with all_disks() previous use cases - result[devname] = { - "path": devname, - "PATH": devname - } - continue - - result[devname][key] = cleanup_bash_escapes(val) - - return result - -def get_loop_info(path :str) -> Dict[str, Any]: - for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: - if not drive['name'] == path: - continue - - return { - path: { - **drive, - 'type' : 'loop', - 'TYPE' : 'loop', - 'DEVTYPE' : 'loop', - 'PATH' : drive['name'], - 'path' : drive['name'] - } - } - - return {} - -def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]: - result = {} - for device_path, device_information in information.items(): - dev_name = pathlib.Path(device_information['PATH']).name - if not device_information.get('TYPE') or not device_information.get('DEVTYPE'): - with open(f"/sys/class/block/{dev_name}/uevent") as fh: - device_information.update(uevent(fh.read())) - - if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists(): - with dmcrypt_name.open('r') as fh: - device_information['DMCRYPT_NAME'] = fh.read().strip() - - result[device_path] = device_information - - return result - -def uevent(data :str) -> Dict[str, Any]: - information = {} - - for line in data.replace('\r\n', '\n').split('\n'): - if len((line := line.strip())): - key, val = line.split('=', 1) - information[key] = val - - return information - -def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]: - device_information = {} - with open(f"/sys/class/block/{dev_name}/uevent") as fh: - device_information.update(uevent(fh.read())) - - return { - f"/dev/{dev_name}" : { - **device_information, - 'path' : f'/dev/{dev_name}', - 'PATH' : f'/dev/{dev_name}', - 'PTTYPE' : None - } - } - - -def all_disks() -> List[BlockDevice]: - log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") - return all_blockdevices(partitions=False, mappers=False) - -def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]: - for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): - partprobe(device_path) - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) - - try: - if exclude_iso_dev: - # exclude all devices associated with the iso boot locations - iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt'] - - try: - lsblk_info = get_lsblk_info(device_path) - except DiskError: - continue - - if any([dev in lsblk_info.mountpoints for dev in iso_devs]): - continue - - information = blkid(f'blkid -p -o export {device_path}') - return enrich_blockdevice_information(information) - except SysCallError as ex: - if ex.exit_code == 2: - # Assume that it's a loop device, and try to get info on it - try: - resolved_device_name = device_path.readlink().name - except OSError: - resolved_device_name = device_path.name - - try: - information = get_loop_info(device_path) - if not information: - raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1) - return enrich_blockdevice_information(information) - - except SysCallError: - information = get_blockdevice_uevent(resolved_device_name) - return enrich_blockdevice_information(information) - else: - # We could not reliably get any information, perhaps the disk is clean of information? - if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1: - raise ex - -def all_blockdevices( - mappers: bool = False, - partitions: bool = False, - error: bool = False, - exclude_iso_dev: bool = True -) -> Dict[str, Any]: - """ - Returns BlockDevice() and Partition() objects for all available devices. - """ - from .partition import Partition - - instances = {} - - # Due to lsblk being highly unreliable for this use case, - # we'll iterate the /sys/class definitions and find the information - # from there. - for block_device in glob.glob("/sys/class/block/*"): - try: - device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") - except FileNotFoundError: - log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") - - if device_path.exists() is False: - log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") - continue - - information = get_blockdevice_info(device_path) - if not information: - continue - - for path, path_info in information.items(): - if path_info.get('DMCRYPT_NAME'): - instances[path] = DMCryptDev(dev_path=path) - elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'): - if partitions: - instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path)))) - elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': - instances[path] = BlockDevice(path, path_info) - elif path_info.get('TYPE') in ('squashfs', 'erofs'): - # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) - continue - else: - log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow") - - if mappers: - for block_device in glob.glob("/dev/mapper/*"): - if (pathobj := pathlib.Path(block_device)).is_symlink(): - instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name) - - return instances - - -def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path: - partition_name = path.name - pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve() - return f"/dev/{pci_device.parent.name}" - -def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: - collection = all_blockdevices(partitions=False) - for drive in collection: - if size and convert_to_gigabytes(collection[drive]['size']) != size: - continue - if model and (collection[drive]['model'] is None or collection[drive]['model'].lower() != model.lower()): - continue - - return collection[drive] - -def split_bind_name(path :Union[pathlib.Path, str]) -> list: - # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow") - # we check for the bind notation. if exist we'll only use the "true" device path - if '[' in str(path) : # is a bind path (btrfs subvolume path) - device_path, bind_path = str(path).split('[') - bind_path = bind_path[:-1].strip() # remove the ] - else: - device_path = path - bind_path = None - return device_path,bind_path - -def find_mountpoint(device_path :str) -> Dict[str, Any]: - try: - for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']: - yield filesystem - except SysCallError: - return {} - -def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]: - for traversal in list(map(str, [str(path)] + list(path.parents))): - if traversal in ignore: - continue - - try: - log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) - if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')): - return json.loads(output) - - except SysCallError as error: - log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray") - pass - - if not traverse: - break - - raise DiskError(f"Could not get mount information for path {path}") - - -def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]: - import traceback - - log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}") - device_path, bind_path = split_bind_name(path) - output = {} - - for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): - if traversal in ignore: - continue - - try: - log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) - if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): - break - - except SysCallError as error: - print('ERROR:', error) - pass - - if not traverse: - break - - if not output: - raise DiskError(f"Could not get mount information for device path {device_path}") - - output = json.loads(output) - - # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter - # i.e. the subvolume filesystem we're searching for - if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None: - output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)] - - if 'filesystems' in output: - if len(output['filesystems']) > 1: - raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}") - - if return_real_path: - return output['filesystems'][0], traversal - else: - return output['filesystems'][0] - - if return_real_path: - return {}, traversal - else: - return {} - - -def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]: - for info in data: - if info.get('target') not in filters: - filters[info.get('target')] = None - - filters.update(get_all_targets(info.get('children', []))) - - return filters - -def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: - from .partition import Partition - - try: - output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') - except SysCallError: - return {} - - if not output: - return {} - - output = json.loads(output) - - mounts = {} - - block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True) - - block_devices_mountpoints = {} - for blockdev in block_devices_available.values(): - if not type(blockdev) in (Partition, MapperDev): - continue - - if isinstance(blockdev, Partition): - if blockdev.mountpoints: - for blockdev_mountpoint in blockdev.mountpoints: - block_devices_mountpoints[blockdev_mountpoint] = blockdev - else: - if blockdev.mount_information: - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev - - log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) - - for mountpoint in list(get_all_targets(output['filesystems']).keys()): - # Since all_blockdevices() returns PosixPath objects, we need to convert - # findmnt paths to pathlib.Path() first: - mountpoint = pathlib.Path(mountpoint) - - if mountpoint in block_devices_mountpoints: - if mountpoint not in mounts: - mounts[mountpoint] = block_devices_mountpoints[mountpoint] - # If the already defined mountpoint is a DMCryptDev, and the newly found - # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition. - elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev: - mounts[mountpoint] = block_devices_mountpoints[mountpoint] - - log(f"Available partitions: {mounts}", level=logging.DEBUG) - - return mounts - - -def get_filesystem_type(path :str) -> Optional[str]: - try: - return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip() - except SysCallError: - return None - - -def disk_layouts() -> Optional[Dict[str, Any]]: - try: - if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: - return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} - else: - log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") - return None - except SysCallError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") - return None - except json.decoder.JSONDecodeError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") - return None - - -def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: - for device in block_devices: - for partition in block_devices[device]['partitions']: - if partition.get('mountpoint', None) == relative_mountpoint: - return partition - -def partprobe(path :str = '') -> bool: - try: - if SysCommand(f'bash -c "partprobe {path}"').exit_code == 0: - return True - except SysCallError: - pass - return False - -def convert_device_to_uuid(path :str) -> str: - device_name, bind_name = split_bind_name(path) - - for i in range(storage['DISK_RETRY_ATTEMPTS']): - partprobe(device_name) - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) # TODO: Remove, we should be relying on blkid instead of lsblk - - # TODO: Convert lsblk to blkid - # (lsblk supports BlockDev and Partition UUID grabbing, blkid requires you to pick PTUUID and PARTUUID) - output = json.loads(SysCommand(f"lsblk --json -o+UUID {device_name}").decode('UTF-8')) - - for device in output['blockdevices']: - if (dev_uuid := device.get('uuid', None)): - return dev_uuid - - raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.") - - -def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool: - """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path) - Coded for clarity rather than performance - - Input parms: - :parm partition the partition we check - :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema - - :parm target (a string representing a mount path we want to check for. - :type str - - :parm strict if the check will be strict, target is exactly the mountpoint, or no, where the target is a leaf (f.i. to check if it is in /mnt/archinstall/). Not available for root check ('/') for obvious reasons - - """ - # we create the mountpoint list - if isinstance(partition,dict): - subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', []) - mountpoints = [partition.get('mountpoint')] - mountpoints += [volume.mountpoint for volume in subvolumes] - else: - mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes] - - # we check - if strict or target == '/': - if target in mountpoints: - return True - else: - return False - else: - for mp in mountpoints: - if mp and mp.endswith(target): - return True - return False diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py deleted file mode 100644 index bf1b3583..00000000 --- a/archinstall/lib/disk/mapperdev.py +++ /dev/null @@ -1,92 +0,0 @@ -import glob -import pathlib -import logging -import json -from dataclasses import dataclass -from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING - -from ..exceptions import SysCallError -from ..general import SysCommand -from ..output import log - -if TYPE_CHECKING: - from .btrfs import BtrfsSubvolumeInfo - -@dataclass -class MapperDev: - mappername :str - - @property - def name(self): - return self.mappername - - @property - def path(self): - return f"/dev/mapper/{self.mappername}" - - @property - def part_uuid(self): - return self.partition.part_uuid - - @property - def partition(self): - from .helpers import uevent, get_parent_of_partition - from .partition import Partition - from .blockdevice import BlockDevice - - for mapper in glob.glob('/dev/mapper/*'): - path_obj = pathlib.Path(mapper) - if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink(): - dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve() - - for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): - partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name - - try: - uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() - except SysCallError as error: - log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") - - information = uevent(uevent_data) - block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) - - return Partition(information['DEVNAME'], block_device=block_device) - - raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device") - - @property - def mountpoint(self) -> Optional[pathlib.Path]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) - for filesystem in data['filesystems']: - return pathlib.Path(filesystem.get('target')) - - except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow") - pass - - return None - - @property - def mountpoints(self) -> List[Dict[str, Any]]: - return [obj['target'] for obj in self.mount_information] - - @property - def mount_information(self) -> List[Dict[str, Any]]: - from .helpers import find_mountpoint - return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)] - - @property - def filesystem(self) -> Optional[str]: - from .helpers import get_filesystem_type - return get_filesystem_type(self.path) - - @property - def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']: - from .btrfs import subvolume_info_from_path - - for mountpoint in self.mount_information: - if target := mountpoint.get('target'): - if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py deleted file mode 100644 index 87eaa6a7..00000000 --- a/archinstall/lib/disk/partition.py +++ /dev/null @@ -1,661 +0,0 @@ -import glob -import time -import logging -import json -import os -import hashlib -import typing -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional, Dict, Any, List, Union, Iterator - -from .blockdevice import BlockDevice -from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name -from ..storage import storage -from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat -from ..output import log -from ..general import SysCommand -from .btrfs.btrfs_helpers import subvolume_info_from_path -from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo - -@dataclass -class PartitionInfo: - partition_object: 'Partition' - device_path: str # This would be /dev/sda1 for instance - bootable: bool - size: float - sector_size: int - start: Optional[int] - end: Optional[int] - pttype: Optional[str] - filesystem_type: Optional[str] - partuuid: Optional[str] - uuid: Optional[str] - mountpoints: List[Path] = field(default_factory=list) - - def __post_init__(self): - if not all([self.partuuid, self.uuid]): - for i in range(storage['DISK_RETRY_ATTEMPTS']): - lsblk_info = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - try: - lsblk_info = json.loads(lsblk_info) - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {lsblk_info}", fg="red", level=logging.ERROR) - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - if not (device := lsblk_info.get('blockdevices', [None])[0]): - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - self.partuuid = device.get('partuuid') - self.uuid = device.get('uuid') - - # Lets build a list of requirements that we would like - # to retry and build (stuff that can take time between partprobes) - requirements = [] - requirements.append(self.partuuid) - - # Unformatted partitions won't have a UUID - if lsblk_info.get('fstype') is not None: - requirements.append(self.uuid) - - if all(requirements): - break - - self.partition_object.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - def get_first_mountpoint(self) -> Optional[Path]: - if len(self.mountpoints) > 0: - return self.mountpoints[0] - return None - - -class Partition: - def __init__( - self, - path: str, - block_device: BlockDevice, - part_id :Optional[str] = None, - filesystem :Optional[str] = None, - mountpoint :Optional[str] = None, - encrypted :bool = False, - autodetect_filesystem :bool = True, - ): - if not part_id: - part_id = os.path.basename(path) - - if type(block_device) is str: - raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") - - self.block_device = block_device - self._path = path - self._part_id = part_id - self._target_mountpoint = mountpoint - self._encrypted = encrypted - self._wipe = False - self._type = 'primary' - - if mountpoint: - self.mount(mountpoint) - - try: - self._partition_info = self._fetch_information() - - if not autodetect_filesystem and filesystem: - self._partition_info.filesystem_type = filesystem - - if self._partition_info.filesystem_type == 'crypto_LUKS': - self._encrypted = True - except DiskError: - self._partition_info = None - - @typing.no_type_check # I hate doint this but I'm currently unsure where this is used. - def __lt__(self, left_comparitor :BlockDevice) -> bool: - if type(left_comparitor) == Partition: - left_comparitor = left_comparitor.path - else: - left_comparitor = str(left_comparitor) - - # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 - return self._path < left_comparitor - - def __repr__(self, *args :str, **kwargs :str) -> str: - mount_repr = '' - if self._partition_info: - if mountpoint := self._partition_info.get_first_mountpoint(): - mount_repr = f", mounted={mountpoint}" - elif self._target_mountpoint: - mount_repr = f", rel_mountpoint={self._target_mountpoint}" - - classname = self.__class__.__name__ - - if not self._partition_info: - return f'{classname}(path={self._path})' - elif self._encrypted: - return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' - else: - return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' - - def as_json(self) -> Dict[str, Any]: - """ - this is used for the table representation of the partition (see FormattedOutput) - """ - partition_info = { - 'type': self._type, - 'PARTUUID': self.part_uuid, - 'wipe': self._wipe, - 'boot': self.boot, - 'ESP': self.boot, - 'mountpoint': self._target_mountpoint, - 'encrypted': self._encrypted, - 'start': self.start, - 'size': self.end, - 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown' - } - - return partition_info - - def __dump__(self) -> Dict[str, Any]: - # TODO remove this in favour of as_json - return { - 'type': self._type, - 'PARTUUID': self.part_uuid, - 'wipe': self._wipe, - 'boot': self.boot, - 'ESP': self.boot, - 'mountpoint': self._target_mountpoint, - 'encrypted': self._encrypted, - 'start': self.start, - 'size': self.end, - 'filesystem': { - 'format': self._partition_info.filesystem_type if self._partition_info else 'None' - } - } - - def _call_lsblk(self) -> Dict[str, Any]: - for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk - # This sleep might be overkill, but lsblk is known to - # work against a chaotic cache that can change during call - # causing no information to be returned (blkid is better) - # time.sleep(1) - - # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - - try: - output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - except SysCallError as error: - # Get the output minus the message/info from lsblk if it returns a non-zero exit code. - output = error.worker.decode('UTF-8') - if '{' in output: - output = output[output.find('{'):] - - if output: - try: - lsblk_info = json.loads(output) - return lsblk_info - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR) - - raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk') - - def _call_sfdisk(self) -> Dict[str, Any]: - output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') - - if output: - sfdisk_info = json.loads(output) - partitions = sfdisk_info.get('partitiontable', {}).get('partitions', []) - node = list(filter(lambda x: x['node'] == self._path, partitions)) - - if len(node) > 0: - return node[0] - - return {} - - raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk') - - def _fetch_information(self) -> PartitionInfo: - lsblk_info = self._call_lsblk() - sfdisk_info = self._call_sfdisk() - - if not (device := lsblk_info.get('blockdevices', [])): - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - # Grab the first (and only) block device in the list as we're targeting a specific partition - device = device[0] - - mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] - bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - - return PartitionInfo( - partition_object=self, - device_path=self._path, - pttype=device['pttype'], - partuuid=device['partuuid'], - uuid=device['uuid'], - sector_size=device['log-sec'], - size=convert_size_to_gb(device['size']), - start=sfdisk_info.get('start', None), - end=sfdisk_info.get('size', None), - bootable=bootable, - filesystem_type=device['fstype'], - mountpoints=mountpoints - ) - - @property - def target_mountpoint(self) -> Optional[str]: - return self._target_mountpoint - - @property - def path(self) -> str: - return self._path - - @property - def filesystem(self) -> str: - if self._partition_info: - return self._partition_info.filesystem_type - - @property - def mountpoint(self) -> Optional[Path]: - if len(self.mountpoints) > 0: - return self.mountpoints[0] - return None - - @property - def mountpoints(self) -> List[Path]: - if self._partition_info: - return self._partition_info.mountpoints - - @property - def sector_size(self) -> int: - if self._partition_info: - return self._partition_info.sector_size - - @property - def start(self) -> Optional[int]: - if self._partition_info: - return self._partition_info.start - - @property - def end(self) -> Optional[int]: - if self._partition_info: - return self._partition_info.end - - @property - def end_sectors(self) -> Optional[int]: - if self._partition_info: - start = self._partition_info.start - end = self._partition_info.end - if start and end: - return start + end - - @property - def size(self) -> Optional[float]: - if self._partition_info: - return self._partition_info.size - - @property - def boot(self) -> bool: - if self._partition_info: - return self._partition_info.bootable - - @property - def partition_type(self) -> Optional[str]: - if self._partition_info: - return self._partition_info.pttype - - @property - def part_uuid(self) -> str: - if self._partition_info: - return self._partition_info.partuuid - - @property - def uuid(self) -> Optional[str]: - """ - Returns the UUID as returned by lsblk for the **partition**. - This is more reliable than relying on /dev/disk/by-uuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") - - time.sleep(storage.get('DISK_TIMEOUTS', 1) * i) - - partuuid = self._safe_uuid - if partuuid: - return partuuid - - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") - - @property - def _safe_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None - - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) - - try: - return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None - - log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}") - - @property - def _safe_part_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None - - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) - - try: - return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None - - log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") - - if self._partition_info: - return self._partition_info.uuid - - @property - def encrypted(self) -> Union[bool, None]: - return self._encrypted - - @property - def parent(self) -> str: - return self.real_device - - @property - def real_device(self) -> str: - output = SysCommand('lsblk -J').decode('UTF-8') - - if output: - for blockdevice in json.loads(output)['blockdevices']: - if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): - return f"/dev/{parent}" - return self._path - - raise DiskError('Unable to get disk information for command "lsblk -J"') - - @property - def device_path(self) -> str: - """ for bind mounts returns the physical path of the partition - """ - device_path, bind_name = split_bind_name(self._path) - return device_path - - @property - def bind_name(self) -> str: - """ for bind mounts returns the bind name (subvolume path). - Returns none if this property does not exist - """ - device_path, bind_name = split_bind_name(self._path) - return bind_name - - @property - def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]: - from .helpers import findmnt - - def iterate_children_recursively(information): - for child in information.get('children', []): - if target := child.get('target'): - if child.get('fstype') == 'btrfs': - if subvolume := subvolume_info_from_path(Path(target)): - yield subvolume - - if child.get('children'): - for subchild in iterate_children_recursively(child): - yield subchild - - if self._partition_info.filesystem_type == 'btrfs': - for mountpoint in self._partition_info.mountpoints: - if result := findmnt(mountpoint): - for filesystem in result.get('filesystems', []): - if subvolume := subvolume_info_from_path(mountpoint): - yield subvolume - - for child in iterate_children_recursively(filesystem): - yield child - - def partprobe(self) -> bool: - try: - if self.block_device: - return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code - except SysCallError as error: - log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG) - - return False - - def detect_inner_filesystem(self, password :str) -> Optional[str]: - log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) - from ..luks import luks2 - - try: - with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: - return unlocked_device.filesystem - except SysCallError: - pass - return None - - def has_content(self) -> bool: - fs_type = self._partition_info.filesystem_type - if not fs_type or "swap" in fs_type: - return False - - temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() - temporary_path = Path(temporary_mountpoint) - - temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0: - raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}') - - files = len(glob.glob(f"{temporary_mountpoint}/*")) - iterations = 0 - while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10: - time.sleep(1) - - temporary_path.rmdir() - - return True if files > 0 else False - - def encrypt(self, password: Optional[str] = None) -> str: - """ - A wrapper function for luks2() instances and the .encrypt() method of that instance. - """ - from ..luks import luks2 - - handle = luks2(self, None, None) - return handle.encrypt(self, password=password) - - def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: - """ - Format can be given an overriding path, for instance /dev/null to test - the formatting functionality and in essence the support for the given filesystem. - """ - if filesystem is None: - filesystem = self._partition_info.filesystem_type - - if path is None: - path = self._path - - # This converts from fat32 -> vfat to unify filesystem names - filesystem = get_mount_fs_type(filesystem) - - # To avoid "unable to open /dev/x: No such file or directory" - start_wait = time.time() - while Path(path).exists() is False and time.time() - start_wait < 10: - time.sleep(0.025) - - if log_formatting: - log(f'Formatting {path} -> {filesystem}', level=logging.INFO) - - try: - if filesystem == 'btrfs': - options = ['-f'] + options - - mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8') - if mkfs and 'UUID:' not in mkfs: - raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'vfat': - options = ['-F32'] + options - log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}") - if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ext4': - options = ['-F'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ext2': - options = ['-F'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = 'ext2' - elif filesystem == 'xfs': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'f2fs': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ntfs3': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'crypto_LUKS': - # from ..luks import luks2 - # encrypted_partition = luks2(self, None, None) - # encrypted_partition.format(path) - self._partition_info.filesystem_type = filesystem - - else: - raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") - except SysCallError as error: - log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange") - if retry is True: - log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") - time.sleep(storage.get('DISK_TIMEOUTS', 1)) - - return self.format(filesystem, path, log_formatting, options, retry=False) - - if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': - self._encrypted = True - else: - self._encrypted = False - - return True - - def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: - if data['name'] == name: - return parent - elif 'children' in data: - for child in data['children']: - if parent := self.find_parent_of(child, name, parent=data['name']): - return parent - - return None - - def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: - if not self._partition_info.get_first_mountpoint(): - log(f'Mounting {self} to {target}', level=logging.INFO) - - if not fs: - fs = self._partition_info.filesystem_type - - fs_type = get_mount_fs_type(fs) - - Path(target).mkdir(parents=True, exist_ok=True) - - if self.bind_name: - device_path = self.device_path - # TODO options should be better be a list than a string - if options: - options = f"{options},subvol={self.bind_name}" - else: - options = f"subvol={self.bind_name}" - else: - device_path = self._path - try: - if options: - mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}") - else: - mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {device_path} {target}") - - # TODO: Should be redundant to check for exit_code - if mnt_handle.exit_code != 0: - raise DiskError(f"Could not mount {self._path} to {target} using options {options}") - except SysCallError as err: - raise err - - # Update the partition info since the mount info has changed after this call. - self._partition_info = self._fetch_information() - return True - - return False - - def unmount(self) -> bool: - SysCommand(f"/usr/bin/umount {self._path}") - - # Update the partition info since the mount info has changed after this call. - self._partition_info = self._fetch_information() - return True - - def filesystem_supported(self) -> bool: - """ - The support for a filesystem (this partition) is tested by calling - partition.format() with a path set to '/dev/null' which returns two exceptions: - 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported - 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type - """ - try: - self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False) - except (SysCallError, DiskError): - pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code - except UnknownFilesystemFormat as err: - raise err - return True - - -def get_mount_fs_type(fs :str) -> str: - if fs == 'ntfs': - return 'ntfs3' # Needed to use the Paragon R/W NTFS driver - elif fs == 'fat32': - return 'vfat' # This is the actual type used for fat32 mounting - return fs diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py new file mode 100644 index 00000000..fb1eb74b --- /dev/null +++ b/archinstall/lib/disk/partitioning_menu.py @@ -0,0 +1,429 @@ +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any, TYPE_CHECKING, List, Optional, Tuple +from dataclasses import dataclass + +from .device_model import ( + PartitionModification, FilesystemType, BDevice, + Size, Unit, PartitionType, PartitionFlag, + ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption +) +from ..hardware import SysInfo +from ..menu import Menu, ListManager, MenuSelection, TextInput +from ..output import FormattedOutput, warn +from .subvolume_menu import SubvolumeMenu + +if TYPE_CHECKING: + _: Any + + +@dataclass +class DefaultFreeSector: + start: Size + end: Size + + +class PartitioningList(ListManager): + """ + subclass of ListManager for the managing of user accounts + """ + def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]): + self._device = device + self._actions = { + 'create_new_partition': str(_('Create a new partition')), + 'suggest_partition_layout': str(_('Suggest partition layout')), + 'remove_added_partitions': str(_('Remove all newly added partitions')), + 'assign_mountpoint': str(_('Assign mountpoint')), + 'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')), + 'mark_bootable': str(_('Mark/Unmark as bootable')), + 'set_filesystem': str(_('Change filesystem')), + 'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only + 'btrfs_mark_nodatacow': str(_('Mark/Unmark as nodatacow')), # btrfs only + 'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only + 'delete_partition': str(_('Delete partition')) + } + + display_actions = list(self._actions.values()) + super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:]) + + def selected_action_display(self, partition: PartitionModification) -> str: + return str(_('Partition')) + + def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]: + not_filter = [] + + # only display formatting if the partition exists already + if not selection.exists(): + not_filter += [self._actions['mark_formatting']] + else: + # only allow options if the existing partition + # was marked as formatting, otherwise we run into issues where + # 1. select a new fs -> potentially mark as wipe now + # 2. Switch back to old filesystem -> should unmark wipe now, but + # how do we know it was the original one? + not_filter += [ + self._actions['set_filesystem'], + self._actions['mark_bootable'], + self._actions['btrfs_mark_compressed'], + self._actions['btrfs_mark_nodatacow'], + self._actions['btrfs_set_subvolumes'] + ] + + # non btrfs partitions shouldn't get btrfs options + if selection.fs_type != FilesystemType.Btrfs: + not_filter += [ + self._actions['btrfs_mark_compressed'], + self._actions['btrfs_mark_nodatacow'], + self._actions['btrfs_set_subvolumes'] + ] + else: + not_filter += [self._actions['assign_mountpoint']] + + return [o for o in options if o not in not_filter] + + def handle_action( + self, + action: str, + entry: Optional[PartitionModification], + data: List[PartitionModification] + ) -> List[PartitionModification]: + action_key = [k for k, v in self._actions.items() if v == action][0] + + match action_key: + case 'create_new_partition': + new_partition = self._create_new_partition() + data += [new_partition] + case 'suggest_partition_layout': + new_partitions = self._suggest_partition_layout(data) + if len(new_partitions) > 0: + data = new_partitions + case 'remove_added_partitions': + choice = self._reset_confirmation() + if choice.value == Menu.yes(): + data = [part for part in data if part.is_exists_or_modify()] + case 'assign_mountpoint' if entry: + entry.mountpoint = self._prompt_mountpoint() + if entry.mountpoint == Path('/boot'): + entry.set_flag(PartitionFlag.Boot) + if SysInfo.has_uefi(): + entry.set_flag(PartitionFlag.ESP) + case 'mark_formatting' if entry: + self._prompt_formatting(entry) + case 'mark_bootable' if entry: + entry.invert_flag(PartitionFlag.Boot) + if SysInfo.has_uefi(): + entry.invert_flag(PartitionFlag.ESP) + case 'set_filesystem' if entry: + fs_type = self._prompt_partition_fs_type() + if fs_type: + entry.fs_type = fs_type + # btrfs subvolumes will define mountpoints + if fs_type == FilesystemType.Btrfs: + entry.mountpoint = None + case 'btrfs_mark_compressed' if entry: + self._toggle_mount_option(entry, BtrfsMountOption.compress) + case 'btrfs_mark_nodatacow' if entry: + self._toggle_mount_option(entry, BtrfsMountOption.nodatacow) + case 'btrfs_set_subvolumes' if entry: + self._set_btrfs_subvolumes(entry) + case 'delete_partition' if entry: + data = self._delete_partition(entry, data) + + return data + + def _delete_partition( + self, + entry: PartitionModification, + data: List[PartitionModification] + ) -> List[PartitionModification]: + if entry.is_exists_or_modify(): + entry.status = ModificationStatus.Delete + return data + else: + return [d for d in data if d != entry] + + def _toggle_mount_option( + self, + partition: PartitionModification, + option: BtrfsMountOption + ): + if option.value not in partition.mount_options: + if option == BtrfsMountOption.compress: + partition.mount_options = [ + o for o in partition.mount_options + if o != BtrfsMountOption.nodatacow.value + ] + + partition.mount_options = [ + o for o in partition.mount_options + if not o.startswith(BtrfsMountOption.compress.name) + ] + + partition.mount_options.append(option.value) + else: + partition.mount_options = [ + o for o in partition.mount_options if o != option.value + ] + + def _set_btrfs_subvolumes(self, partition: PartitionModification): + partition.btrfs_subvols = SubvolumeMenu( + _("Manage btrfs subvolumes for current partition"), + partition.btrfs_subvols + ).run() + + def _prompt_formatting(self, partition: PartitionModification): + # an existing partition can toggle between Exist or Modify + if partition.is_modify(): + partition.status = ModificationStatus.Exist + return + elif partition.exists(): + partition.status = ModificationStatus.Modify + + # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really + # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, + # it's safe to change the filesystem for this partition. + if partition.fs_type == FilesystemType.Crypto_luks: + prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified')) + fs_type = self._prompt_partition_fs_type(prompt) + partition.fs_type = fs_type + + if fs_type == FilesystemType.Btrfs: + partition.mountpoint = None + + def _prompt_mountpoint(self) -> Path: + header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n' + header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n' + prompt = str(_('Mountpoint: ')) + + print(header) + + while True: + value = TextInput(prompt).run().strip() + + if value: + mountpoint = Path(value) + break + + return mountpoint + + def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType: + options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks} + + prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition')) + choice = Menu(prompt, options, sort=False, skip=False).run() + return options[choice.single_value] + + def _validate_value( + self, + sector_size: SectorSize, + total_size: Size, + text: str, + start: Optional[Size] + ) -> Optional[Size]: + match = re.match(r'([0-9]+)([a-zA-Z|%]*)', text, re.I) + + if match: + str_value, unit = match.groups() + + if unit == '%' and start: + available = total_size - start + value = int(available.value * (int(str_value) / 100)) + unit = available.unit.name + else: + value = int(str_value) + + if unit and unit not in Unit.get_all_units(): + return None + + unit = Unit[unit] if unit else Unit.sectors + return Size(value, unit, sector_size) + + return None + + def _enter_size( + self, + sector_size: SectorSize, + total_size: Size, + prompt: str, + default: Size, + start: Optional[Size], + ) -> Size: + while True: + value = TextInput(prompt).run().strip() + size: Optional[Size] = None + if not value: + size = default + else: + size = self._validate_value(sector_size, total_size, value, start) + + if size: + return size + + warn(f'Invalid value: {value}') + + def _prompt_size(self) -> Tuple[Size, Size]: + device_info = self._device.device_info + + text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n' + free_space_table = FormattedOutput.as_table(device_info.free_space_regions) + prompt = text + free_space_table + '\n' + + total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size) + total_bytes = device_info.total_size.format_size(Unit.B) + + prompt += str(_('Total: {} / {}')).format(total_sectors, total_bytes) + '\n\n' + prompt += str(_('All entered values can be suffixed with a unit: %, B, KB, KiB, MB, MiB...')) + '\n' + prompt += str(_('If no unit is provided, the value is interpreted as sectors')) + '\n' + print(prompt) + + default_free_sector = self._find_default_free_space() + + if not default_free_sector: + default_free_sector = DefaultFreeSector( + Size(0, Unit.sectors, self._device.device_info.sector_size), + Size(0, Unit.sectors, self._device.device_info.sector_size) + ) + + # prompt until a valid start sector was entered + start_prompt = str(_('Enter start (default: sector {}): ')).format(default_free_sector.start.value) + + start_size = self._enter_size( + device_info.sector_size, + device_info.total_size, + start_prompt, + default_free_sector.start, + None + ) + + if start_size.value == default_free_sector.start.value and default_free_sector.end.value != 0: + end_size = default_free_sector.end + else: + end_size = device_info.total_size + + # prompt until valid end sector was entered + end_prompt = str(_('Enter end (default: {}): ')).format(end_size.as_text()) + end_size = self._enter_size( + device_info.sector_size, + device_info.total_size, + end_prompt, + end_size, + start_size + ) + + return start_size, end_size + + def _find_default_free_space(self) -> Optional[DefaultFreeSector]: + device_info = self._device.device_info + + largest_free_area: Optional[DeviceGeometry] = None + largest_deleted_area: Optional[PartitionModification] = None + + if len(device_info.free_space_regions) > 0: + largest_free_area = max(device_info.free_space_regions, key=lambda r: r.get_length()) + + deleted_partitions = list(filter(lambda x: x.status == ModificationStatus.Delete, self._data)) + if len(deleted_partitions) > 0: + largest_deleted_area = max(deleted_partitions, key=lambda p: p.length) + + def _free_space(space: DeviceGeometry) -> DefaultFreeSector: + start = Size(space.start, Unit.sectors, device_info.sector_size) + end = Size(space.end, Unit.sectors, device_info.sector_size) + return DefaultFreeSector(start, end) + + def _free_deleted(space: PartitionModification) -> DefaultFreeSector: + start = space.start.convert(Unit.sectors, self._device.device_info.sector_size) + end = space.end.convert(Unit.sectors, self._device.device_info.sector_size) + return DefaultFreeSector(start, end) + + if not largest_deleted_area and largest_free_area: + return _free_space(largest_free_area) + elif not largest_free_area and largest_deleted_area: + return _free_deleted(largest_deleted_area) + elif not largest_deleted_area and not largest_free_area: + return None + elif largest_free_area and largest_deleted_area: + free_space = _free_space(largest_free_area) + if free_space.start > largest_deleted_area.start: + return free_space + else: + return _free_deleted(largest_deleted_area) + + return None + + def _create_new_partition(self) -> PartitionModification: + fs_type = self._prompt_partition_fs_type() + + start_size, end_size = self._prompt_size() + length = end_size - start_size + + # new line for the next prompt + print() + + mountpoint = None + if fs_type != FilesystemType.Btrfs: + mountpoint = self._prompt_mountpoint() + + partition = PartitionModification( + status=ModificationStatus.Create, + type=PartitionType.Primary, + start=start_size, + length=length, + fs_type=fs_type, + mountpoint=mountpoint + ) + + if partition.mountpoint == Path('/boot'): + partition.set_flag(PartitionFlag.Boot) + if SysInfo.has_uefi(): + partition.set_flag(PartitionFlag.ESP) + + return partition + + def _reset_confirmation(self) -> MenuSelection: + prompt = str(_('This will remove all newly added partitions, continue?')) + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run() + return choice + + def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]: + # if modifications have been done already, inform the user + # that this operation will erase those modifications + if any([not entry.exists() for entry in data]): + choice = self._reset_confirmation() + if choice.value == Menu.no(): + return [] + + from ..interactions.disk_conf import suggest_single_disk_layout + + device_modification = suggest_single_disk_layout(self._device) + return device_modification.partitions + + +def manual_partitioning( + device: BDevice, + prompt: str = '', + preset: List[PartitionModification] = [] +) -> List[PartitionModification]: + if not prompt: + prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n' + prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB)) + + manual_preset = [] + + if not preset: + # we'll display the existing partitions of the device + for partition in device.partition_infos: + manual_preset.append( + PartitionModification.from_existing_partition(partition) + ) + else: + manual_preset = preset + + menu_list = PartitioningList(prompt, device, manual_preset) + partitions: List[PartitionModification] = menu_list.run() + + if menu_list.is_last_choice_cancel(): + return preset + + return partitions diff --git a/archinstall/lib/disk/subvolume_menu.py b/archinstall/lib/disk/subvolume_menu.py new file mode 100644 index 00000000..ea77149d --- /dev/null +++ b/archinstall/lib/disk/subvolume_menu.py @@ -0,0 +1,61 @@ +from pathlib import Path +from typing import List, Optional, Any, TYPE_CHECKING + +from .device_model import SubvolumeModification +from ..menu import TextInput, ListManager + +if TYPE_CHECKING: + _: Any + + +class SubvolumeMenu(ListManager): + def __init__(self, prompt: str, btrfs_subvols: List[SubvolumeModification]): + self._actions = [ + str(_('Add subvolume')), + str(_('Edit subvolume')), + str(_('Delete subvolume')) + ] + super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:]) + + def selected_action_display(self, subvolume: SubvolumeModification) -> str: + return str(subvolume.name) + + def _add_subvolume(self, editing: Optional[SubvolumeModification] = None) -> Optional[SubvolumeModification]: + name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run() + + if not name: + return None + + mountpoint = TextInput(f'{_("Subvolume mountpoint")}: ', str(editing.mountpoint) if editing else '').run() + + if not mountpoint: + return None + + return SubvolumeModification(Path(name), Path(mountpoint)) + + def handle_action( + self, + action: str, + entry: Optional[SubvolumeModification], + data: List[SubvolumeModification] + ) -> List[SubvolumeModification]: + if action == self._actions[0]: # add + new_subvolume = self._add_subvolume() + + if new_subvolume is not None: + # in case a user with the same username as an existing user + # was created we'll replace the existing one + data = [d for d in data if d.name != new_subvolume.name] + data += [new_subvolume] + elif entry is not None: + if action == self._actions[1]: # edit subvolume + new_subvolume = self._add_subvolume(entry) + + if new_subvolume is not None: + # we'll remove the original subvolume and add the modified version + data = [d for d in data if d.name != entry.name and d.name != new_subvolume.name] + data += [new_subvolume] + elif action == self._actions[2]: # delete + data = [d for d in data if d != entry] + + return data diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py deleted file mode 100644 index 5809c073..00000000 --- a/archinstall/lib/disk/user_guides.py +++ /dev/null @@ -1,240 +0,0 @@ -from __future__ import annotations -import logging -from typing import Optional, Dict, Any, List, TYPE_CHECKING - -# https://stackoverflow.com/a/39757388/929999 -from ..models.subvolume import Subvolume - -if TYPE_CHECKING: - from .blockdevice import BlockDevice - _: Any - -from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to -from ..hardware import has_uefi -from ..output import log -from ..menu import Menu - - -def suggest_single_disk_layout(block_device :BlockDevice, - default_filesystem :Optional[str] = None, - advanced_options :bool = False) -> Dict[str, Any]: - - if not default_filesystem: - from ..user_interaction import ask_for_main_filesystem_format - default_filesystem = ask_for_main_filesystem_format(advanced_options) - - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB - using_subvolumes = False - using_home_partition = False - compression = False - - if default_filesystem == 'btrfs': - prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - using_subvolumes = choice.value == Menu.yes() - - prompt = str(_('Would you like to use BTRFS compression?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - compression = choice.value == Menu.yes() - - layout = { - block_device.path : { - "wipe" : True, - "partitions" : [] - } - } - - # Used for reference: https://wiki.archlinux.org/title/partitioning - - # 2 MiB is unallocated for GRUB on BIOS. Potentially unneeded for - # other bootloaders? - - # TODO: On BIOS, /boot partition is only needed if the drive will - # be encrypted, otherwise it is not recommended. We should probably - # add a check for whether the drive will be encrypted or not. - layout[block_device.path]['partitions'].append({ - # Boot - "type" : "primary", - "start" : "3MiB", - "size" : "203MiB", - "boot" : True, - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/boot", - "filesystem" : { - "format" : "fat32" - } - }) - - # Increase the UEFI partition if UEFI is detected. - # Also re-align the start to 1MiB since we don't need the first sectors - # like we do in MBR layouts where the boot loader is installed traditionally. - if has_uefi(): - layout[block_device.path]['partitions'][-1]['start'] = '1MiB' - layout[block_device.path]['partitions'][-1]['size'] = '512MiB' - - layout[block_device.path]['partitions'].append({ - # Root - "type" : "primary", - "start" : "206MiB", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/" if not using_subvolumes else None, - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - - if has_uefi(): - layout[block_device.path]['partitions'][-1]['start'] = '513MiB' - - if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: - prompt = str(_('Would you like to create a separate partition for /home?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - using_home_partition = choice.value == Menu.yes() - - # Set a size for / (/root) - if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition: - # We'll use subvolumes - # Or the disk size is too small to allow for a separate /home - # Or the user doesn't want to create a separate partition for /home - layout[block_device.path]['partitions'][-1]['size'] = '100%' - else: - layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GiB" - - if default_filesystem == 'btrfs' and using_subvolumes: - # if input('Do you want to use a recommended structure? (Y/n): ').strip().lower() in ('', 'y', 'yes'): - # https://btrfs.wiki.kernel.org/index.php/FAQ - # https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash - # https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh - layout[block_device.path]['partitions'][1]['btrfs'] = { - 'subvolumes': [ - Subvolume('@', '/'), - Subvolume('@home', '/home'), - Subvolume('@log', '/var/log'), - Subvolume('@pkg', '/var/cache/pacman/pkg'), - Subvolume('@.snapshots', '/.snapshots') - ] - } - elif using_home_partition: - # If we don't want to use subvolumes, - # But we want to be able to re-use data between re-installs.. - # A second partition for /home would be nice if we have the space for it - layout[block_device.path]['partitions'].append({ - # Home - "type" : "primary", - "start" : f"{min(block_device.size, 20)}GiB", - "size" : "100%", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/home", - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - - return layout - - -def suggest_multi_disk_layout(block_devices :List[BlockDevice], default_filesystem :Optional[str] = None, advanced_options :bool = False): - - if not default_filesystem: - from ..user_interaction import ask_for_main_filesystem_format - default_filesystem = ask_for_main_filesystem_format(advanced_options) - - # Not really a rock solid foundation of information to stand on, but it's a start: - # https://www.reddit.com/r/btrfs/comments/m287gp/partition_strategy_for_two_physical_disks/ - # https://www.reddit.com/r/btrfs/comments/9us4hr/what_is_your_btrfs_partitionsubvolumes_scheme/ - - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB - ARCH_LINUX_INSTALLED_SIZE = 20 # GiB, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size? - - block_devices = sort_block_devices_based_on_performance(block_devices).keys() - - home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART) - root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device]) - - if home_device is None or root_device is None: - text = _('The selected drives do not have the minimum capacity required for an automatic suggestion\n') - text += _('Minimum capacity for /home partition: {}GB\n').format(MIN_SIZE_TO_ALLOW_HOME_PART) - text += _('Minimum capacity for Arch Linux partition: {}GB').format(ARCH_LINUX_INSTALLED_SIZE) - Menu(str(text), [str(_('Continue'))], skip=False).run() - return None - - compression = False - - if default_filesystem == 'btrfs': - # prompt = 'Would you like to use BTRFS subvolumes with a default structure?' - # choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() - # using_subvolumes = choice == 'yes' - - prompt = str(_('Would you like to use BTRFS compression?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - compression = choice.value == Menu.yes() - - log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG) - - layout = { - root_device.path : { - "wipe" : True, - "partitions" : [] - }, - home_device.path : { - "wipe" : True, - "partitions" : [] - }, - } - - # TODO: Same deal as with the single disk layout, we should - # probably check if the drive will be encrypted. - layout[root_device.path]['partitions'].append({ - # Boot - "type" : "primary", - "start" : "3MiB", - "size" : "203MiB", - "boot" : True, - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/boot", - "filesystem" : { - "format" : "fat32" - } - }) - - if has_uefi(): - layout[root_device.path]['partitions'][-1]['start'] = '1MiB' - layout[root_device.path]['partitions'][-1]['size'] = '512MiB' - - layout[root_device.path]['partitions'].append({ - # Root - "type" : "primary", - "start" : "206MiB", - "size" : "100%", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/", - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - if has_uefi(): - layout[root_device.path]['partitions'][-1]['start'] = '513MiB' - - layout[home_device.path]['partitions'].append({ - # Home - "type" : "primary", - "start" : "1MiB", - "size" : "100%", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/home", - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - - return layout diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py deleted file mode 100644 index 076a8ba2..00000000 --- a/archinstall/lib/disk/validators.py +++ /dev/null @@ -1,48 +0,0 @@ -from typing import List - -def valid_parted_position(pos :str) -> bool: - if not len(pos): - return False - - if pos.isdigit(): - return True - - pos_lower = pos.lower() - - if (pos_lower.endswith('b') or pos_lower.endswith('s')) and pos[:-1].isdigit(): - return True - - if any(pos_lower.endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit() - for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']): - return True - - return False - - -def fs_types() -> List[str]: - # https://www.gnu.org/software/parted/manual/html_node/mkpart.html - # Above link doesn't agree with `man parted` /mkpart documentation: - """ - fs-type can - be one of "btrfs", "ext2", - "ext3", "ext4", "fat16", - "fat32", "hfs", "hfs+", - "linux-swap", "ntfs", "reis‐ - erfs", "udf", or "xfs". - """ - return [ - "btrfs", - "ext2", - "ext3", "ext4", # `man parted` allows these - "fat16", "fat32", - "hfs", "hfs+", # "hfsx", not included in `man parted` - "linux-swap", - "ntfs", - "reiserfs", - "udf", # "ufs", not included in `man parted` - "xfs", # `man parted` allows this - ] - - -def valid_fs_type(fstype :str) -> bool: - return fstype.lower() in fs_types() |