From 00b0ae7ba439a5a420095175b3bedd52c569db51 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Wed, 19 Apr 2023 20:55:42 +1000 Subject: PyParted and a large rewrite of the underlying partitioning (#1604) * Invert mypy files * Add optional pre-commit hooks * New profile structure * Serialize profiles * Use profile instead of classmethod * Custom profile setup * Separator between back * Support profile import via url * Move profiles module * Refactor files * Remove symlink * Add user to docker group * Update schema description * Handle list services * mypy fixes * mypy fixes * Rename profilesv2 to profiles * flake8 * mypy again * Support selecting DM * Fix mypy * Cleanup * Update greeter setting * Update schema * Revert toml changes * Poc external dependencies * Dependency support * New encryption menu * flake8 * Mypy and flake8 * Unify lsblk command * Update bootloader configuration * Git hooks * Fix import * Pyparted * Remove custom font setting * flake8 * Remove default preview * Manual partitioning menu * Update structure * Disk configuration * Update filesystem * luks2 encryption * Everything works until installation * Btrfsutil * Btrfs handling * Update btrfs * Save encryption config * Fix pipewire issue * Update mypy version * Update all pre-commit * Update package versions * Revert audio/pipewire * Merge master PRs * Add master changes * Merge master changes * Small renaming * Pull master changes * Reset disk enc after disk config change * Generate locals * Update naming * Fix imports * Fix broken sync * Fix pre selection on table menu * Profile menu * Update profile * Fix post_install * Added python-pyparted to PKGBUILD, this requires [testing] to be enabled in order to run makepkg. Package still works via python -m build etc. * Swaped around some setuptools logic in pyproject Since we define `package-data` and `packages` there should be no need for: ``` [tool.setuptools.packages.find] where = ["archinstall", "archinstall.*"] ``` * Removed pyproject collisions. Duplicate definitions. * Made sure pyproject.toml includes languages * Add example and update README * Fix pyproject issues * Generate locale * Refactor imports * Simplify imports * Add profile description and package examples * Align code * Fix mypy * Simplify imports * Fix saving config * Fix wrong luks merge * Refactor installation * Fix cdrom device loading * Fix wrongly merged code * Fix imports and greeter * Don't terminate on partprobe error * Use specific path on partprobe from luks * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update github workflow to test archinstall installation * Update sway merge * Generate locales * Update workflow --------- Co-authored-by: Daniel Girtler Co-authored-by: Anton Hvornum Co-authored-by: Anton Hvornum Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> --- archinstall/lib/disk/__init__.py | 47 +- archinstall/lib/disk/blockdevice.py | 301 ------- archinstall/lib/disk/btrfs/__init__.py | 56 -- archinstall/lib/disk/btrfs/btrfs_helpers.py | 136 --- archinstall/lib/disk/btrfs/btrfspartition.py | 109 --- archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py | 192 ---- archinstall/lib/disk/device_handler.py | 599 +++++++++++++ archinstall/lib/disk/device_model.py | 1033 ++++++++++++++++++++++ archinstall/lib/disk/diskinfo.py | 40 - archinstall/lib/disk/dmcryptdev.py | 48 - archinstall/lib/disk/encryption.py | 174 ---- archinstall/lib/disk/encryption_menu.py | 179 ++++ archinstall/lib/disk/fido.py | 94 ++ archinstall/lib/disk/filesystem.py | 343 ++----- archinstall/lib/disk/helpers.py | 556 ------------ archinstall/lib/disk/mapperdev.py | 92 -- archinstall/lib/disk/partition.py | 661 -------------- archinstall/lib/disk/partitioning_menu.py | 335 +++++++ archinstall/lib/disk/subvolume_menu.py | 101 +++ archinstall/lib/disk/user_guides.py | 240 ----- archinstall/lib/disk/validators.py | 48 - 21 files changed, 2451 insertions(+), 2933 deletions(-) delete mode 100644 archinstall/lib/disk/blockdevice.py delete mode 100644 archinstall/lib/disk/btrfs/__init__.py delete mode 100644 archinstall/lib/disk/btrfs/btrfs_helpers.py delete mode 100644 archinstall/lib/disk/btrfs/btrfspartition.py delete mode 100644 archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py create mode 100644 archinstall/lib/disk/device_handler.py create mode 100644 archinstall/lib/disk/device_model.py delete mode 100644 archinstall/lib/disk/diskinfo.py delete mode 100644 archinstall/lib/disk/dmcryptdev.py delete mode 100644 archinstall/lib/disk/encryption.py create mode 100644 archinstall/lib/disk/encryption_menu.py create mode 100644 archinstall/lib/disk/fido.py delete mode 100644 archinstall/lib/disk/helpers.py delete mode 100644 archinstall/lib/disk/mapperdev.py delete mode 100644 archinstall/lib/disk/partition.py create mode 100644 archinstall/lib/disk/partitioning_menu.py create mode 100644 archinstall/lib/disk/subvolume_menu.py delete mode 100644 archinstall/lib/disk/user_guides.py delete mode 100644 archinstall/lib/disk/validators.py (limited to 'archinstall/lib/disk') diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py index 352d04b9..cdc96373 100644 --- a/archinstall/lib/disk/__init__.py +++ b/archinstall/lib/disk/__init__.py @@ -1,7 +1,40 @@ -from .btrfs import * -from .helpers import * -from .blockdevice import BlockDevice -from .filesystem import Filesystem, MBR, GPT -from .partition import * -from .user_guides import * -from .validators import * \ No newline at end of file +from .device_handler import device_handler, disk_layouts +from .fido import Fido2 +from .filesystem import FilesystemHandler +from .subvolume_menu import SubvolumeMenu +from .partitioning_menu import ( + manual_partitioning, + PartitioningList +) +from .device_model import ( + _DeviceInfo, + BDevice, + DiskLayoutType, + DiskLayoutConfiguration, + PartitionTable, + Unit, + Size, + SubvolumeModification, + DeviceGeometry, + PartitionType, + PartitionFlag, + FilesystemType, + ModificationStatus, + PartitionModification, + DeviceModification, + EncryptionType, + DiskEncryption, + Fido2Device, + LsblkInfo, + CleanType, + get_lsblk_info, + get_all_lsblk_info, + get_lsblk_by_mountpoint +) +from .encryption_menu import ( + select_encryption_type, + select_encrypted_password, + select_hsm, + select_partitions_to_encrypt, + DiskEncryptionMenu, +) diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py deleted file mode 100644 index 178b786a..00000000 --- a/archinstall/lib/disk/blockdevice.py +++ /dev/null @@ -1,301 +0,0 @@ -from __future__ import annotations -import json -import logging -import time - -from collections import OrderedDict -from dataclasses import dataclass -from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING - -from ..exceptions import DiskError, SysCallError -from ..output import log -from ..general import SysCommand -from ..storage import storage - - -if TYPE_CHECKING: - from .partition import Partition - _: Any - - -@dataclass -class BlockSizeInfo: - start: str - end: str - size: str - - -@dataclass -class BlockInfo: - pttype: str - ptuuid: str - size: int - tran: Optional[str] - rota: bool - free_space: Optional[List[BlockSizeInfo]] - - -class BlockDevice: - def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): - if not info: - from .helpers import all_blockdevices - # If we don't give any information, we need to auto-fill it. - # Otherwise any subsequent usage will break. - self.info = all_blockdevices(partitions=False)[path].info - else: - self.info = info - - self._path = path - self.keep_partitions = True - self._block_info = self._fetch_information() - self._partitions: Dict[str, 'Partition'] = {} - - self._load_partitions() - - # TODO: Currently disk encryption is a BIT misleading. - # It's actually partition-encryption, but for future-proofing this - # I'm placing the encryption password on a BlockDevice level. - - def __repr__(self, *args :str, **kwargs :str) -> str: - return self._str_repr - - @property - def path(self) -> str: - return self._path - - @property - def _str_repr(self) -> str: - return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})" - - def as_json(self) -> Dict[str, Any]: - return { - str(_('Device')): self._device_or_backfile, - str(_('Size')): f'{self.size}GB', - str(_('Free space')): f'{self._safe_free_space()}', - str(_('Bus-type')): f'{self.bus_type}' - } - - def __iter__(self) -> Iterator['Partition']: - for partition in self.partitions: - yield self.partitions[partition] - - def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: - if hasattr(self, key): - return getattr(self, key) - - if self.info and key in self.info: - return self.info[key] - - raise KeyError(f'{self.info} does not contain information: "{key}"') - - def __lt__(self, left_comparitor :'BlockDevice') -> bool: - return self._path < left_comparitor.path - - def json(self) -> str: - """ - json() has precedence over __dump__, so this is a way - to give less/partial information for user readability. - """ - return self._path - - def __dump__(self) -> Dict[str, Dict[str, Any]]: - return { - self._path: { - 'partuuid': self.uuid, - 'wipe': self.info.get('wipe', None), - 'partitions': [part.__dump__() for part in self.partitions.values()] - } - } - - def _call_lsblk(self, path: str) -> Dict[str, Any]: - output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8') - if output: - lsblk_info = json.loads(output) - return lsblk_info - - raise DiskError(f'Failed to read disk "{self.path}" with lsblk') - - def _load_partitions(self): - from .partition import Partition - - self._partitions.clear() - - lsblk_info = self._call_lsblk(self._path) - device = lsblk_info['blockdevices'][0] - self._partitions.clear() - - if children := device.get('children', None): - root = f'/dev/{device["name"]}' - for child in children: - part_id = child['name'].removeprefix(device['name']) - self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id) - - def _get_free_space(self) -> Optional[List[BlockSizeInfo]]: - # NOTE: parted -s will default to `cancel` on prompt, skipping any partition - # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, - # so the free will ignore the ESP partition and just give the "free" space. - # Doesn't harm us, but worth noting in case something weird happens. - try: - output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8') - if output: - free_lines = [line for line in output.split('\n') if 'free' in line] - sizes = [] - for free_space in free_lines: - _, start, end, size, *_ = free_space.strip('\r\n;').split(':') - sizes.append(BlockSizeInfo(start, end, size)) - - return sizes - except SysCallError as error: - log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG) - - return None - - def _fetch_information(self) -> BlockInfo: - lsblk_info = self._call_lsblk(self._path) - device = lsblk_info['blockdevices'][0] - free_space = self._get_free_space() - - return BlockInfo( - pttype=device['pttype'], - ptuuid=device['ptuuid'], - size=device['size'], - tran=device['tran'], - rota=device['rota'], - free_space=free_space - ) - - @property - def _device_or_backfile(self) -> Optional[str]: - """ - Returns the actual device-endpoint of the BlockDevice. - If it's a loop-back-device it returns the back-file, - For other types it return self.device - """ - if self.info.get('type') == 'loop': - return self.info['back-file'] - else: - return self.device - - @property - def mountpoint(self) -> None: - """ - A dummy function to enable transparent comparisons of mountpoints. - As blockdevices can't be mounted directly, this will always be None - """ - return None - - @property - def device(self) -> Optional[str]: - """ - Returns the device file of the BlockDevice. - If it's a loop-back-device it returns the /dev/X device, - If it's a ATA-drive it returns the /dev/X device - And if it's a crypto-device it returns the parent device - """ - if "DEVTYPE" not in self.info: - raise DiskError(f'Could not locate backplane info for "{self._path}"') - - if self.info['DEVTYPE'] in ['disk','loop']: - return self._path - elif self.info['DEVTYPE'][:4] == 'raid': - # This should catch /dev/md## raid devices - return self._path - elif self.info['DEVTYPE'] == 'crypt': - if 'pkname' not in self.info: - raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.') - return f"/dev/{self.info['pkname']}" - else: - log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - - return None - - @property - def partition_type(self) -> str: - return self._block_info.pttype - - @property - def uuid(self) -> str: - return self._block_info.ptuuid - - @property - def size(self) -> float: - from .helpers import convert_size_to_gb - return convert_size_to_gb(self._block_info.size) - - @property - def bus_type(self) -> Optional[str]: - return self._block_info.tran - - @property - def spinning(self) -> bool: - return self._block_info.rota - - @property - def partitions(self) -> Dict[str, 'Partition']: - return OrderedDict(sorted(self._partitions.items())) - - @property - def partition(self) -> List['Partition']: - return list(self.partitions.values()) - - @property - def first_free_sector(self) -> str: - if block_size := self._largest_free_space(): - return block_size.start - else: - return '512MB' - - @property - def first_end_sector(self) -> str: - if block_size := self._largest_free_space(): - return block_size.end - else: - return f"{self.size}GB" - - def _safe_free_space(self) -> str: - if self._block_info.free_space: - sizes = [free_space.size for free_space in self._block_info.free_space] - return '+'.join(sizes) - return '?' - - def _largest_free_space(self) -> Optional[BlockSizeInfo]: - if self._block_info.free_space: - sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True) - return sorted_sizes[0] - return None - - def _partprobe(self) -> bool: - return SysCommand(['partprobe', self._path]).exit_code == 0 - - def flush_cache(self) -> None: - self._load_partitions() - - def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: - if not uuid and not partuuid: - raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.") - - log(f"Retrieving partition PARTUUID={partuuid} or UUID={uuid}", level=logging.DEBUG, fg="gray") - - for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): - for partition_index, partition in self.partitions.items(): - try: - if uuid and partition.uuid and partition.uuid.lower() == uuid.lower(): - log(f"Matched UUID={uuid} against {partition.uuid}", level=logging.DEBUG, fg="gray") - return partition - elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower(): - log(f"Matched PARTUUID={partuuid} against {partition.part_uuid}", level=logging.DEBUG, fg="gray") - return partition - except DiskError as error: - # Most likely a blockdevice that doesn't support or use UUID's - # (like Microsoft recovery partition) - log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray") - pass - - log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) - self.flush_cache() - time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) - - log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) - log(f"Cache: {self._partitions}") - log(f"Partitions: {self.partitions.items()}") - raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py deleted file mode 100644 index a26e0160..00000000 --- a/archinstall/lib/disk/btrfs/__init__.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations -import pathlib -import glob -import logging -from typing import Union, Dict, TYPE_CHECKING - -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from ...installer import Installer - -from .btrfs_helpers import ( - subvolume_info_from_path as subvolume_info_from_path, - find_parent_subvolume as find_parent_subvolume, - setup_subvolumes as setup_subvolumes, - mount_subvolume as mount_subvolume -) -from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume -from .btrfspartition import BTRFSPartition as BTRFSPartition - -from ...exceptions import DiskError, Deprecated -from ...general import SysCommand -from ...output import log - - -def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: - """ - This function uses btrfs to create a subvolume. - - @installation: archinstall.Installer instance - @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot - """ - - installation_mountpoint = installation.target - if type(installation_mountpoint) == str: - installation_mountpoint = pathlib.Path(installation_mountpoint) - # Set up the required physical structure - if type(subvolume_location) == str: - subvolume_location = pathlib.Path(subvolume_location) - - target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor) - - # Difference from mount_subvolume: - # We only check if the parent exists, since we'll run in to "target path already exists" otherwise - if not target.parent.exists(): - target.parent.mkdir(parents=True) - - if glob.glob(str(target / '*')): - raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)") - - # Remove the target if it exists - if target.exists(): - target.rmdir() - - log(f"Creating a subvolume on {target}", level=logging.INFO) - if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: - raise DiskError(f"Could not create a subvolume at {target}: {cmd}") diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py deleted file mode 100644 index f6d2734a..00000000 --- a/archinstall/lib/disk/btrfs/btrfs_helpers.py +++ /dev/null @@ -1,136 +0,0 @@ -import logging -import re -from pathlib import Path -from typing import Optional, Dict, Any, TYPE_CHECKING - -from ...models.subvolume import Subvolume -from ...exceptions import SysCallError, DiskError -from ...general import SysCommand -from ...output import log -from ...plugins import plugins -from ..helpers import get_mount_info -from .btrfssubvolumeinfo import BtrfsSubvolumeInfo - -if TYPE_CHECKING: - from .btrfspartition import BTRFSPartition - from ...installer import Installer - - -class fstab_btrfs_compression_plugin(): - def __init__(self, partition_dict): - self.partition_dict = partition_dict - - def on_genfstab(self, installation): - with open(f"{installation.target}/etc/fstab", 'r') as fh: - fstab = fh.read() - - # Replace the {installation}/etc/fstab with entries - # using the compress=zstd where the mountpoint has compression set. - with open(f"{installation.target}/etc/fstab", 'w') as fh: - for line in fstab.split('\n'): - # So first we grab the mount options by using subvol=.*? as a locator. - # And we also grab the mountpoint for the entry, for instance /var/log - if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)): - for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []): - # We then locate the correct subvolume and check if it's compressed - if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip(): - # We then sneak in the compress=zstd option if it doesn't already exist: - # We skip entries where compression is already defined - if ',compress=zstd,' not in line: - line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}") - break - - fh.write(f"{line}\n") - - return True - - -def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume): - # we normalize the subvolume name (getting rid of slash at the start if exists. - # In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = subvolume.name.lstrip('/') - mountpoint = Path(subvolume.mountpoint) - installation_target = Path(installation.target) - - mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) - mountpoint.mkdir(parents=True, exist_ok=True) - mount_options = subvolume.options + [f'subvol={name}'] - - log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") - SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") - - -def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]): - log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") - - for subvolume in partition_dict['btrfs']['subvolumes']: - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = subvolume.name.lstrip('/') - - # We create the subvolume using the BTRFSPartition instance. - # That way we ensure not only easy access, but also accurate mount locations etc. - partition_dict['device_instance'].create_subvolume(name, installation=installation) - - # Make the nodatacow processing now - # It will be the main cause of creation of subvolumes which are not to be mounted - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if subvolume.nodatacow: - if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - - # Make the compress processing now - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - # in this way only zstd compression is activaded - # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - - if subvolume.compress: - if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - - if 'fstab_btrfs_compression_plugin' not in plugins: - plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict) - - -def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]: - try: - subvolume_name = '' - result = {} - for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): - if index == 0: - subvolume_name = line.strip().decode('UTF-8') - continue - - if b':' in line: - key, value = line.strip().decode('UTF-8').split(':', 1) - - # A bit of a hack, until I figure out how @dataclass - # allows for hooking in a pre-processor to do this we have to do it here: - result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() - - return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore - except SysCallError as error: - log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") - - return None - - -def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]: - # A root path cannot have a parent - if str(path) == '/': - return None - - if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters): - if not (subvolume := subvolume_info_from_path(found_mount['target'])): - if found_mount['target'] == '/': - return None - - return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']]) - - return subvolume - - return None diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py deleted file mode 100644 index d04c9b98..00000000 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ /dev/null @@ -1,109 +0,0 @@ -import glob -import pathlib -import logging -from typing import Optional, TYPE_CHECKING - -from ...exceptions import DiskError -from ...storage import storage -from ...output import log -from ...general import SysCommand -from ..partition import Partition -from ..helpers import findmnt -from .btrfs_helpers import ( - subvolume_info_from_path -) - -if TYPE_CHECKING: - from ...installer import Installer - from .btrfssubvolumeinfo import BtrfsSubvolumeInfo - - -class BTRFSPartition(Partition): - def __init__(self, *args, **kwargs): - Partition.__init__(self, *args, **kwargs) - - @property - def subvolumes(self): - for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): - if '[' in filesystem.get('source', ''): - yield subvolume_info_from_path(filesystem['target']) - - def iterate_children(struct): - for c in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(c['target']) - - for sub_child in iterate_children(c): - yield sub_child - - for child in iterate_children(filesystem): - yield child - - def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo': - """ - Subvolumes have to be created within a mountpoint. - This means we need to get the current installation target. - After we get it, we need to verify it is a btrfs subvolume filesystem. - Finally, the destination must be empty. - """ - - # Allow users to override the installation session - if not installation: - installation = storage.get('installation_session') - - # Determain if the path given, is an absolute path or a relative path. - # We do this by checking if the path contains a known mountpoint. - if str(subvolume)[0] == '/': - if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): - if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): - # Path starts with a known mountpoint which isn't / - # Which means it's an absolute path to a mounted location. - pass - else: - # Since it's not an absolute position with a known start. - # We omit the anchor ('/' basically) and make sure it's appendable - # to the installation.target later - subvolume = subvolume.relative_to(subvolume.anchor) - # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is. - - # If the subvolume is not absolute, then we do two checks: - # 1. Check if the partition itself is mounted somewhere, and use that as a root - # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs - # If both above fail, we need to warn the user that such setup is not supported. - if str(subvolume)[0] != '/': - if self.mountpoint is None and installation is None: - raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.") - elif self.mountpoint: - subvolume = self.mountpoint / subvolume - elif installation: - ongoing_installation_destination = installation.target - if type(ongoing_installation_destination) == str: - ongoing_installation_destination = pathlib.Path(ongoing_installation_destination) - - subvolume = ongoing_installation_destination / subvolume - - subvolume.parent.mkdir(parents=True, exist_ok=True) - - # - - log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey") - - if glob.glob(str(subvolume / '*')): - raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") - # Ideally we would like to check if the destination is already a subvolume. - # But then we would need the mount-point at this stage as well. - # So we'll comment out this check: - # elif subvolinfo := subvolume_info_from_path(subvolume): - # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") - - # And deal with it here: - SysCommand(f"btrfs subvolume create {subvolume}") - - return subvolume_info_from_path(subvolume) diff --git a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py deleted file mode 100644 index 5f5bdea6..00000000 --- a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py +++ /dev/null @@ -1,192 +0,0 @@ -import pathlib -import datetime -import logging -import string -import random -import shutil -from dataclasses import dataclass -from typing import Optional, List# , TYPE_CHECKING -from functools import cached_property - -# if TYPE_CHECKING: -# from ..blockdevice import BlockDevice - -from ...exceptions import DiskError -from ...general import SysCommand -from ...output import log -from ...storage import storage - - -@dataclass -class BtrfsSubvolumeInfo: - full_path :pathlib.Path - name :str - uuid :str - parent_uuid :str - creation_time :datetime.datetime - subvolume_id :int - generation :int - gen_at_creation :int - parent_id :int - top_level_id :int - send_transid :int - send_time :datetime.datetime - receive_transid :int - received_uuid :Optional[str] = None - flags :Optional[str] = None - receive_time :Optional[datetime.datetime] = None - snapshots :Optional[List] = None - - def __post_init__(self): - self.full_path = pathlib.Path(self.full_path) - - # Convert "-" entries to `None` - if self.parent_uuid == "-": - self.parent_uuid = None - if self.received_uuid == "-": - self.received_uuid = None - if self.flags == "-": - self.flags = None - if self.receive_time == "-": - self.receive_time = None - if self.snapshots == "": - self.snapshots = [] - - # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) - self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) - self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) - if self.receive_time: - self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) - - @property - def parent_subvolume(self): - from .btrfs_helpers import find_parent_subvolume - - return find_parent_subvolume(self.full_path) - - @property - def root(self) -> bool: - from .btrfs_helpers import subvolume_info_from_path - - # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first - # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. - # It would also be nice if it could use findmnt(self.full_path) and traverse backwards - # finding the last occurrence of a subvolume which 'self' belongs to. - if volume := subvolume_info_from_path(storage['MOUNT_POINT']): - return self.full_path == volume.full_path - - return False - - @cached_property - def partition(self): - from ..helpers import findmnt, get_parent_of_partition, all_blockdevices - from ..partition import Partition - from ..blockdevice import BlockDevice - from ..mapperdev import MapperDev - from .btrfspartition import BTRFSPartition - from .btrfs_helpers import subvolume_info_from_path - - try: - # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. - if filesystem := findmnt(self.full_path).get('filesystems', []): - if source := filesystem[0].get('source', None): - # Strip away subvolume definitions from findmnt - if '[' in source: - source = source[:source.find('[')] - - if filesystem[0].get('fstype', '') == 'btrfs': - return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - elif filesystem[0].get('source', '').startswith('/dev/mapper'): - return MapperDev(source) - else: - return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - except DiskError: - # Subvolume has never been mounted, we have no reliable way of finding where it is. - # But we have the UUID of the partition, and can begin looking for it by mounting - # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. - - log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) - for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): - if type(instance) in (Partition, MapperDev): - we_mounted_it = False - detection_mountpoint = instance.mountpoint - if not detection_mountpoint: - if type(instance) == Partition and instance.encrypted: - # TODO: Perhaps support unlocking encrypted volumes? - # This will cause a lot of potential user interactions tho. - log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) - continue - - detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") - detection_mountpoint.mkdir(parents=True, exist_ok=True) - - instance.mount(str(detection_mountpoint)) - we_mounted_it = True - - if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): - if subvolume := subvolume_info_from_path(filesystem[0]['target']): - if subvolume.uuid == self.uuid: - # The top level subvolume matched of ourselves, - # which means the instance we're iterating has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - def iterate_children(struct): - for child in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) - - for sub_child in iterate_children(child): - yield sub_child - - for child in iterate_children(filesystem[0]): - if child.uuid == self.uuid: - # We found a child within the instance that has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - if we_mounted_it: - instance.unmount() - shutil.rmtree(detection_mountpoint) - - @cached_property - def mount_options(self) -> Optional[List[str]]: - from ..helpers import findmnt - - if filesystem := findmnt(self.full_path).get('filesystems', []): - return filesystem[0].get('options').split(',') - - def convert_to_ISO_format(self, time_string): - time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') - iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" - return iso_string - - def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): - from ..helpers import findmnt - - try: - if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): - log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") - SysCommand(f"umount {mountpoint}") - except DiskError: - # No previously mounted device at the mountpoint - pass - - if not options: - options = [] - - try: - if include_previously_known_options and (cached_options := self.mount_options): - options += cached_options - except DiskError: - pass - - if not any('subvol=' in x for x in options): - options += f'subvol={self.name}' - - SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") - log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") - - def unmount(self, recurse :bool = True): - SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") - log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py new file mode 100644 index 00000000..12cf18ea --- /dev/null +++ b/archinstall/lib/disk/device_handler.py @@ -0,0 +1,599 @@ +from __future__ import annotations + +import json +import logging +import os +import time +from pathlib import Path +from typing import List, Dict, Any, Optional, TYPE_CHECKING + +from parted import ( # type: ignore + Disk, Geometry, FileSystem, + PartitionException, DiskLabelException, + getAllDevices, freshDisk, Partition, +) + +from .device_model import ( + DeviceModification, PartitionModification, + BDevice, _DeviceInfo, _PartitionInfo, + FilesystemType, Unit, PartitionTable, + ModificationStatus, get_lsblk_info, LsblkInfo, + _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption +) + +from ..exceptions import DiskError, UnknownFilesystemFormat +from ..general import SysCommand, SysCallError, JSON +from ..luks import Luks2 +from ..output import log +from ..utils.util import is_subpath + +if TYPE_CHECKING: + _: Any + + +class DeviceHandler(object): + _TMP_BTRFS_MOUNT = Path('/mnt/arch_btrfs') + + def __init__(self): + self._devices: Dict[Path, BDevice] = {} + self.load_devices() + + @property + def devices(self) -> List[BDevice]: + return list(self._devices.values()) + + def load_devices(self): + block_devices = {} + + for device in getAllDevices(): + try: + disk = Disk(device) + except DiskLabelException as error: + if 'unrecognised disk label' in getattr(error, 'message', str(error)): + disk = freshDisk(device, PartitionTable.GPT.value) + else: + log(f'Unable to get disk from device: {device}', level=logging.DEBUG) + continue + + device_info = _DeviceInfo.from_disk(disk) + partition_infos = [] + + for partition in disk.partitions: + lsblk_info = get_lsblk_info(partition.path) + fs_type = self._determine_fs_type(partition, lsblk_info) + subvol_infos = [] + + if fs_type == FilesystemType.Btrfs: + subvol_infos = self.get_btrfs_info(partition.path) + + partition_infos.append( + _PartitionInfo.from_partition( + partition, + fs_type, + lsblk_info.partuuid, + lsblk_info.mountpoints, + subvol_infos + ) + ) + + block_device = BDevice(disk, device_info, partition_infos) + block_devices[block_device.device_info.path] = block_device + + self._devices = block_devices + + def _determine_fs_type( + self, + partition: Partition, + lsblk_info: Optional[LsblkInfo] = None + ) -> Optional[FilesystemType]: + try: + if partition.fileSystem: + return FilesystemType(partition.fileSystem.type) + elif lsblk_info is not None: + return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None + return None + except ValueError: + log(f'Could not determine the filesystem: {partition.fileSystem}', level=logging.DEBUG) + + return None + + def get_device(self, path: Path) -> Optional[BDevice]: + return self._devices.get(path, None) + + def get_device_by_partition_path(self, partition_path: Path) -> Optional[BDevice]: + partition = self.find_partition(partition_path) + if partition: + return partition.disk.device + return None + + def find_partition(self, path: Path) -> Optional[_PartitionInfo]: + for device in self._devices.values(): + part = next(filter(lambda x: str(x.path) == str(path), device.partition_infos), None) + if part is not None: + return part + return None + + def get_uuid_for_path(self, path: Path) -> Optional[str]: + partition = self.find_partition(path) + return partition.partuuid if partition else None + + def get_btrfs_info(self, dev_path: Path) -> List[_BtrfsSubvolumeInfo]: + lsblk_info = get_lsblk_info(dev_path) + subvol_infos: List[_BtrfsSubvolumeInfo] = [] + + if not lsblk_info.mountpoint: + self.mount(dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + mountpoint = self._TMP_BTRFS_MOUNT + else: + # when multiple subvolumes are mounted then the lsblk output may look like + # "mountpoint": "/mnt/archinstall/.snapshots" + # "mountpoints": ["/mnt/archinstall/.snapshots", "/mnt/archinstall/home", ..] + # so we'll determine the minimum common path and assume that's the root + path_strings = [str(m) for m in lsblk_info.mountpoints] + common_prefix = os.path.commonprefix(path_strings) + mountpoint = Path(common_prefix) + + try: + result = SysCommand(f'btrfs subvolume list {mountpoint}') + except SysCallError as err: + log(f'Failed to read btrfs subvolume information: {err}', level=logging.DEBUG) + return subvol_infos + + if result.exit_code == 0: + try: + if decoded := result.decode('utf-8'): + # ID 256 gen 16 top level 5 path @ + for line in decoded.splitlines(): + # expected output format: + # ID 257 gen 8 top level 5 path @home + name = Path(line.split(' ')[-1]) + sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None) + subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint)) + except json.decoder.JSONDecodeError as err: + log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR) + raise err + + if not lsblk_info.mountpoint: + self.umount(dev_path) + + return subvol_infos + + def _perform_formatting( + self, + fs_type: FilesystemType, + path: Path, + additional_parted_options: List[str] = [] + ): + options = [] + command = '' + + match fs_type: + case FilesystemType.Btrfs: + options += ['-f'] + command += 'mkfs.btrfs' + case FilesystemType.Fat16: + options += ['-F16'] + command += 'mkfs.fat' + case FilesystemType.Fat32: + options += ['-F32'] + command += 'mkfs.fat' + case FilesystemType.Ext2: + options += ['-F'] + command += 'mkfs.ext2' + case FilesystemType.Ext3: + options += ['-F'] + command += 'mkfs.ext3' + case FilesystemType.Ext4: + options += ['-F'] + command += 'mkfs.ext4' + case FilesystemType.Xfs: + options += ['-f'] + command += 'mkfs.xfs' + case FilesystemType.F2fs: + options += ['-f'] + command += 'mkfs.f2fs' + case FilesystemType.Ntfs: + options += ['-f', '-Q'] + command += 'mkfs.ntfs' + case FilesystemType.Reiserfs: + command += 'mkfs.reiserfs' + case _: + raise UnknownFilesystemFormat(f'Filetype "{fs_type.value}" is not supported') + + options += additional_parted_options + options_str = ' '.join(options) + + log(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') + + try: + if (handle := SysCommand(f"/usr/bin/{command} {options_str} {path}")).exit_code != 0: + mkfs_error = handle.decode() + raise DiskError(f'Could not format {path} with {fs_type.value}: {mkfs_error}') + except SysCallError as error: + msg = f'Could not format {path} with {fs_type.value}: {error.message}' + log(msg, fg='red') + raise DiskError(msg) from error + + def _perform_enc_formatting( + self, + dev_path: Path, + mapper_name: Optional[str], + fs_type: FilesystemType, + enc_conf: DiskEncryption + ): + luks_handler = Luks2( + dev_path, + mapper_name=mapper_name, + password=enc_conf.encryption_password + ) + + key_file = luks_handler.encrypt() + + log(f'Unlocking luks2 device: {dev_path}', level=logging.DEBUG) + luks_handler.unlock(key_file=key_file) + + if not luks_handler.mapper_dev: + raise DiskError('Failed to unlock luks device') + + log(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}', level=logging.INFO) + self._perform_formatting(fs_type, luks_handler.mapper_dev) + + log(f'luks2 locking device: {dev_path}', level=logging.INFO) + luks_handler.lock() + + def format( + self, + modification: DeviceModification, + enc_conf: Optional['DiskEncryption'] = None + ): + """ + Format can be given an overriding path, for instance /dev/null to test + the formatting functionality and in essence the support for the given filesystem. + """ + + # verify that all partitions have a path set (which implies that they have been created) + missing_path = next(filter(lambda x: x.dev_path is None, modification.partitions), None) + if missing_path is not None: + raise ValueError('When formatting, all partitions must have a path set') + + # crypto luks is not known to parted and can therefore not + # be used as a filesystem type in that sense; + invalid_fs_type = next(filter(lambda x: x.fs_type is FilesystemType.Crypto_luks, modification.partitions), None) + if invalid_fs_type is not None: + raise ValueError('Crypto luks cannot be set as a filesystem type') + + # make sure all devices are unmounted + self._umount_all_existing(modification) + + for part_mod in modification.partitions: + # partition will be encrypted + if enc_conf is not None and part_mod in enc_conf.partitions: + self._perform_enc_formatting( + part_mod.real_dev_path, + part_mod.mapper_name, + part_mod.fs_type, + enc_conf + ) + else: + self._perform_formatting(part_mod.fs_type, part_mod.real_dev_path) + + def _perform_partitioning( + self, + part_mod: PartitionModification, + block_device: BDevice, + disk: Disk, + requires_delete: bool + ): + # when we require a delete and the partition to be (re)created + # already exists then we have to delete it first + if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]: + log(f'Delete existing partition: {part_mod.real_dev_path}', level=logging.INFO) + part_info = self.find_partition(part_mod.real_dev_path) + + if not part_info: + raise DiskError(f'No partition for dev path found: {part_mod.real_dev_path}') + + disk.deletePartition(part_info.partition) + disk.commit() + + if part_mod.status == ModificationStatus.Delete: + return + + start_sector = part_mod.start.convert( + Unit.sectors, + block_device.device_info.sector_size + ) + + length_sector = part_mod.length.convert( + Unit.sectors, + block_device.device_info.sector_size + ) + + geometry = Geometry( + device=block_device.disk.device, + start=start_sector.value, + length=length_sector.value + ) + + filesystem = FileSystem(type=part_mod.fs_type.value, geometry=geometry) + + partition = Partition( + disk=disk, + type=part_mod.type.get_partition_code(), + fs=filesystem, + geometry=geometry + ) + + for flag in part_mod.flags: + partition.setFlag(flag.value) + + log(f'\tType: {part_mod.type.value}', level=logging.DEBUG) + log(f'\tFilesystem: {part_mod.fs_type.value}', level=logging.DEBUG) + log(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length', level=logging.DEBUG) + + try: + disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint) + disk.commit() + + # the creation will take a bit of time + time.sleep(3) + + # the partition has a real path now as it was created + part_mod.dev_path = Path(partition.path) + + info = get_lsblk_info(part_mod.dev_path) + + if not info.partuuid: + raise DiskError(f'Unable to determine new partition uuid: {part_mod.dev_path}') + + part_mod.partuuid = info.partuuid + part_mod.uuid = info.uuid + except PartitionException as ex: + raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex + + def create_btrfs_volumes( + self, + part_mod: PartitionModification, + enc_conf: Optional['DiskEncryption'] = None + ): + log(f'Creating subvolumes: {part_mod.real_dev_path}', level=logging.INFO) + + luks_handler = None + + # unlock the partition first if it's encrypted + if enc_conf is not None and part_mod in enc_conf.partitions: + if not part_mod.mapper_name: + raise ValueError('No device path specified for modification') + + luks_handler = self.unlock_luks2_dev( + part_mod.real_dev_path, + part_mod.mapper_name, + enc_conf.encryption_password + ) + + if not luks_handler.mapper_dev: + raise DiskError('Failed to unlock luks device') + + self.mount(luks_handler.mapper_dev, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + else: + self.mount(part_mod.real_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + + for sub_vol in part_mod.btrfs_subvols: + log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG) + + if luks_handler is not None: + subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name + else: + subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name + + SysCommand(f"btrfs subvolume create {subvol_path}") + + if sub_vol.nodatacow: + if (result := SysCommand(f'chattr +C {subvol_path}')).exit_code != 0: + raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {result.decode()}') + + if sub_vol.compress: + if (result := SysCommand(f'chattr +c {subvol_path}')).exit_code != 0: + raise DiskError(f'Could not set compress attribute at {subvol_path}: {result}') + + if luks_handler is not None and luks_handler.mapper_dev is not None: + self.umount(luks_handler.mapper_dev) + luks_handler.lock() + else: + self.umount(part_mod.real_dev_path) + + def unlock_luks2_dev(self, dev_path: Path, mapper_name: str, enc_password: str) -> Luks2: + luks_handler = Luks2(dev_path, mapper_name=mapper_name, password=enc_password) + + if not luks_handler.is_unlocked(): + luks_handler.unlock() + + if not luks_handler.is_unlocked(): + raise DiskError(f'Failed to unlock luks2 device: {dev_path}') + + return luks_handler + + def _umount_all_existing(self, modification: DeviceModification): + log(f'Unmounting all partitions: {modification.device_path}', level=logging.INFO) + + existing_partitions = self._devices[modification.device_path].partition_infos + + for partition in existing_partitions: + log(f'Unmounting: {partition.path}', level=logging.DEBUG) + + # un-mount for existing encrypted partitions + if partition.fs_type == FilesystemType.Crypto_luks: + Luks2(partition.path).lock() + else: + self.umount(partition.path, recursive=True) + + def partition( + self, + modification: DeviceModification, + partition_table: Optional[PartitionTable] = None + ): + """ + Create a partition table on the block device and create all partitions. + """ + if modification.wipe: + if partition_table is None: + raise ValueError('Modification is marked as wipe but no partitioning table was provided') + + if partition_table.MBR and len(modification.partitions) > 3: + raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions') + + # make sure all devices are unmounted + self._umount_all_existing(modification) + + # WARNING: the entire device will be wiped and all data lost + if modification.wipe: + self.wipe_dev(modification.device) + part_table = partition_table.value if partition_table else None + disk = freshDisk(modification.device.disk.device, part_table) + else: + log(f'Use existing device: {modification.device_path}') + disk = modification.device.disk + + log(f'Creating partitions: {modification.device_path}') + + # TODO sort by delete first + + for part_mod in modification.partitions: + # don't touch existing partitions + if part_mod.exists(): + continue + + # if the entire disk got nuked then we don't have to delete + # any existing partitions anymore because they're all gone already + requires_delete = modification.wipe is False + self._perform_partitioning(part_mod, modification.device, disk, requires_delete=requires_delete) + + self.partprobe(modification.device.device_info.path) + + def mount( + self, + dev_path: Path, + target_mountpoint: Path, + mount_fs: Optional[str] = None, + create_target_mountpoint: bool = True, + options: List[str] = [] + ): + if create_target_mountpoint and not target_mountpoint.exists(): + target_mountpoint.mkdir(parents=True, exist_ok=True) + + if not target_mountpoint.exists(): + raise ValueError('Target mountpoint does not exist') + + lsblk_info = get_lsblk_info(dev_path) + if target_mountpoint in lsblk_info.mountpoints: + log(f'Device already mounted at {target_mountpoint}') + return + + str_options = ','.join(options) + str_options = f'-o {str_options}' if str_options else '' + + mount_fs = f'-t {mount_fs}' if mount_fs else '' + + command = f'mount {mount_fs} {str_options} {dev_path} {target_mountpoint}' + + log(f'Mounting {dev_path}: command', level=logging.DEBUG) + + try: + result = SysCommand(command) + if result.exit_code != 0: + raise DiskError(f'Could not mount {dev_path}: {command}\n{result.decode()}') + except SysCallError as err: + raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}') + + def umount(self, mountpoint: Path, recursive: bool = False): + try: + lsblk_info = get_lsblk_info(mountpoint) + except SysCallError as ex: + # this could happen if before partitioning the device contained 3 partitions + # and after partitioning only 2 partitions were created, then the modifications object + # will have a reference to /dev/sX3 which is being tried to umount here now + if 'not a block device' in ex.message: + return + raise ex + + if len(lsblk_info.mountpoints) > 0: + log(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}', level=logging.DEBUG) + + for mountpoint in lsblk_info.mountpoints: + log(f'Unmounting mountpoint: {mountpoint}', level=logging.DEBUG) + + command = 'umount' + + if recursive: + command += ' -R' + + SysCommand(f'{command} {mountpoint}') + + def detect_pre_mounted_mods(self, base_mountpoint: Path) -> List[DeviceModification]: + part_mods: Dict[Path, List[PartitionModification]] = {} + + for device in self.devices: + for part_info in device.partition_infos: + for mountpoint in part_info.mountpoints: + if is_subpath(mountpoint, base_mountpoint): + path = Path(part_info.disk.device.path) + part_mods.setdefault(path, []) + part_mods[path].append(PartitionModification.from_existing_partition(part_info)) + break + + device_mods: List[DeviceModification] = [] + for device_path, mods in part_mods.items(): + device_mod = DeviceModification(self._devices[device_path], False, mods) + device_mods.append(device_mod) + + return device_mods + + def partprobe(self, path: Optional[Path] = None): + if path is not None: + command = f'partprobe {path}' + else: + command = 'partprobe' + + try: + result = SysCommand(command) + if result.exit_code != 0: + log(f'Error calling partprobe: {result.decode()}', level=logging.DEBUG) + raise DiskError(f'Could not perform partprobe on {path}: {result.decode()}') + except SysCallError as error: + log(f"partprobe experienced an error with {path}: {error}", level=logging.DEBUG) + + def _wipe(self, dev_path: Path): + """ + Wipe a device (partition or otherwise) of meta-data, be it file system, LVM, etc. + @param dev_path: Device path of the partition to be wiped. + @type dev_path: str + """ + with open(dev_path, 'wb') as p: + p.write(bytearray(1024)) + + def wipe_dev(self, block_device: BDevice): + """ + Wipe the block device of meta-data, be it file system, LVM, etc. + This is not intended to be secure, but rather to ensure that + auto-discovery tools don't recognize anything here. + """ + log(f'Wiping partitions and metadata: {block_device.device_info.path}') + for partition in block_device.partition_infos: + self._wipe(partition.path) + + self._wipe(block_device.device_info.path) + + +device_handler = DeviceHandler() + + +def disk_layouts() -> str: + try: + lsblk_info = get_all_lsblk_info() + return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON) + except SysCallError as err: + log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") + return '' + except json.decoder.JSONDecodeError as err: + log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") + return '' diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py new file mode 100644 index 00000000..0270a4dd --- /dev/null +++ b/archinstall/lib/disk/device_model.py @@ -0,0 +1,1033 @@ +from __future__ import annotations + +import dataclasses +import json +import logging +import math +import time +import uuid +from dataclasses import dataclass, field +from enum import Enum +from enum import auto +from pathlib import Path +from typing import Optional, List, Dict, TYPE_CHECKING, Any +from typing import Union + +import parted # type: ignore +from parted import Disk, Geometry, Partition + +from ..exceptions import DiskError, SysCallError +from ..general import SysCommand +from ..output import log +from ..storage import storage + +if TYPE_CHECKING: + _: Any + + +class DiskLayoutType(Enum): + Default = 'default_layout' + Manual = 'manual_partitioning' + Pre_mount = 'pre_mounted_config' + + def display_msg(self) -> str: + match self: + case DiskLayoutType.Default: return str(_('Use a best-effort default partition layout')) + case DiskLayoutType.Manual: return str(_('Manual Partitioning')) + case DiskLayoutType.Pre_mount: return str(_('Pre-mounted configuration')) + + +@dataclass +class DiskLayoutConfiguration: + config_type: DiskLayoutType + device_modifications: List[DeviceModification] = field(default_factory=list) + # used for pre-mounted config + relative_mountpoint: Optional[Path] = None + + def __post_init__(self): + if self.config_type == DiskLayoutType.Pre_mount and self.relative_mountpoint is None: + raise ValueError('Must set a relative mountpoint when layout type is pre-mount"') + + def __dump__(self) -> Dict[str, Any]: + return { + 'config_type': self.config_type.value, + 'device_modifications': [mod.__dump__() for mod in self.device_modifications] + } + + @classmethod + def parse_arg(cls, disk_config: Dict[str, List[Dict[str, Any]]]) -> Optional[DiskLayoutConfiguration]: + from .device_handler import device_handler + + device_modifications: List[DeviceModification] = [] + config_type = disk_config.get('config_type', None) + + if not config_type: + raise ValueError('Missing disk layout configuration: config_type') + + config = DiskLayoutConfiguration( + config_type=DiskLayoutType(config_type), + device_modifications=device_modifications + ) + + for entry in disk_config.get('device_modifications', []): + device_path = Path(entry.get('device', None)) if entry.get('device', None) else None + + if not device_path: + continue + + device = device_handler.get_device(device_path) + + if not device: + continue + + device_modification = DeviceModification( + wipe=entry.get('wipe', False), + device=device + ) + + device_partitions: List[PartitionModification] = [] + + for partition in entry.get('partitions', []): + device_partition = PartitionModification( + status=ModificationStatus(partition['status']), + fs_type=FilesystemType(partition['fs_type']), + start=Size.parse_args(partition['start']), + length=Size.parse_args(partition['length']), + mount_options=partition['mount_options'], + mountpoint=Path(partition['mountpoint']) if partition['mountpoint'] else None, + type=PartitionType(partition['type']), + flags=[PartitionFlag[f] for f in partition.get('flags', [])], + btrfs_subvols=SubvolumeModification.parse_args(partition.get('btrfs', [])), + ) + # special 'invisible attr to internally identify the part mod + setattr(device_partition, '_obj_id', partition['obj_id']) + device_partitions.append(device_partition) + + device_modification.partitions = device_partitions + device_modifications.append(device_modification) + + return config + + +class PartitionTable(Enum): + GPT = 'gpt' + MBR = 'msdos' + + +class Unit(Enum): + B = 1 # byte + kB = 1000**1 # kilobyte + MB = 1000**2 # megabyte + GB = 1000**3 # gigabyte + TB = 1000**4 # terabyte + PB = 1000**5 # petabyte + EB = 1000**6 # exabyte + ZB = 1000**7 # zettabyte + YB = 1000**8 # yottabyte + + KiB = 1024**1 # kibibyte + MiB = 1024**2 # mebibyte + GiB = 1024**3 # gibibyte + TiB = 1024**4 # tebibyte + PiB = 1024**5 # pebibyte + EiB = 1024**6 # exbibyte + ZiB = 1024**7 # zebibyte + YiB = 1024**8 # yobibyte + + sectors = 'sectors' # size in sector + + Percent = '%' # size in percentile + + +@dataclass +class Size: + value: int + unit: Unit + sector_size: Optional[Size] = None # only required when unit is sector + total_size: Optional[Size] = None # required when operating on percentages + + def __post_init__(self): + if self.unit == Unit.sectors and self.sector_size is None: + raise ValueError('Sector size is required when unit is sectors') + elif self.unit == Unit.Percent: + if self.value < 0 or self.value > 100: + raise ValueError('Percentage must be between 0 and 100') + elif self.total_size is None: + raise ValueError('Total size is required when unit is percentage') + + @property + def _total_size(self) -> Size: + """ + Save method to get the total size, mainly to satisfy mypy + This shouldn't happen as the Size object fails instantiation on missing total size + """ + if self.unit == Unit.Percent and self.total_size is None: + raise ValueError('Percent unit size must specify a total size') + return self.total_size # type: ignore + + def __dump__(self) -> Dict[str, Any]: + return { + 'value': self.value, + 'unit': self.unit.name, + 'sector_size': self.sector_size.__dump__() if self.sector_size else None, + 'total_size': self._total_size.__dump__() if self._total_size else None + } + + @classmethod + def parse_args(cls, size_arg: Dict[str, Any]) -> Size: + sector_size = size_arg['sector_size'] + total_size = size_arg['total_size'] + + return Size( + size_arg['value'], + Unit[size_arg['unit']], + Size.parse_args(sector_size) if sector_size else None, + Size.parse_args(total_size) if total_size else None + ) + + def convert( + self, + target_unit: Unit, + sector_size: Optional[Size] = None, + total_size: Optional[Size] = None + ) -> Size: + if target_unit == Unit.sectors and sector_size is None: + raise ValueError('If target has unit sector, a sector size must be provided') + + # not sure why we would ever wanna convert to percentages + if target_unit == Unit.Percent and total_size is None: + raise ValueError('Missing paramter total size to be able to convert to percentage') + + if self.unit == target_unit: + return self + elif self.unit == Unit.Percent: + amount = int(self._total_size._normalize() * (self.value / 100)) + return Size(amount, Unit.B) + elif self.unit == Unit.sectors: + norm = self._normalize() + return Size(norm, Unit.B).convert(target_unit, sector_size) + else: + if target_unit == Unit.sectors and sector_size is not None: + norm = self._normalize() + sectors = math.ceil(norm / sector_size.value) + return Size(sectors, Unit.sectors, sector_size) + else: + value = int(self._normalize() / target_unit.value) # type: ignore + return Size(value, target_unit) + + def format_size( + self, + target_unit: Unit, + sector_size: Optional[Size] = None + ) -> str: + if self.unit == Unit.Percent: + return f'{self.value}%' + else: + target_size = self.convert(target_unit, sector_size) + return f'{target_size.value} {target_unit.name}' + + def _normalize(self) -> int: + """ + will normalize the value of the unit to Byte + """ + if self.unit == Unit.Percent: + return self.convert(Unit.B).value + elif self.unit == Unit.sectors and self.sector_size is not None: + return self.value * self.sector_size._normalize() + return int(self.value * self.unit.value) # type: ignore + + def __sub__(self, other: Size) -> Size: + src_norm = self._normalize() + dest_norm = other._normalize() + return Size(abs(src_norm - dest_norm), Unit.B) + + def __lt__(self, other): + return self._normalize() < other._normalize() + + def __le__(self, other): + return self._normalize() <= other._normalize() + + def __eq__(self, other): + return self._normalize() == other._normalize() + + def __ne__(self, other): + return self._normalize() != other._normalize() + + def __gt__(self, other): + return self._normalize() > other._normalize() + + def __ge__(self, other): + return self._normalize() >= other._normalize() + + +@dataclass +class _BtrfsSubvolumeInfo: + name: Path + mountpoint: Optional[Path] + + +@dataclass +class _PartitionInfo: + partition: Partition + name: str + type: PartitionType + fs_type: FilesystemType + path: Path + start: Size + length: Size + flags: List[PartitionFlag] + partuuid: str + disk: Disk + mountpoints: List[Path] + btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list) + + def as_json(self) -> Dict[str, Any]: + info = { + 'Name': self.name, + 'Type': self.type.value, + 'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')), + 'Path': str(self.path), + 'Start': self.start.format_size(Unit.MiB), + 'Length': self.length.format_size(Unit.MiB), + 'Flags': ', '.join([f.name for f in self.flags]) + } + + if self.btrfs_subvol_infos: + info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes' + + return info + + @classmethod + def from_partition( + cls, + partition: Partition, + fs_type: FilesystemType, + partuuid: str, + mountpoints: List[Path], + btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = [] + ) -> _PartitionInfo: + partition_type = PartitionType.get_type_from_code(partition.type) + flags = [f for f in PartitionFlag if partition.getFlag(f.value)] + + start = Size( + partition.geometry.start, + Unit.sectors, + Size(partition.disk.device.sectorSize, Unit.B) + ) + + length = Size(int(partition.getLength(unit='B')), Unit.B) + + return _PartitionInfo( + partition=partition, + name=partition.get_name(), + type=partition_type, + fs_type=fs_type, + path=partition.path, + start=start, + length=length, + flags=flags, + partuuid=partuuid, + disk=partition.disk, + mountpoints=mountpoints, + btrfs_subvol_infos=btrfs_subvol_infos + ) + + +@dataclass +class _DeviceInfo: + model: str + path: Path + type: str + total_size: Size + free_space_regions: List[DeviceGeometry] + sector_size: Size + read_only: bool + dirty: bool + + def as_json(self) -> Dict[str, Any]: + total_free_space = sum([region.get_length(unit=Unit.MiB) for region in self.free_space_regions]) + return { + 'Model': self.model, + 'Path': str(self.path), + 'Type': self.type, + 'Size': self.total_size.format_size(Unit.MiB), + 'Free space': int(total_free_space), + 'Sector size': self.sector_size.value, + 'Read only': self.read_only + } + + @classmethod + def from_disk(cls, disk: Disk) -> _DeviceInfo: + device = disk.device + device_type = parted.devices[device.type] + + sector_size = Size(device.sectorSize, Unit.B) + free_space = [DeviceGeometry(g, sector_size) for g in disk.getFreeSpaceRegions()] + + return _DeviceInfo( + model=device.model.strip(), + path=Path(device.path), + type=device_type, + sector_size=sector_size, + total_size=Size(int(device.getLength(unit='B')), Unit.B), + free_space_regions=free_space, + read_only=device.readOnly, + dirty=device.dirty + ) + + +@dataclass +class SubvolumeModification: + name: Path + mountpoint: Optional[Path] = None + compress: bool = False + nodatacow: bool = False + + @classmethod + def from_existing_subvol_info(cls, info: _BtrfsSubvolumeInfo) -> SubvolumeModification: + return SubvolumeModification(info.name, mountpoint=info.mountpoint) + + @classmethod + def parse_args(cls, subvol_args: List[Dict[str, Any]]) -> List[SubvolumeModification]: + mods = [] + for entry in subvol_args: + if not entry.get('name', None) or not entry.get('mountpoint', None): + log(f'Subvolume arg is missing name: {entry}', level=logging.DEBUG) + continue + + mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None + + mods.append( + SubvolumeModification( + entry['name'], + mountpoint, + entry.get('compress', False), + entry.get('nodatacow', False) + ) + ) + + return mods + + @property + def mount_options(self) -> List[str]: + options = [] + options += ['compress'] if self.compress else [] + options += ['nodatacow'] if self.nodatacow else [] + return options + + @property + def relative_mountpoint(self) -> Path: + """ + Will return the relative path based on the anchor + e.g. Path('/mnt/test') -> Path('mnt/test') + """ + if self.mountpoint is not None: + return self.mountpoint.relative_to(self.mountpoint.anchor) + + raise ValueError('Mountpoint is not specified') + + def is_root(self, relative_mountpoint: Optional[Path] = None) -> bool: + if self.mountpoint: + if relative_mountpoint is not None: + return self.mountpoint.relative_to(relative_mountpoint) == Path('.') + return self.mountpoint == Path('/') + return False + + def __dump__(self) -> Dict[str, Any]: + return { + 'name': str(self.name), + 'mountpoint': str(self.mountpoint), + 'compress': self.compress, + 'nodatacow': self.nodatacow + } + + def as_json(self) -> Dict[str, Any]: + return { + 'name': str(self.name), + 'mountpoint': str(self.mountpoint), + 'compress': self.compress, + 'nodatacow': self.nodatacow + } + + +class DeviceGeometry: + def __init__(self, geometry: Geometry, sector_size: Size): + self._geometry = geometry + self._sector_size = sector_size + + @property + def start(self) -> int: + return self._geometry.start + + @property + def end(self) -> int: + return self._geometry.end + + def get_length(self, unit: Unit = Unit.sectors) -> int: + return self._geometry.getLength(unit.name) + + def as_json(self) -> Dict[str, Any]: + return { + 'Sector size': self._sector_size.value, + 'Start sector': self._geometry.start, + 'End sector': self._geometry.end, + 'Length': self._geometry.getLength() + } + + +@dataclass +class BDevice: + disk: Disk + device_info: _DeviceInfo + partition_infos: List[_PartitionInfo] + + def __hash__(self): + return hash(self.disk.device.path) + + +class PartitionType(Enum): + Boot = 'boot' + Primary = 'primary' + + @classmethod + def get_type_from_code(cls, code: int) -> PartitionType: + if code == parted.PARTITION_NORMAL: + return PartitionType.Primary + + raise DiskError(f'Partition code not supported: {code}') + + def get_partition_code(self) -> Optional[int]: + if self == PartitionType.Primary: + return parted.PARTITION_NORMAL + elif self == PartitionType.Boot: + return parted.PARTITION_BOOT + return None + + +class PartitionFlag(Enum): + Boot = 1 + + +class FilesystemType(Enum): + Btrfs = 'btrfs' + Ext2 = 'ext2' + Ext3 = 'ext3' + Ext4 = 'ext4' + F2fs = 'f2fs' + Fat16 = 'fat16' + Fat32 = 'fat32' + Ntfs = 'ntfs' + Reiserfs = 'reiserfs' + Xfs = 'xfs' + + # this is not a FS known to parted, so be careful + # with the usage from this enum + Crypto_luks = 'crypto_LUKS' + + def is_crypto(self) -> bool: + return self == FilesystemType.Crypto_luks + + @property + def fs_type_mount(self) -> str: + match self: + case FilesystemType.Ntfs: return 'ntfs3' + case FilesystemType.Fat32: return 'vfat' + case _: return self.value # type: ignore + + @property + def installation_pkg(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return 'btrfs-progs' + case FilesystemType.Xfs: return 'xfsprogs' + case FilesystemType.F2fs: return 'f2fs-tools' + case _: return None + + @property + def installation_module(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return 'btrfs' + case _: return None + + @property + def installation_binary(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return '/usr/bin/btrfs' + case _: return None + + @property + def installation_hooks(self) -> Optional[str]: + match self: + case FilesystemType.Btrfs: return 'btrfs' + case _: return None + + +class ModificationStatus(Enum): + Exist = 'existing' + Modify = 'modify' + Delete = 'delete' + Create = 'create' + + +@dataclass +class PartitionModification: + status: ModificationStatus + type: PartitionType + start: Size + length: Size + fs_type: FilesystemType + mountpoint: Optional[Path] = None + mount_options: List[str] = field(default_factory=list) + flags: List[PartitionFlag] = field(default_factory=list) + btrfs_subvols: List[SubvolumeModification] = field(default_factory=list) + + # only set if the device was created or exists + dev_path: Optional[Path] = None + partuuid: Optional[str] = None + uuid: Optional[str] = None + + def __post_init__(self): + # needed to use the object as a dictionary key due to hash func + if not hasattr(self, '_obj_id'): + self._obj_id = uuid.uuid4() + + if self.is_exists_or_modify() and not self.dev_path: + raise ValueError('If partition marked as existing a path must be set') + + def __hash__(self): + return hash(self._obj_id) + + @property + def obj_id(self) -> str: + if hasattr(self, '_obj_id'): + return str(self._obj_id) + return '' + + @property + def real_dev_path(self) -> Path: + if self.dev_path is None: + raise ValueError('Device path was not set') + return self.dev_path + + @classmethod + def from_existing_partition(cls, partition_info: _PartitionInfo) -> PartitionModification: + if partition_info.btrfs_subvol_infos: + mountpoint = None + subvol_mods = [] + for info in partition_info.btrfs_subvol_infos: + subvol_mods.append( + SubvolumeModification.from_existing_subvol_info(info) + ) + else: + mountpoint = partition_info.mountpoints[0] if partition_info.mountpoints else None + subvol_mods = [] + + return PartitionModification( + status=ModificationStatus.Exist, + type=partition_info.type, + start=partition_info.start, + length=partition_info.length, + fs_type=partition_info.fs_type, + dev_path=partition_info.path, + flags=partition_info.flags, + mountpoint=mountpoint, + btrfs_subvols=subvol_mods + ) + + @property + def relative_mountpoint(self) -> Path: + """ + Will return the relative path based on the anchor + e.g. Path('/mnt/test') -> Path('mnt/test') + """ + if self.mountpoint: + return self.mountpoint.relative_to(self.mountpoint.anchor) + + raise ValueError('Mountpoint is not specified') + + def is_boot(self) -> bool: + return PartitionFlag.Boot in self.flags + + def is_root(self, relative_mountpoint: Optional[Path] = None) -> bool: + if relative_mountpoint is not None and self.mountpoint is not None: + return self.mountpoint.relative_to(relative_mountpoint) == Path('.') + elif self.mountpoint is not None: + return Path('/') == self.mountpoint + else: + for subvol in self.btrfs_subvols: + if subvol.is_root(relative_mountpoint): + return True + + return False + + def is_modify(self) -> bool: + return self.status == ModificationStatus.Modify + + def exists(self) -> bool: + return self.status == ModificationStatus.Exist + + def is_exists_or_modify(self) -> bool: + return self.status in [ModificationStatus.Exist, ModificationStatus.Modify] + + @property + def mapper_name(self) -> Optional[str]: + if self.dev_path: + return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.dev_path.name}' + return None + + def set_flag(self, flag: PartitionFlag): + if flag not in self.flags: + self.flags.append(flag) + + def invert_flag(self, flag: PartitionFlag): + if flag in self.flags: + self.flags = [f for f in self.flags if f != flag] + else: + self.set_flag(flag) + + def json(self) -> Dict[str, Any]: + """ + Called for configuration settings + """ + return { + 'obj_id': self.obj_id, + 'status': self.status.value, + 'type': self.type.value, + 'start': self.start.__dump__(), + 'length': self.length.__dump__(), + 'fs_type': self.fs_type.value, + 'mountpoint': str(self.mountpoint) if self.mountpoint else None, + 'mount_options': self.mount_options, + 'flags': [f.name for f in self.flags], + 'btrfs': [vol.__dump__() for vol in self.btrfs_subvols] + } + + def as_json(self) -> Dict[str, Any]: + """ + Called for displaying data in table format + """ + info = { + 'Status': self.status.value, + 'Device': str(self.dev_path) if self.dev_path else '', + 'Type': self.type.value, + 'Start': self.start.format_size(Unit.MiB), + 'Length': self.length.format_size(Unit.MiB), + 'FS type': self.fs_type.value, + 'Mountpoint': self.mountpoint if self.mountpoint else '', + 'Mount options': ', '.join(self.mount_options), + 'Flags': ', '.join([f.name for f in self.flags]), + } + + if self.btrfs_subvols: + info['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes' + + return info + + +@dataclass +class DeviceModification: + device: BDevice + wipe: bool + partitions: List[PartitionModification] = field(default_factory=list) + + @property + def device_path(self) -> Path: + return self.device.device_info.path + + def add_partition(self, partition: PartitionModification): + self.partitions.append(partition) + + def get_boot_partition(self) -> Optional[PartitionModification]: + liltered = filter(lambda x: x.is_boot(), self.partitions) + return next(liltered, None) + + def get_root_partition(self, relative_path: Optional[Path]) -> Optional[PartitionModification]: + filtered = filter(lambda x: x.is_root(relative_path), self.partitions) + return next(filtered, None) + + def __dump__(self) -> Dict[str, Any]: + """ + Called when generating configuration files + """ + return { + 'device': str(self.device.device_info.path), + 'wipe': self.wipe, + 'partitions': [p.json() for p in self.partitions] + } + + +class EncryptionType(Enum): + NoEncryption = "no_encryption" + Partition = "partition" + + @classmethod + def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']: + return { + # str(_('Full disk encryption')): EncryptionType.FullDiskEncryption, + str(_('Partition encryption')): EncryptionType.Partition + } + + @classmethod + def text_to_type(cls, text: str) -> 'EncryptionType': + mapping = cls._encryption_type_mapper() + return mapping[text] + + @classmethod + def type_to_text(cls, type_: 'EncryptionType') -> str: + mapping = cls._encryption_type_mapper() + type_to_text = {type_: text for text, type_ in mapping.items()} + return type_to_text[type_] + + +@dataclass +class DiskEncryption: + encryption_type: EncryptionType = EncryptionType.Partition + encryption_password: str = '' + partitions: List[PartitionModification] = field(default_factory=list) + hsm_device: Optional[Fido2Device] = None + + def should_generate_encryption_file(self, part_mod: PartitionModification) -> bool: + return part_mod in self.partitions and part_mod.mountpoint != Path('/') + + def json(self) -> Dict[str, Any]: + obj: Dict[str, Any] = { + 'encryption_type': self.encryption_type.value, + 'partitions': [p.obj_id for p in self.partitions] + } + + if self.hsm_device: + obj['hsm_device'] = self.hsm_device.json() + + return obj + + @classmethod + def parse_arg( + cls, + disk_config: DiskLayoutConfiguration, + arg: Dict[str, Any], + password: str = '' + ) -> 'DiskEncryption': + enc_partitions = [] + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.obj_id in arg.get('partitions', []): + enc_partitions.append(part) + + enc = DiskEncryption( + EncryptionType(arg['encryption_type']), + password, + enc_partitions + ) + + if hsm := arg.get('hsm_device', None): + enc.hsm_device = Fido2Device.parse_arg(hsm) + + return enc + + +@dataclass +class Fido2Device: + path: Path + manufacturer: str + product: str + + def json(self) -> Dict[str, str]: + return { + 'path': str(self.path), + 'manufacturer': self.manufacturer, + 'product': self.product + } + + @classmethod + def parse_arg(cls, arg: Dict[str, str]) -> 'Fido2Device': + return Fido2Device( + Path(arg['path']), + arg['manufacturer'], + arg['product'] + ) + + +@dataclass +class LsblkInfo: + name: str = '' + path: Path = Path() + pkname: str = '' + size: Size = Size(0, Unit.B) + log_sec: int = 0 + pttype: str = '' + ptuuid: str = '' + rota: bool = False + tran: Optional[str] = None + partuuid: Optional[str] = None + uuid: Optional[str] = None + fstype: Optional[str] = None + fsver: Optional[str] = None + fsavail: Optional[str] = None + fsuse_percentage: Optional[str] = None + type: Optional[str] = None + mountpoint: Optional[Path] = None + mountpoints: List[Path] = field(default_factory=list) + fsroots: List[Path] = field(default_factory=list) + children: List[LsblkInfo] = field(default_factory=list) + + def json(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'path': str(self.path), + 'pkname': self.pkname, + 'size': self.size.format_size(Unit.MiB), + 'log_sec': self.log_sec, + 'pttype': self.pttype, + 'ptuuid': self.ptuuid, + 'rota': self.rota, + 'tran': self.tran, + 'partuuid': self.partuuid, + 'uuid': self.uuid, + 'fstype': self.fstype, + 'fsver': self.fsver, + 'fsavail': self.fsavail, + 'fsuse_percentage': self.fsuse_percentage, + 'type': self.type, + 'mountpoint': self.mountpoint, + 'mountpoints': [str(m) for m in self.mountpoints], + 'fsroots': [str(r) for r in self.fsroots], + 'children': [c.json() for c in self.children] + } + + @property + def btrfs_subvol_info(self) -> Dict[Path, Path]: + """ + It is assumed that lsblk will contain the fields as + + "mountpoints": ["/mnt/archinstall/log", "/mnt/archinstall/home", "/mnt/archinstall", ...] + "fsroots": ["/@log", "/@home", "/@"...] + + we'll thereby map the fsroot, which are the mounted filesystem roots + to the corresponding mountpoints + """ + return dict(zip(self.fsroots, self.mountpoints)) + + @classmethod + def exclude(cls) -> List[str]: + return ['children'] + + @classmethod + def fields(cls) -> List[str]: + return [f.name for f in dataclasses.fields(LsblkInfo) if f.name not in cls.exclude()] + + @classmethod + def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo: + info = cls() + + for f in cls.fields(): + lsblk_field = _clean_field(f, CleanType.Blockdevice) + data_field = _clean_field(f, CleanType.Dataclass) + + val: Any = None + if isinstance(getattr(info, data_field), Path): + val = Path(blockdevice[lsblk_field]) + elif isinstance(getattr(info, data_field), Size): + val = Size(blockdevice[lsblk_field], Unit.B) + else: + val = blockdevice[lsblk_field] + + setattr(info, data_field, val) + + info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])] + + # sometimes lsblk returns 'mountpoints': [null] + info.mountpoints = [Path(mnt) for mnt in info.mountpoints if mnt] + + fs_roots = [] + for r in info.fsroots: + if r: + path = Path(r) + # store the fsroot entries without the leading / + fs_roots.append(path.relative_to(path.anchor)) + info.fsroots = fs_roots + + return info + + +class CleanType(Enum): + Blockdevice = auto() + Dataclass = auto() + Lsblk = auto() + + +def _clean_field(name: str, clean_type: CleanType) -> str: + match clean_type: + case CleanType.Blockdevice: + return name.replace('_percentage', '%').replace('_', '-') + case CleanType.Dataclass: + return name.lower().replace('-', '_').replace('%', '_percentage') + case CleanType.Lsblk: + return name.replace('_percentage', '%').replace('_', '-') + + +def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int = 3) -> List[LsblkInfo]: + fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()] + lsblk_fields = ','.join(fields) + + if not dev_path: + dev_path = '' + + if retry == 0: + retry = 1 + + result = None + + for i in range(retry): + try: + result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}') + except SysCallError as error: + # Get the output minus the message/info from lsblk if it returns a non-zero exit code. + if error.worker: + err = error.worker.decode('UTF-8') + log(f'Error calling lsblk: {err}', level=logging.DEBUG) + time.sleep(1) + else: + raise error + + if result and result.exit_code == 0: + try: + if decoded := result.decode('utf-8'): + block_devices = json.loads(decoded) + blockdevices = block_devices['blockdevices'] + return [LsblkInfo.from_json(device) for device in blockdevices] + except json.decoder.JSONDecodeError as err: + log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR) + raise err + + raise DiskError(f'Failed to read disk "{dev_path}" with lsblk') + + +def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo: + if infos := _fetch_lsblk_info(dev_path): + return infos[0] + + raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"') + + +def get_all_lsblk_info() -> List[LsblkInfo]: + return _fetch_lsblk_info() + + +def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]: + def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]: + devices = [] + for entry in infos: + if as_prefix: + matches = [m for m in entry.mountpoints if str(m).startswith(str(mountpoint))] + if matches: + devices += [entry] + elif mountpoint in entry.mountpoints: + devices += [entry] + + if len(entry.children) > 0: + if len(match := _check(entry.children)) > 0: + devices += match + + return devices + + all_info = get_all_lsblk_info() + return _check(all_info) diff --git a/archinstall/lib/disk/diskinfo.py b/archinstall/lib/disk/diskinfo.py deleted file mode 100644 index b56ba282..00000000 --- a/archinstall/lib/disk/diskinfo.py +++ /dev/null @@ -1,40 +0,0 @@ -import dataclasses -import json -from dataclasses import dataclass, field -from typing import Optional, List - -from ..general import SysCommand -from ..exceptions import DiskError - -@dataclass -class LsblkInfo: - size: int = 0 - log_sec: int = 0 - pttype: Optional[str] = None - rota: bool = False - tran: Optional[str] = None - ptuuid: Optional[str] = None - partuuid: Optional[str] = None - uuid: Optional[str] = None - fstype: Optional[str] = None - type: Optional[str] = None - mountpoints: List[str] = field(default_factory=list) - - -def get_lsblk_info(dev_path: str) -> LsblkInfo: - fields = [f.name for f in dataclasses.fields(LsblkInfo)] - lsblk_fields = ','.join([f.upper().replace('_', '-') for f in fields]) - - output = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}').decode('UTF-8') - - if output: - block_devices = json.loads(output) - info = block_devices['blockdevices'][0] - lsblk_info = LsblkInfo() - - for f in fields: - setattr(lsblk_info, f, info[f.replace('_', '-')]) - - return lsblk_info - - raise DiskError(f'Failed to read disk "{dev_path}" with lsblk') diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py deleted file mode 100644 index 63392ffb..00000000 --- a/archinstall/lib/disk/dmcryptdev.py +++ /dev/null @@ -1,48 +0,0 @@ -import pathlib -import logging -import json -from dataclasses import dataclass -from typing import Optional -from ..exceptions import SysCallError -from ..general import SysCommand -from ..output import log -from .mapperdev import MapperDev - -@dataclass -class DMCryptDev: - dev_path :pathlib.Path - - @property - def name(self): - with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh: - return fh.read().strip() - - @property - def path(self): - return f"/dev/mapper/{self.dev_path}" - - @property - def blockdev(self): - pass - - @property - def MapperDev(self): - return MapperDev(mappername=self.name) - - @property - def mountpoint(self) -> Optional[str]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode()) - for filesystem in data['filesystems']: - return filesystem.get('target') - - except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow") - pass - - return None - - @property - def filesystem(self) -> Optional[str]: - return self.MapperDev.filesystem \ No newline at end of file diff --git a/archinstall/lib/disk/encryption.py b/archinstall/lib/disk/encryption.py deleted file mode 100644 index c7496bfa..00000000 --- a/archinstall/lib/disk/encryption.py +++ /dev/null @@ -1,174 +0,0 @@ -from typing import Dict, Optional, Any, TYPE_CHECKING, List - -from ..menu.abstract_menu import Selector, AbstractSubMenu -from ..menu.menu import MenuSelectionType -from ..menu.table_selection_menu import TableMenu -from ..models.disk_encryption import EncryptionType, DiskEncryption -from ..user_interaction.partitioning_conf import current_partition_layout -from ..user_interaction.utils import get_password -from ..menu import Menu -from ..general import secret -from ..hsm.fido import Fido2Device, Fido2 - -if TYPE_CHECKING: - _: Any - - -class DiskEncryptionMenu(AbstractSubMenu): - def __init__(self, data_store: Dict[str, Any], preset: Optional[DiskEncryption], disk_layouts: Dict[str, Any]): - if preset: - self._preset = preset - else: - self._preset = DiskEncryption() - - self._disk_layouts = disk_layouts - super().__init__(data_store=data_store) - - def _setup_selection_menu_options(self): - self._menu_options['encryption_password'] = \ - Selector( - _('Encryption password'), - lambda x: select_encrypted_password(), - display_func=lambda x: secret(x) if x else '', - default=self._preset.encryption_password, - enabled=True - ) - self._menu_options['encryption_type'] = \ - Selector( - _('Encryption type'), - func=lambda preset: select_encryption_type(preset), - display_func=lambda x: EncryptionType.type_to_text(x) if x else None, - dependencies=['encryption_password'], - default=self._preset.encryption_type, - enabled=True - ) - self._menu_options['partitions'] = \ - Selector( - _('Partitions'), - func=lambda preset: select_partitions_to_encrypt(self._disk_layouts, preset), - display_func=lambda x: f'{sum([len(y) for y in x.values()])} {_("Partitions")}' if x else None, - dependencies=['encryption_password'], - default=self._preset.partitions, - preview_func=self._prev_disk_layouts, - enabled=True - ) - self._menu_options['HSM'] = \ - Selector( - description=_('Use HSM to unlock encrypted drive'), - func=lambda preset: select_hsm(preset), - display_func=lambda x: self._display_hsm(x), - dependencies=['encryption_password'], - default=self._preset.hsm_device, - enabled=True - ) - - def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]: - super().run(allow_reset=allow_reset) - - if self._data_store.get('encryption_password', None): - return DiskEncryption( - encryption_password=self._data_store.get('encryption_password', None), - encryption_type=self._data_store['encryption_type'], - partitions=self._data_store.get('partitions', None), - hsm_device=self._data_store.get('HSM', None) - ) - - return None - - def _display_hsm(self, device: Optional[Fido2Device]) -> Optional[str]: - if device: - return device.manufacturer - - if not Fido2.get_fido2_devices(): - return str(_('No HSM devices available')) - return None - - def _prev_disk_layouts(self) -> Optional[str]: - selector = self._menu_options['partitions'] - if selector.has_selection(): - partitions: Dict[str, Any] = selector.current_selection - - all_partitions = [] - for parts in partitions.values(): - all_partitions += parts - - output = str(_('Partitions to be encrypted')) + '\n' - output += current_partition_layout(all_partitions, with_title=False) - return output.rstrip() - return None - - -def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]: - title = str(_('Select disk encryption option')) - options = [ - # _type_to_text(EncryptionType.FullDiskEncryption), - EncryptionType.type_to_text(EncryptionType.Partition) - ] - - preset_value = EncryptionType.type_to_text(preset) - choice = Menu(title, options, preset_values=preset_value).run() - - match choice.type_: - case MenuSelectionType.Reset: return None - case MenuSelectionType.Skip: return preset - case MenuSelectionType.Selection: return EncryptionType.text_to_type(choice.value) # type: ignore - - -def select_encrypted_password() -> Optional[str]: - if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))): - return passwd - return None - - -def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]: - title = _('Select a FIDO2 device to use for HSM') - fido_devices = Fido2.get_fido2_devices() - - if fido_devices: - choice = TableMenu(title, data=fido_devices).run() - match choice.type_: - case MenuSelectionType.Reset: - return None - case MenuSelectionType.Skip: - return preset - case MenuSelectionType.Selection: - return choice.value # type: ignore - - return None - - -def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: Dict[str, Any]) -> Dict[str, Any]: - # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format.. - # Then we need to identify which partitions to encrypt. This will default to / (root). - all_partitions = [] - for blockdevice in disk_layouts.values(): - if partitions := blockdevice.get('partitions'): - partitions = [p for p in partitions if p['mountpoint'] != '/boot'] - all_partitions += partitions - - if all_partitions: - title = str(_('Select which partitions to encrypt')) - partition_table = current_partition_layout(all_partitions, with_title=False).strip() - - choice = TableMenu( - title, - table_data=(all_partitions, partition_table), - multi=True - ).run() - - match choice.type_: - case MenuSelectionType.Reset: - return {} - case MenuSelectionType.Skip: - return preset - case MenuSelectionType.Selection: - selections: List[Any] = choice.value # type: ignore - partitions = {} - - for path, device in disk_layouts.items(): - for part in selections: - if part in device.get('partitions', []): - partitions.setdefault(path, []).append(part) - - return partitions - return {} diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py new file mode 100644 index 00000000..285270fb --- /dev/null +++ b/archinstall/lib/disk/encryption_menu.py @@ -0,0 +1,179 @@ +from pathlib import Path +from typing import Dict, Optional, Any, TYPE_CHECKING, List + +from ..disk import ( + DeviceModification, + PartitionModification, + DiskEncryption, + EncryptionType +) +from ..menu import ( + Selector, + AbstractSubMenu, + MenuSelectionType, + TableMenu +) +from ..user_interaction.utils import get_password +from ..menu import Menu +from ..general import secret +from .fido import Fido2Device, Fido2 +from ..output import FormattedOutput + +if TYPE_CHECKING: + _: Any + + +class DiskEncryptionMenu(AbstractSubMenu): + def __init__( + self, + mods: List[DeviceModification], + data_store: Dict[str, Any], + preset: Optional[DiskEncryption] = None + ): + if preset: + self._preset = preset + else: + self._preset = DiskEncryption() + + self._modifications = mods + super().__init__(data_store=data_store) + + def setup_selection_menu_options(self): + self._menu_options['encryption_password'] = \ + Selector( + _('Encryption password'), + lambda x: select_encrypted_password(), + display_func=lambda x: secret(x) if x else '', + default=self._preset.encryption_password, + enabled=True + ) + self._menu_options['encryption_type'] = \ + Selector( + _('Encryption type'), + func=lambda preset: select_encryption_type(preset), + display_func=lambda x: EncryptionType.type_to_text(x) if x else None, + dependencies=['encryption_password'], + default=self._preset.encryption_type, + enabled=True + ) + self._menu_options['partitions'] = \ + Selector( + _('Partitions'), + func=lambda preset: select_partitions_to_encrypt(self._modifications.device_modifications, preset), + display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None, + dependencies=['encryption_password'], + default=self._preset.partitions, + preview_func=self._prev_disk_layouts, + enabled=True + ) + self._menu_options['HSM'] = \ + Selector( + description=_('Use HSM to unlock encrypted drive'), + func=lambda preset: select_hsm(preset), + display_func=lambda x: self._display_hsm(x), + dependencies=['encryption_password'], + default=self._preset.hsm_device, + enabled=True + ) + + def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]: + super().run(allow_reset=allow_reset) + + if self._data_store.get('encryption_password', None): + return DiskEncryption( + encryption_password=self._data_store.get('encryption_password', None), + encryption_type=self._data_store['encryption_type'], + partitions=self._data_store.get('partitions', None), + hsm_device=self._data_store.get('HSM', None) + ) + + return None + + def _display_hsm(self, device: Optional[Fido2Device]) -> Optional[str]: + if device: + return device.manufacturer + + if not Fido2.get_fido2_devices(): + return str(_('No HSM devices available')) + return None + + def _prev_disk_layouts(self) -> Optional[str]: + partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection + if partitions: + output = str(_('Partitions to be encrypted')) + '\n' + output += FormattedOutput.as_table(partitions) + return output.rstrip() + + return None + + +def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]: + title = str(_('Select disk encryption option')) + options = [ + EncryptionType.type_to_text(EncryptionType.Partition) + ] + + preset_value = EncryptionType.type_to_text(preset) + choice = Menu(title, options, preset_values=preset_value).run() + + match choice.type_: + case MenuSelectionType.Reset: return None + case MenuSelectionType.Skip: return preset + case MenuSelectionType.Selection: return EncryptionType.text_to_type(choice.value) # type: ignore + + +def select_encrypted_password() -> Optional[str]: + if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))): + return passwd + return None + + +def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]: + title = _('Select a FIDO2 device to use for HSM') + fido_devices = Fido2.get_fido2_devices() + + if fido_devices: + choice = TableMenu(title, data=fido_devices).run() + match choice.type_: + case MenuSelectionType.Reset: + return None + case MenuSelectionType.Skip: + return preset + case MenuSelectionType.Selection: + return choice.value # type: ignore + + return None + + +def select_partitions_to_encrypt( + modification: List[DeviceModification], + preset: List[PartitionModification] +) -> List[PartitionModification]: + partitions: List[PartitionModification] = [] + + # do not allow encrypting the boot partition + for mod in modification: + partitions += list(filter(lambda x: x.mountpoint != Path('/boot'), mod.partitions)) + + # do not allow encrypting existing partitions that are not marked as wipe + avail_partitions = list(filter(lambda x: not x.exists(), partitions)) + + if avail_partitions: + title = str(_('Select which partitions to encrypt')) + partition_table = FormattedOutput.as_table(avail_partitions) + + choice = TableMenu( + title, + table_data=(avail_partitions, partition_table), + preset=preset, + multi=True + ).run() + + match choice.type_: + case MenuSelectionType.Reset: + return [] + case MenuSelectionType.Skip: + return preset + case MenuSelectionType.Selection: + return choice.multi_value + return [] diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py new file mode 100644 index 00000000..436be4d4 --- /dev/null +++ b/archinstall/lib/disk/fido.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import getpass +import logging +from typing import List + +from .device_model import PartitionModification, Fido2Device +from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes +from ..output import log + + +class Fido2: + _loaded: bool = False + _fido2_devices: List[Fido2Device] = [] + + @classmethod + def get_fido2_devices(cls, reload: bool = False) -> List[Fido2Device]: + """ + Uses systemd-cryptenroll to list the FIDO2 devices + connected that supports FIDO2. + Some devices might show up in udevadm as FIDO2 compliant + when they are in fact not. + + The drawback of systemd-cryptenroll is that it uses human readable format. + That means we get this weird table like structure that is of no use. + + So we'll look for `MANUFACTURER` and `PRODUCT`, we take their index + and we split each line based on those positions. + + Output example: + + PATH MANUFACTURER PRODUCT + /dev/hidraw1 Yubico YubiKey OTP+FIDO+CCID + """ + + # to prevent continous reloading which will slow + # down moving the cursor in the menu + if not cls._loaded or reload: + ret = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8') + if not ret: + log('Unable to retrieve fido2 devices', level=logging.ERROR) + return [] + + fido_devices = clear_vt100_escape_codes(ret) + + manufacturer_pos = 0 + product_pos = 0 + devices = [] + + for line in fido_devices.split('\r\n'): + if '/dev' not in line: + manufacturer_pos = line.find('MANUFACTURER') + product_pos = line.find('PRODUCT') + continue + + path = line[:manufacturer_pos].rstrip() + manufacturer = line[manufacturer_pos:product_pos].rstrip() + product = line[product_pos:] + + devices.append( + Fido2Device(path, manufacturer, product) + ) + + cls._loaded = True + cls._fido2_devices = devices + + return cls._fido2_devices + + @classmethod + def fido2_enroll( + cls, + hsm_device: Fido2Device, + part_mod: PartitionModification, + password: str + ): + worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {part_mod.dev_path}", peek_output=True) + pw_inputted = False + pin_inputted = False + + while worker.is_alive(): + if pw_inputted is False: + if bytes(f"please enter current passphrase for disk {part_mod.dev_path}", 'UTF-8') in worker._trace_log.lower(): + worker.write(bytes(password, 'UTF-8')) + pw_inputted = True + elif pin_inputted is False: + if bytes(f"please enter security token pin", 'UTF-8') in worker._trace_log.lower(): + worker.write(bytes(getpass.getpass(" "), 'UTF-8')) + pin_inputted = True + + log( + f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.", + level=logging.INFO, + fg="yellow" + ) diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 1083df53..6ea99340 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,301 +1,98 @@ from __future__ import annotations -import time -import logging -import json -import pathlib -from typing import Optional, Dict, Any, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -from ..models.disk_encryption import DiskEncryption -if TYPE_CHECKING: - from .blockdevice import BlockDevice - _: Any +import logging +import signal +import sys +import time +from typing import Any, Optional, TYPE_CHECKING -from .partition import Partition -from .validators import valid_fs_type -from ..exceptions import DiskError, SysCallError -from ..general import SysCommand +from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption +from .device_handler import device_handler +from ..hardware import has_uefi from ..output import log -from ..storage import storage - -GPT = 0b00000001 -MBR = 0b00000010 - -# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR -# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues. -# (we've been pestered by disk issues since the start, so please let this be here for a few versions) -DEFAULT_PARTITION_START = '5MiB' - -class Filesystem: - # TODO: - # When instance of a HDD is selected, check all usages and gracefully unmount them - # as well as close any crypto handles. - def __init__(self, blockdevice :BlockDevice, mode :int): - self.blockdevice = blockdevice - self.mode = mode - - def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': - return self - - def __repr__(self) -> str: - return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" +from ..menu import Menu - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - if len(args) >= 2 and args[1]: - raise args[1] - - SysCommand('sync') - return True - - def partuuid_to_index(self, uuid :str) -> Optional[int]: - for i in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - # We'll use unreliable lbslk to grab children under the /dev/ - output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8')) +if TYPE_CHECKING: + _: Any - for device in output['blockdevices']: - for index, partition in enumerate(device.get('children', [])): - # But we'll use blkid to reliably grab the PARTUUID for that child device (partition) - partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip() - if partition_uuid.lower() == uuid.lower(): - return index - raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") +class FilesystemHandler: + def __init__( + self, + disk_config: DiskLayoutConfiguration, + enc_conf: Optional[DiskEncryption] = None + ): + self._disk_config = disk_config + self._enc_config = enc_conf - def load_layout(self, layout :Dict[str, Any]) -> None: - from ..luks import luks2 - from .btrfs import BTRFSPartition + def perform_filesystem_operations(self, show_countdown: bool = True): + if self._disk_config.config_type == DiskLayoutType.Pre_mount: + log('Disk layout configuration is set to pre-mount, not performing any operations', level=logging.DEBUG) + return - # If the layout tells us to wipe the drive, we do so - if layout.get('wipe', False): - if self.mode == GPT: - if not self.parted_mklabel(self.blockdevice.device, "gpt"): - raise KeyError(f"Could not create a GPT label on {self}") - elif self.mode == MBR: - if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MS-DOS label on {self}") + device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications)) - self.blockdevice.flush_cache() - time.sleep(3) + if not device_mods: + log('No modifications required', level=logging.DEBUG) + return - prev_partition = None - # We then iterate the partitions in order - for partition in layout.get('partitions', []): - # We don't want to re-add an existing partition (those containing a UUID already) - if partition.get('wipe', False) and not partition.get('PARTUUID', None): - start = partition.get('start') or ( - prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START) - partition['device_instance'] = self.add_partition(partition.get('type', 'primary'), - start=start, - end=partition.get('size', '100%'), - partition_format=partition.get('filesystem', {}).get('format', 'btrfs'), - skip_mklabel=layout.get('wipe', False) is not False) + device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods]) - elif (partition_uuid := partition.get('PARTUUID')): - # We try to deal with both UUID and PARTUUID of a partition when it's being re-used. - # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID') - # but for now, lets just attempt to deal with both. - try: - partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid) - except DiskError: - partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid) + # Issue a final warning before we continue with something un-revertable. + # We mention the drive one last time, and count from 5 to 0. + print(str(_(' ! Formatting {} in ')).format(device_paths)) - log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray") - else: - log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING) - continue + if show_countdown: + self._do_countdown() - if partition.get('filesystem', {}).get('format', False): - # needed for backward compatibility with the introduction of the new "format_options" - format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) - disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption') + # Setup the blockdevice, filesystem (and optionally encryption). + # Once that's done, we'll hand over to perform_installation() + partition_table = PartitionTable.GPT + if has_uefi() is False: + partition_table = PartitionTable.MBR - if disk_encryption and partition in disk_encryption.all_partitions: - if not partition['device_instance']: - raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") + for mod in device_mods: + device_handler.partition(mod, partition_table=partition_table) + device_handler.format(mod, enc_conf=self._enc_config) - if partition.get('mountpoint',None): - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - else: - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + for part_mod in mod.partitions: + if part_mod.fs_type == FilesystemType.Btrfs: + device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config) - partition['device_instance'].encrypt(password=disk_encryption.encryption_password) - # Immediately unlock the encrypted device to format the inner volume - with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device: - if not partition.get('wipe'): - if storage['arguments'] == 'silent': - raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}") - else: - if not partition.get('filesystem'): - partition['filesystem'] = {} + def _do_countdown(self) -> bool: + SIG_TRIGGER = False - if not partition['filesystem'].get('format', False): - while True: - partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip() - if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False: - log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")) - continue - break + def kill_handler(sig: int, frame: Any) -> None: + print() + exit(0) - unlocked_device.format(partition['filesystem']['format'], options=format_options) + def sig_handler(sig: int, frame: Any) -> None: + signal.signal(signal.SIGINT, kill_handler) - elif partition.get('wipe', False): - if not partition['device_instance']: - raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, sig_handler) - partition['device_instance'].format(partition['filesystem']['format'], options=format_options) + for i in range(5, 0, -1): + print(f"{i}", end='') - if partition['filesystem']['format'] == 'btrfs': - # We upgrade the device instance to a BTRFSPartition if we format it as such. - # This is so that we can gain access to more features than otherwise available in Partition() - partition['device_instance'] = BTRFSPartition( - partition['device_instance'].path, - block_device=partition['device_instance'].block_device, - encrypted=False, - filesystem='btrfs', - autodetect_filesystem=False - ) + for x in range(4): + sys.stdout.flush() + time.sleep(0.25) + print(".", end='') - if partition.get('boot', False): - log(f"Marking partition {partition['device_instance']} as bootable.") - self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on') + if SIG_TRIGGER: + prompt = _('Do you really want to abort?') + choice = Menu(prompt, Menu.yes_no(), skip=False).run() + if choice.value == Menu.yes(): + exit(0) - prev_partition = partition + if SIG_TRIGGER is False: + sys.stdin.read() - def find_partition(self, mountpoint :str) -> Partition: - for partition in self.blockdevice: - if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: - return partition + SIG_TRIGGER = False + signal.signal(signal.SIGINT, sig_handler) - def partprobe(self) -> bool: - try: - SysCommand(f'partprobe {self.blockdevice.device}') - except SysCallError as error: - log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red") - raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}") + print() + signal.signal(signal.SIGINT, original_sigint_handler) return True - - def raw_parted(self, string: str) -> SysCommand: - try: - cmd_handle = SysCommand(f'/usr/bin/parted -s {string}') - time.sleep(0.5) - return cmd_handle - except SysCallError as error: - log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red") - return error - - def parted(self, string: str) -> bool: - """ - Performs a parted execution of the given string - - :param string: A raw string passed to /usr/bin/parted -s - :type string: str - """ - if (parted_handle := self.raw_parted(string)).exit_code == 0: - return self.partprobe() - else: - raise DiskError(f"Parted failed to add a partition: {parted_handle}") - - def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: - # TODO: Implement this with declarative profiles instead. - raise ValueError("Installation().use_entire_disk() has to be re-worked.") - - def add_partition( - self, - partition_type :str, - start :str, - end :str, - partition_format :Optional[str] = None, - skip_mklabel :bool = False - ) -> Partition: - log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) - - if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: - # If it's a completely empty drive, and we're about to add partitions to it - # we need to make sure there's a filesystem label. - if self.mode == GPT: - if not self.parted_mklabel(self.blockdevice.device, "gpt"): - raise KeyError(f"Could not create a GPT label on {self}") - elif self.mode == MBR: - if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MS-DOS label on {self}") - - self.blockdevice.flush_cache() - - previous_partuuids = [] - for partition in self.blockdevice.partitions.values(): - try: - previous_partuuids.append(partition.part_uuid) - except DiskError: - pass - - # TODO this check should probably run in the setup process rather than during the installation - if self.mode == MBR: - if len(self.blockdevice.partitions) > 3: - DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") - - if partition_format: - parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}' - else: - parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}' - - log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG) - - if self.parted(parted_string): - for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): - self.blockdevice.flush_cache() - - new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()] - new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) - - if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): - try: - return self.blockdevice.get_partition(partuuid=new_partuuid) - except Exception as err: - log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") - log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") - log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") - log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red") - log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") - raise err - else: - log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) - self.partprobe() - time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - else: - print("Parted did not return True during partition creation") - - total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()]) - total_partitions.update(previous_partuuids) - - # TODO: This should never be able to happen - log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") - log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red") - - raise DiskError(f"Could not add partition using: {parted_string}") - - def set_name(self, partition: int, name: str) -> bool: - return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 - - def set(self, partition: int, string: str) -> bool: - log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) - return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - - def parted_mklabel(self, device: str, disk_label: str) -> bool: - log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") - # Try to unmount devices before attempting to run mklabel - try: - SysCommand(f'bash -c "umount {device}?"') - except: - pass - - self.partprobe() - worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 - self.partprobe() - - return worked diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py deleted file mode 100644 index 80d0cb53..00000000 --- a/archinstall/lib/disk/helpers.py +++ /dev/null @@ -1,556 +0,0 @@ -from __future__ import annotations -import json -import logging -import os # type: ignore -import pathlib -import re -import time -import glob - -from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -from .diskinfo import get_lsblk_info -from ..models.subvolume import Subvolume - -from .blockdevice import BlockDevice -from .dmcryptdev import DMCryptDev -from .mapperdev import MapperDev -from ..exceptions import SysCallError, DiskError -from ..general import SysCommand -from ..output import log -from ..storage import storage - -if TYPE_CHECKING: - from .partition import Partition - - -ROOT_DIR_PATTERN = re.compile('^.*?/devices') -GIGA = 2 ** 30 - -def convert_size_to_gb(size :Union[int, float]) -> float: - return round(size / GIGA,1) - -def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: - result = {device: 0 for device in block_devices} - - for device, weight in result.items(): - if device.spinning: - weight -= 10 - else: - weight += 5 - - if device.bus_type == 'nvme': - weight += 20 - elif device.bus_type == 'sata': - weight += 10 - - result[device] = weight - - return result - -def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: - for disk in devices: - if disk.size >= gigabytes: - yield disk - -def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: - if not filter_out: - filter_out = [] - - copy_devices = [*devices] - for filter_device in filter_out: - if filter_device in copy_devices: - copy_devices.pop(copy_devices.index(filter_device)) - - copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes)) - - if not len(copy_devices): - return None - - return max(copy_devices, key=(lambda device : device.size)) - -def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: - if not filter_out: - filter_out = [] - - copy_devices = [*devices] - for filter_device in filter_out: - if filter_device in copy_devices: - copy_devices.pop(copy_devices.index(filter_device)) - - if not len(copy_devices): - return None - - return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) - -def convert_to_gigabytes(string :str) -> float: - unit = string.strip()[-1] - size = float(string.strip()[:-1]) - - if unit == 'M': - size = size / 1024 - elif unit == 'T': - size = size * 1024 - - return size - -def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: - # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 - if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): - with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: - if f.read(1) == '1': - return - - path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name))) - hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire") - for bus in hotplug_buses: - if os.path.exists('/sys/bus/{}'.format(bus)): - for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)): - device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus))) - if re.search(device_link, path): - return - return True - - -def cleanup_bash_escapes(data :str) -> str: - return data.replace(r'\ ', ' ') - -def blkid(cmd :str) -> Dict[str, Any]: - if '-o' in cmd and '-o export' not in cmd: - raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.") - elif '-o' not in cmd: - cmd += ' -o export' - - try: - raw_data = SysCommand(cmd).decode() - except SysCallError as error: - log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG) - raise error - - result = {} - # Process the raw result - devname = None - for line in raw_data.split('\r\n'): - if not len(line): - devname = None - continue - - key, val = line.split('=', 1) - if key.lower() == 'devname': - devname = val - # Lowercase for backwards compatibility with all_disks() previous use cases - result[devname] = { - "path": devname, - "PATH": devname - } - continue - - result[devname][key] = cleanup_bash_escapes(val) - - return result - -def get_loop_info(path :str) -> Dict[str, Any]: - for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: - if not drive['name'] == path: - continue - - return { - path: { - **drive, - 'type' : 'loop', - 'TYPE' : 'loop', - 'DEVTYPE' : 'loop', - 'PATH' : drive['name'], - 'path' : drive['name'] - } - } - - return {} - -def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]: - result = {} - for device_path, device_information in information.items(): - dev_name = pathlib.Path(device_information['PATH']).name - if not device_information.get('TYPE') or not device_information.get('DEVTYPE'): - with open(f"/sys/class/block/{dev_name}/uevent") as fh: - device_information.update(uevent(fh.read())) - - if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists(): - with dmcrypt_name.open('r') as fh: - device_information['DMCRYPT_NAME'] = fh.read().strip() - - result[device_path] = device_information - - return result - -def uevent(data :str) -> Dict[str, Any]: - information = {} - - for line in data.replace('\r\n', '\n').split('\n'): - if len((line := line.strip())): - key, val = line.split('=', 1) - information[key] = val - - return information - -def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]: - device_information = {} - with open(f"/sys/class/block/{dev_name}/uevent") as fh: - device_information.update(uevent(fh.read())) - - return { - f"/dev/{dev_name}" : { - **device_information, - 'path' : f'/dev/{dev_name}', - 'PATH' : f'/dev/{dev_name}', - 'PTTYPE' : None - } - } - - -def all_disks() -> List[BlockDevice]: - log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") - return all_blockdevices(partitions=False, mappers=False) - -def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]: - for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): - partprobe(device_path) - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) - - try: - if exclude_iso_dev: - # exclude all devices associated with the iso boot locations - iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt'] - - try: - lsblk_info = get_lsblk_info(device_path) - except DiskError: - continue - - if any([dev in lsblk_info.mountpoints for dev in iso_devs]): - continue - - information = blkid(f'blkid -p -o export {device_path}') - return enrich_blockdevice_information(information) - except SysCallError as ex: - if ex.exit_code == 2: - # Assume that it's a loop device, and try to get info on it - try: - resolved_device_name = device_path.readlink().name - except OSError: - resolved_device_name = device_path.name - - try: - information = get_loop_info(device_path) - if not information: - raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1) - return enrich_blockdevice_information(information) - - except SysCallError: - information = get_blockdevice_uevent(resolved_device_name) - return enrich_blockdevice_information(information) - else: - # We could not reliably get any information, perhaps the disk is clean of information? - if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1: - raise ex - -def all_blockdevices( - mappers: bool = False, - partitions: bool = False, - error: bool = False, - exclude_iso_dev: bool = True -) -> Dict[str, Any]: - """ - Returns BlockDevice() and Partition() objects for all available devices. - """ - from .partition import Partition - - instances = {} - - # Due to lsblk being highly unreliable for this use case, - # we'll iterate the /sys/class definitions and find the information - # from there. - for block_device in glob.glob("/sys/class/block/*"): - try: - device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") - except FileNotFoundError: - log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") - - if device_path.exists() is False: - log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") - continue - - information = get_blockdevice_info(device_path) - if not information: - continue - - for path, path_info in information.items(): - if path_info.get('DMCRYPT_NAME'): - instances[path] = DMCryptDev(dev_path=path) - elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'): - if partitions: - instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path)))) - elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': - instances[path] = BlockDevice(path, path_info) - elif path_info.get('TYPE') in ('squashfs', 'erofs'): - # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) - continue - else: - log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow") - - if mappers: - for block_device in glob.glob("/dev/mapper/*"): - if (pathobj := pathlib.Path(block_device)).is_symlink(): - instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name) - - return instances - - -def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path: - partition_name = path.name - pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve() - return f"/dev/{pci_device.parent.name}" - -def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: - collection = all_blockdevices(partitions=False) - for drive in collection: - if size and convert_to_gigabytes(collection[drive]['size']) != size: - continue - if model and (collection[drive]['model'] is None or collection[drive]['model'].lower() != model.lower()): - continue - - return collection[drive] - -def split_bind_name(path :Union[pathlib.Path, str]) -> list: - # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow") - # we check for the bind notation. if exist we'll only use the "true" device path - if '[' in str(path) : # is a bind path (btrfs subvolume path) - device_path, bind_path = str(path).split('[') - bind_path = bind_path[:-1].strip() # remove the ] - else: - device_path = path - bind_path = None - return device_path,bind_path - -def find_mountpoint(device_path :str) -> Dict[str, Any]: - try: - for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']: - yield filesystem - except SysCallError: - return {} - -def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]: - for traversal in list(map(str, [str(path)] + list(path.parents))): - if traversal in ignore: - continue - - try: - log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) - if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')): - return json.loads(output) - - except SysCallError as error: - log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray") - pass - - if not traverse: - break - - raise DiskError(f"Could not get mount information for path {path}") - - -def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]: - import traceback - - log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}") - device_path, bind_path = split_bind_name(path) - output = {} - - for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): - if traversal in ignore: - continue - - try: - log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) - if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): - break - - except SysCallError as error: - print('ERROR:', error) - pass - - if not traverse: - break - - if not output: - raise DiskError(f"Could not get mount information for device path {device_path}") - - output = json.loads(output) - - # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter - # i.e. the subvolume filesystem we're searching for - if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None: - output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)] - - if 'filesystems' in output: - if len(output['filesystems']) > 1: - raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}") - - if return_real_path: - return output['filesystems'][0], traversal - else: - return output['filesystems'][0] - - if return_real_path: - return {}, traversal - else: - return {} - - -def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]: - for info in data: - if info.get('target') not in filters: - filters[info.get('target')] = None - - filters.update(get_all_targets(info.get('children', []))) - - return filters - -def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: - from .partition import Partition - - try: - output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') - except SysCallError: - return {} - - if not output: - return {} - - output = json.loads(output) - - mounts = {} - - block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True) - - block_devices_mountpoints = {} - for blockdev in block_devices_available.values(): - if not type(blockdev) in (Partition, MapperDev): - continue - - if isinstance(blockdev, Partition): - if blockdev.mountpoints: - for blockdev_mountpoint in blockdev.mountpoints: - block_devices_mountpoints[blockdev_mountpoint] = blockdev - else: - if blockdev.mount_information: - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev - - log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) - - for mountpoint in list(get_all_targets(output['filesystems']).keys()): - # Since all_blockdevices() returns PosixPath objects, we need to convert - # findmnt paths to pathlib.Path() first: - mountpoint = pathlib.Path(mountpoint) - - if mountpoint in block_devices_mountpoints: - if mountpoint not in mounts: - mounts[mountpoint] = block_devices_mountpoints[mountpoint] - # If the already defined mountpoint is a DMCryptDev, and the newly found - # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition. - elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev: - mounts[mountpoint] = block_devices_mountpoints[mountpoint] - - log(f"Available partitions: {mounts}", level=logging.DEBUG) - - return mounts - - -def get_filesystem_type(path :str) -> Optional[str]: - try: - return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip() - except SysCallError: - return None - - -def disk_layouts() -> Optional[Dict[str, Any]]: - try: - if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: - return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} - else: - log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") - return None - except SysCallError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") - return None - except json.decoder.JSONDecodeError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") - return None - - -def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: - for device in block_devices: - for partition in block_devices[device]['partitions']: - if partition.get('mountpoint', None) == relative_mountpoint: - return partition - -def partprobe(path :str = '') -> bool: - try: - if SysCommand(f'bash -c "partprobe {path}"').exit_code == 0: - return True - except SysCallError: - pass - return False - -def convert_device_to_uuid(path :str) -> str: - device_name, bind_name = split_bind_name(path) - - for i in range(storage['DISK_RETRY_ATTEMPTS']): - partprobe(device_name) - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) # TODO: Remove, we should be relying on blkid instead of lsblk - - # TODO: Convert lsblk to blkid - # (lsblk supports BlockDev and Partition UUID grabbing, blkid requires you to pick PTUUID and PARTUUID) - output = json.loads(SysCommand(f"lsblk --json -o+UUID {device_name}").decode('UTF-8')) - - for device in output['blockdevices']: - if (dev_uuid := device.get('uuid', None)): - return dev_uuid - - raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.") - - -def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool: - """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path) - Coded for clarity rather than performance - - Input parms: - :parm partition the partition we check - :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema - - :parm target (a string representing a mount path we want to check for. - :type str - - :parm strict if the check will be strict, target is exactly the mountpoint, or no, where the target is a leaf (f.i. to check if it is in /mnt/archinstall/). Not available for root check ('/') for obvious reasons - - """ - # we create the mountpoint list - if isinstance(partition,dict): - subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', []) - mountpoints = [partition.get('mountpoint')] - mountpoints += [volume.mountpoint for volume in subvolumes] - else: - mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes] - - # we check - if strict or target == '/': - if target in mountpoints: - return True - else: - return False - else: - for mp in mountpoints: - if mp and mp.endswith(target): - return True - return False diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py deleted file mode 100644 index bf1b3583..00000000 --- a/archinstall/lib/disk/mapperdev.py +++ /dev/null @@ -1,92 +0,0 @@ -import glob -import pathlib -import logging -import json -from dataclasses import dataclass -from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING - -from ..exceptions import SysCallError -from ..general import SysCommand -from ..output import log - -if TYPE_CHECKING: - from .btrfs import BtrfsSubvolumeInfo - -@dataclass -class MapperDev: - mappername :str - - @property - def name(self): - return self.mappername - - @property - def path(self): - return f"/dev/mapper/{self.mappername}" - - @property - def part_uuid(self): - return self.partition.part_uuid - - @property - def partition(self): - from .helpers import uevent, get_parent_of_partition - from .partition import Partition - from .blockdevice import BlockDevice - - for mapper in glob.glob('/dev/mapper/*'): - path_obj = pathlib.Path(mapper) - if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink(): - dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve() - - for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): - partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name - - try: - uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() - except SysCallError as error: - log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") - - information = uevent(uevent_data) - block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) - - return Partition(information['DEVNAME'], block_device=block_device) - - raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device") - - @property - def mountpoint(self) -> Optional[pathlib.Path]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) - for filesystem in data['filesystems']: - return pathlib.Path(filesystem.get('target')) - - except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow") - pass - - return None - - @property - def mountpoints(self) -> List[Dict[str, Any]]: - return [obj['target'] for obj in self.mount_information] - - @property - def mount_information(self) -> List[Dict[str, Any]]: - from .helpers import find_mountpoint - return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)] - - @property - def filesystem(self) -> Optional[str]: - from .helpers import get_filesystem_type - return get_filesystem_type(self.path) - - @property - def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']: - from .btrfs import subvolume_info_from_path - - for mountpoint in self.mount_information: - if target := mountpoint.get('target'): - if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py deleted file mode 100644 index 87eaa6a7..00000000 --- a/archinstall/lib/disk/partition.py +++ /dev/null @@ -1,661 +0,0 @@ -import glob -import time -import logging -import json -import os -import hashlib -import typing -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional, Dict, Any, List, Union, Iterator - -from .blockdevice import BlockDevice -from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name -from ..storage import storage -from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat -from ..output import log -from ..general import SysCommand -from .btrfs.btrfs_helpers import subvolume_info_from_path -from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo - -@dataclass -class PartitionInfo: - partition_object: 'Partition' - device_path: str # This would be /dev/sda1 for instance - bootable: bool - size: float - sector_size: int - start: Optional[int] - end: Optional[int] - pttype: Optional[str] - filesystem_type: Optional[str] - partuuid: Optional[str] - uuid: Optional[str] - mountpoints: List[Path] = field(default_factory=list) - - def __post_init__(self): - if not all([self.partuuid, self.uuid]): - for i in range(storage['DISK_RETRY_ATTEMPTS']): - lsblk_info = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - try: - lsblk_info = json.loads(lsblk_info) - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {lsblk_info}", fg="red", level=logging.ERROR) - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - if not (device := lsblk_info.get('blockdevices', [None])[0]): - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - self.partuuid = device.get('partuuid') - self.uuid = device.get('uuid') - - # Lets build a list of requirements that we would like - # to retry and build (stuff that can take time between partprobes) - requirements = [] - requirements.append(self.partuuid) - - # Unformatted partitions won't have a UUID - if lsblk_info.get('fstype') is not None: - requirements.append(self.uuid) - - if all(requirements): - break - - self.partition_object.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - def get_first_mountpoint(self) -> Optional[Path]: - if len(self.mountpoints) > 0: - return self.mountpoints[0] - return None - - -class Partition: - def __init__( - self, - path: str, - block_device: BlockDevice, - part_id :Optional[str] = None, - filesystem :Optional[str] = None, - mountpoint :Optional[str] = None, - encrypted :bool = False, - autodetect_filesystem :bool = True, - ): - if not part_id: - part_id = os.path.basename(path) - - if type(block_device) is str: - raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") - - self.block_device = block_device - self._path = path - self._part_id = part_id - self._target_mountpoint = mountpoint - self._encrypted = encrypted - self._wipe = False - self._type = 'primary' - - if mountpoint: - self.mount(mountpoint) - - try: - self._partition_info = self._fetch_information() - - if not autodetect_filesystem and filesystem: - self._partition_info.filesystem_type = filesystem - - if self._partition_info.filesystem_type == 'crypto_LUKS': - self._encrypted = True - except DiskError: - self._partition_info = None - - @typing.no_type_check # I hate doint this but I'm currently unsure where this is used. - def __lt__(self, left_comparitor :BlockDevice) -> bool: - if type(left_comparitor) == Partition: - left_comparitor = left_comparitor.path - else: - left_comparitor = str(left_comparitor) - - # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 - return self._path < left_comparitor - - def __repr__(self, *args :str, **kwargs :str) -> str: - mount_repr = '' - if self._partition_info: - if mountpoint := self._partition_info.get_first_mountpoint(): - mount_repr = f", mounted={mountpoint}" - elif self._target_mountpoint: - mount_repr = f", rel_mountpoint={self._target_mountpoint}" - - classname = self.__class__.__name__ - - if not self._partition_info: - return f'{classname}(path={self._path})' - elif self._encrypted: - return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' - else: - return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' - - def as_json(self) -> Dict[str, Any]: - """ - this is used for the table representation of the partition (see FormattedOutput) - """ - partition_info = { - 'type': self._type, - 'PARTUUID': self.part_uuid, - 'wipe': self._wipe, - 'boot': self.boot, - 'ESP': self.boot, - 'mountpoint': self._target_mountpoint, - 'encrypted': self._encrypted, - 'start': self.start, - 'size': self.end, - 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown' - } - - return partition_info - - def __dump__(self) -> Dict[str, Any]: - # TODO remove this in favour of as_json - return { - 'type': self._type, - 'PARTUUID': self.part_uuid, - 'wipe': self._wipe, - 'boot': self.boot, - 'ESP': self.boot, - 'mountpoint': self._target_mountpoint, - 'encrypted': self._encrypted, - 'start': self.start, - 'size': self.end, - 'filesystem': { - 'format': self._partition_info.filesystem_type if self._partition_info else 'None' - } - } - - def _call_lsblk(self) -> Dict[str, Any]: - for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk - # This sleep might be overkill, but lsblk is known to - # work against a chaotic cache that can change during call - # causing no information to be returned (blkid is better) - # time.sleep(1) - - # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - - try: - output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - except SysCallError as error: - # Get the output minus the message/info from lsblk if it returns a non-zero exit code. - output = error.worker.decode('UTF-8') - if '{' in output: - output = output[output.find('{'):] - - if output: - try: - lsblk_info = json.loads(output) - return lsblk_info - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR) - - raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk') - - def _call_sfdisk(self) -> Dict[str, Any]: - output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') - - if output: - sfdisk_info = json.loads(output) - partitions = sfdisk_info.get('partitiontable', {}).get('partitions', []) - node = list(filter(lambda x: x['node'] == self._path, partitions)) - - if len(node) > 0: - return node[0] - - return {} - - raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk') - - def _fetch_information(self) -> PartitionInfo: - lsblk_info = self._call_lsblk() - sfdisk_info = self._call_sfdisk() - - if not (device := lsblk_info.get('blockdevices', [])): - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - # Grab the first (and only) block device in the list as we're targeting a specific partition - device = device[0] - - mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] - bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - - return PartitionInfo( - partition_object=self, - device_path=self._path, - pttype=device['pttype'], - partuuid=device['partuuid'], - uuid=device['uuid'], - sector_size=device['log-sec'], - size=convert_size_to_gb(device['size']), - start=sfdisk_info.get('start', None), - end=sfdisk_info.get('size', None), - bootable=bootable, - filesystem_type=device['fstype'], - mountpoints=mountpoints - ) - - @property - def target_mountpoint(self) -> Optional[str]: - return self._target_mountpoint - - @property - def path(self) -> str: - return self._path - - @property - def filesystem(self) -> str: - if self._partition_info: - return self._partition_info.filesystem_type - - @property - def mountpoint(self) -> Optional[Path]: - if len(self.mountpoints) > 0: - return self.mountpoints[0] - return None - - @property - def mountpoints(self) -> List[Path]: - if self._partition_info: - return self._partition_info.mountpoints - - @property - def sector_size(self) -> int: - if self._partition_info: - return self._partition_info.sector_size - - @property - def start(self) -> Optional[int]: - if self._partition_info: - return self._partition_info.start - - @property - def end(self) -> Optional[int]: - if self._partition_info: - return self._partition_info.end - - @property - def end_sectors(self) -> Optional[int]: - if self._partition_info: - start = self._partition_info.start - end = self._partition_info.end - if start and end: - return start + end - - @property - def size(self) -> Optional[float]: - if self._partition_info: - return self._partition_info.size - - @property - def boot(self) -> bool: - if self._partition_info: - return self._partition_info.bootable - - @property - def partition_type(self) -> Optional[str]: - if self._partition_info: - return self._partition_info.pttype - - @property - def part_uuid(self) -> str: - if self._partition_info: - return self._partition_info.partuuid - - @property - def uuid(self) -> Optional[str]: - """ - Returns the UUID as returned by lsblk for the **partition**. - This is more reliable than relying on /dev/disk/by-uuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") - - time.sleep(storage.get('DISK_TIMEOUTS', 1) * i) - - partuuid = self._safe_uuid - if partuuid: - return partuuid - - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") - - @property - def _safe_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None - - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) - - try: - return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None - - log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}") - - @property - def _safe_part_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None - - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) - - try: - return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None - - log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") - - if self._partition_info: - return self._partition_info.uuid - - @property - def encrypted(self) -> Union[bool, None]: - return self._encrypted - - @property - def parent(self) -> str: - return self.real_device - - @property - def real_device(self) -> str: - output = SysCommand('lsblk -J').decode('UTF-8') - - if output: - for blockdevice in json.loads(output)['blockdevices']: - if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): - return f"/dev/{parent}" - return self._path - - raise DiskError('Unable to get disk information for command "lsblk -J"') - - @property - def device_path(self) -> str: - """ for bind mounts returns the physical path of the partition - """ - device_path, bind_name = split_bind_name(self._path) - return device_path - - @property - def bind_name(self) -> str: - """ for bind mounts returns the bind name (subvolume path). - Returns none if this property does not exist - """ - device_path, bind_name = split_bind_name(self._path) - return bind_name - - @property - def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]: - from .helpers import findmnt - - def iterate_children_recursively(information): - for child in information.get('children', []): - if target := child.get('target'): - if child.get('fstype') == 'btrfs': - if subvolume := subvolume_info_from_path(Path(target)): - yield subvolume - - if child.get('children'): - for subchild in iterate_children_recursively(child): - yield subchild - - if self._partition_info.filesystem_type == 'btrfs': - for mountpoint in self._partition_info.mountpoints: - if result := findmnt(mountpoint): - for filesystem in result.get('filesystems', []): - if subvolume := subvolume_info_from_path(mountpoint): - yield subvolume - - for child in iterate_children_recursively(filesystem): - yield child - - def partprobe(self) -> bool: - try: - if self.block_device: - return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code - except SysCallError as error: - log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG) - - return False - - def detect_inner_filesystem(self, password :str) -> Optional[str]: - log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) - from ..luks import luks2 - - try: - with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: - return unlocked_device.filesystem - except SysCallError: - pass - return None - - def has_content(self) -> bool: - fs_type = self._partition_info.filesystem_type - if not fs_type or "swap" in fs_type: - return False - - temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() - temporary_path = Path(temporary_mountpoint) - - temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0: - raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}') - - files = len(glob.glob(f"{temporary_mountpoint}/*")) - iterations = 0 - while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10: - time.sleep(1) - - temporary_path.rmdir() - - return True if files > 0 else False - - def encrypt(self, password: Optional[str] = None) -> str: - """ - A wrapper function for luks2() instances and the .encrypt() method of that instance. - """ - from ..luks import luks2 - - handle = luks2(self, None, None) - return handle.encrypt(self, password=password) - - def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: - """ - Format can be given an overriding path, for instance /dev/null to test - the formatting functionality and in essence the support for the given filesystem. - """ - if filesystem is None: - filesystem = self._partition_info.filesystem_type - - if path is None: - path = self._path - - # This converts from fat32 -> vfat to unify filesystem names - filesystem = get_mount_fs_type(filesystem) - - # To avoid "unable to open /dev/x: No such file or directory" - start_wait = time.time() - while Path(path).exists() is False and time.time() - start_wait < 10: - time.sleep(0.025) - - if log_formatting: - log(f'Formatting {path} -> {filesystem}', level=logging.INFO) - - try: - if filesystem == 'btrfs': - options = ['-f'] + options - - mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8') - if mkfs and 'UUID:' not in mkfs: - raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'vfat': - options = ['-F32'] + options - log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}") - if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ext4': - options = ['-F'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ext2': - options = ['-F'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = 'ext2' - elif filesystem == 'xfs': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'f2fs': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ntfs3': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'crypto_LUKS': - # from ..luks import luks2 - # encrypted_partition = luks2(self, None, None) - # encrypted_partition.format(path) - self._partition_info.filesystem_type = filesystem - - else: - raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") - except SysCallError as error: - log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange") - if retry is True: - log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") - time.sleep(storage.get('DISK_TIMEOUTS', 1)) - - return self.format(filesystem, path, log_formatting, options, retry=False) - - if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': - self._encrypted = True - else: - self._encrypted = False - - return True - - def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: - if data['name'] == name: - return parent - elif 'children' in data: - for child in data['children']: - if parent := self.find_parent_of(child, name, parent=data['name']): - return parent - - return None - - def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: - if not self._partition_info.get_first_mountpoint(): - log(f'Mounting {self} to {target}', level=logging.INFO) - - if not fs: - fs = self._partition_info.filesystem_type - - fs_type = get_mount_fs_type(fs) - - Path(target).mkdir(parents=True, exist_ok=True) - - if self.bind_name: - device_path = self.device_path - # TODO options should be better be a list than a string - if options: - options = f"{options},subvol={self.bind_name}" - else: - options = f"subvol={self.bind_name}" - else: - device_path = self._path - try: - if options: - mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}") - else: - mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {device_path} {target}") - - # TODO: Should be redundant to check for exit_code - if mnt_handle.exit_code != 0: - raise DiskError(f"Could not mount {self._path} to {target} using options {options}") - except SysCallError as err: - raise err - - # Update the partition info since the mount info has changed after this call. - self._partition_info = self._fetch_information() - return True - - return False - - def unmount(self) -> bool: - SysCommand(f"/usr/bin/umount {self._path}") - - # Update the partition info since the mount info has changed after this call. - self._partition_info = self._fetch_information() - return True - - def filesystem_supported(self) -> bool: - """ - The support for a filesystem (this partition) is tested by calling - partition.format() with a path set to '/dev/null' which returns two exceptions: - 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported - 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type - """ - try: - self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False) - except (SysCallError, DiskError): - pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code - except UnknownFilesystemFormat as err: - raise err - return True - - -def get_mount_fs_type(fs :str) -> str: - if fs == 'ntfs': - return 'ntfs3' # Needed to use the Paragon R/W NTFS driver - elif fs == 'fat32': - return 'vfat' # This is the actual type used for fat32 mounting - return fs diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py new file mode 100644 index 00000000..686e8c29 --- /dev/null +++ b/archinstall/lib/disk/partitioning_menu.py @@ -0,0 +1,335 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple + +from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \ + ModificationStatus +from ..menu import Menu, ListManager, MenuSelection, TextInput +from ..output import FormattedOutput, log +from .subvolume_menu import SubvolumeMenu + +if TYPE_CHECKING: + _: Any + + +class PartitioningList(ListManager): + """ + subclass of ListManager for the managing of user accounts + """ + def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]): + self._device = device + self._actions = { + 'create_new_partition': str(_('Create a new partition')), + 'suggest_partition_layout': str(_('Suggest partition layout')), + 'remove_added_partitions': str(_('Remove all newly added partitions')), + 'assign_mountpoint': str(_('Assign mountpoint')), + 'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')), + 'mark_bootable': str(_('Mark/Unmark as bootable')), + 'set_filesystem': str(_('Change filesystem')), + 'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only + 'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only + 'delete_partition': str(_('Delete partition')) + } + + display_actions = list(self._actions.values()) + super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:]) + + def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') + + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, user in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = user + + return display_data + + def selected_action_display(self, partition: PartitionModification) -> str: + return str(_('Partition')) + + def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]: + not_filter = [] + + # only display formatting if the partition exists already + if not selection.exists(): + not_filter += [self._actions['mark_formatting']] + else: + # only allow these options if the existing partition + # was marked as formatting, otherwise we run into issues where + # 1. select a new fs -> potentially mark as wipe now + # 2. Switch back to old filesystem -> should unmark wipe now, but + # how do we know it was the original one? + not_filter += [ + self._actions['set_filesystem'], + self._actions['assign_mountpoint'], + self._actions['mark_bootable'], + self._actions['btrfs_mark_compressed'], + self._actions['btrfs_set_subvolumes'] + ] + + # non btrfs partitions shouldn't get btrfs options + if selection.fs_type != FilesystemType.Btrfs: + not_filter += [self._actions['btrfs_mark_compressed'], self._actions['btrfs_set_subvolumes']] + else: + not_filter += [self._actions['assign_mountpoint']] + + return [o for o in options if o not in not_filter] + + def handle_action( + self, + action: str, + entry: Optional[PartitionModification], + data: List[PartitionModification] + ) -> List[PartitionModification]: + action_key = [k for k, v in self._actions.items() if v == action][0] + + match action_key: + case 'create_new_partition': + new_partition = self._create_new_partition() + data += [new_partition] + case 'suggest_partition_layout': + new_partitions = self._suggest_partition_layout(data) + if len(new_partitions) > 0: + data = new_partitions + case 'remove_added_partitions': + choice = self._reset_confirmation() + if choice.value == Menu.yes(): + data = [part for part in data if part.is_exists_or_modify()] + case 'assign_mountpoint' if entry: + entry.mountpoint = self._prompt_mountpoint() + if entry.mountpoint == Path('/boot'): + entry.set_flag(PartitionFlag.Boot) + case 'mark_formatting' if entry: + self._prompt_formatting(entry) + case 'mark_bootable' if entry: + entry.invert_flag(PartitionFlag.Boot) + case 'set_filesystem' if entry: + fs_type = self._prompt_partition_fs_type() + if fs_type: + entry.fs_type = fs_type + # btrfs subvolumes will define mountpoints + if fs_type == FilesystemType.Btrfs: + entry.mountpoint = None + case 'btrfs_mark_compressed' if entry: + self._set_compressed(entry) + case 'btrfs_set_subvolumes' if entry: + self._set_btrfs_subvolumes(entry) + case 'delete_partition' if entry: + data = self._delete_partition(entry, data) + + return data + + def _delete_partition( + self, + entry: PartitionModification, + data: List[PartitionModification] + ) -> List[PartitionModification]: + if entry.is_exists_or_modify(): + entry.status = ModificationStatus.Delete + return data + else: + return [d for d in data if d != entry] + + def _set_compressed(self, partition: PartitionModification): + compression = 'compress=zstd' + + if compression in partition.mount_options: + partition.mount_options = [o for o in partition.mount_options if o != compression] + else: + partition.mount_options.append(compression) + + def _set_btrfs_subvolumes(self, partition: PartitionModification): + partition.btrfs_subvols = SubvolumeMenu( + _("Manage btrfs subvolumes for current partition"), + partition.btrfs_subvols + ).run() + + def _prompt_formatting(self, partition: PartitionModification): + # an existing partition can toggle between Exist or Modify + if partition.is_modify(): + partition.status = ModificationStatus.Exist + return + elif partition.exists(): + partition.status = ModificationStatus.Modify + + # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really + # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, + # it's safe to change the filesystem for this partition. + if partition.fs_type == FilesystemType.Crypto_luks: + prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified')) + fs_type = self._prompt_partition_fs_type(prompt) + partition.fs_type = fs_type + + if fs_type == FilesystemType.Btrfs: + partition.mountpoint = None + + def _prompt_mountpoint(self) -> Path: + header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n' + header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n' + prompt = str(_('Mountpoint: ')) + + print(header) + + while True: + value = TextInput(prompt).run().strip() + + if value: + mountpoint = Path(value) + break + + return mountpoint + + def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType: + options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks} + + prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition')) + choice = Menu(prompt, options, sort=False, skip=False).run() + return options[choice.single_value] + + def _validate_sector(self, start_sector: str, end_sector: Optional[str] = None) -> bool: + if not start_sector.isdigit(): + return False + + if end_sector: + if end_sector.endswith('%'): + if not end_sector[:-1].isdigit(): + return False + elif not end_sector.isdigit(): + return False + elif int(start_sector) > int(end_sector): + return False + + return True + + def _prompt_sectors(self) -> Tuple[Size, Size]: + device_info = self._device.device_info + + text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n' + free_space_table = FormattedOutput.as_table(device_info.free_space_regions) + prompt = text + free_space_table + '\n' + + total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size) + prompt += str(_('Total sectors: {}')).format(total_sectors) + '\n' + print(prompt) + + largest_free_area = max(device_info.free_space_regions, key=lambda r: r.get_length()) + + # prompt until a valid start sector was entered + while True: + start_prompt = str(_('Enter the start sector (default: {}): ')).format(largest_free_area.start) + start_sector = TextInput(start_prompt).run().strip() + + if not start_sector or self._validate_sector(start_sector): + break + + log(f'Invalid start sector entered: {start_sector}', fg='red', level=logging.INFO) + + if not start_sector: + start_sector = str(largest_free_area.start) + end_sector = str(largest_free_area.end) + else: + end_sector = '100%' + + # prompt until valid end sector was entered + while True: + end_prompt = str(_('Enter the end sector of the partition (percentage or block number, default: {}): ')).format(end_sector) + end_value = TextInput(end_prompt).run().strip() + + if not end_value or self._validate_sector(start_sector, end_value): + break + + log(f'Invalid end sector entered: {start_sector}', fg='red', level=logging.INFO) + + # override the default value with the user value + if end_value: + end_sector = end_value + + start_size = Size(int(start_sector), Unit.sectors, device_info.sector_size) + + if end_sector.endswith('%'): + end_size = Size(int(end_sector[:-1]), Unit.Percent, device_info.sector_size, device_info.total_size) + else: + end_size = Size(int(end_sector), Unit.sectors, device_info.sector_size) + + return start_size, end_size + + def _create_new_partition(self) -> PartitionModification: + fs_type = self._prompt_partition_fs_type() + + start_size, end_size = self._prompt_sectors() + length = end_size - start_size + + # new line for the next prompt + print() + + mountpoint = None + if fs_type != FilesystemType.Btrfs: + mountpoint = self._prompt_mountpoint() + + partition = PartitionModification( + status=ModificationStatus.Create, + type=PartitionType.Primary, + start=start_size, + length=length, + fs_type=fs_type, + mountpoint=mountpoint + ) + + if partition.mountpoint == Path('/boot'): + partition.set_flag(PartitionFlag.Boot) + + return partition + + def _reset_confirmation(self) -> MenuSelection: + prompt = str(_('This will remove all newly added partitions, continue?')) + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run() + return choice + + def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]: + # if modifications have been done already, inform the user + # that this operation will erase those modifications + if any([not entry.exists() for entry in data]): + choice = self._reset_confirmation() + if choice.value == Menu.no(): + return [] + + from ..user_interaction.disk_conf import suggest_single_disk_layout + + device_modification = suggest_single_disk_layout(self._device) + return device_modification.partitions + + +def manual_partitioning( + device: BDevice, + prompt: str = '', + preset: List[PartitionModification] = [] +) -> List[PartitionModification]: + if not prompt: + prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n' + prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB)) + + manual_preset = [] + + if not preset: + # we'll display the existing partitions of the device + for partition in device.partition_infos: + manual_preset.append( + PartitionModification.from_existing_partition(partition) + ) + else: + manual_preset = preset + + menu_list = PartitioningList(prompt, device, manual_preset) + partitions = menu_list.run() + + if menu_list.is_last_choice_cancel(): + return preset + + return partitions diff --git a/archinstall/lib/disk/subvolume_menu.py b/archinstall/lib/disk/subvolume_menu.py new file mode 100644 index 00000000..32a0e616 --- /dev/null +++ b/archinstall/lib/disk/subvolume_menu.py @@ -0,0 +1,101 @@ +from pathlib import Path +from typing import Dict, List, Optional, Any, TYPE_CHECKING + +from .device_model import SubvolumeModification +from ..menu import Menu, TextInput, MenuSelectionType, ListManager +from ..output import FormattedOutput + +if TYPE_CHECKING: + _: Any + + +class SubvolumeMenu(ListManager): + def __init__(self, prompt: str, btrfs_subvols: List[SubvolumeModification]): + self._actions = [ + str(_('Add subvolume')), + str(_('Edit subvolume')), + str(_('Delete subvolume')) + ] + super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:]) + + def reformat(self, data: List[SubvolumeModification]) -> Dict[str, Optional[SubvolumeModification]]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') + + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[SubvolumeModification]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, subvol in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = subvol + + return display_data + + def selected_action_display(self, subvolume: SubvolumeModification) -> str: + return str(subvolume.name) + + def _prompt_options(self, editing: Optional[SubvolumeModification] = None) -> List[str]: + preset_options = [] + if editing: + preset_options = editing.mount_options + + choice = Menu( + str(_("Select the desired subvolume options ")), + ['nodatacow', 'compress'], + skip=True, + preset_values=preset_options, + multi=True + ).run() + + if choice.type_ == MenuSelectionType.Selection: + return choice.value # type: ignore + + return [] + + def _add_subvolume(self, editing: Optional[SubvolumeModification] = None) -> Optional[SubvolumeModification]: + name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run() + + if not name: + return None + + mountpoint = TextInput(f'{_("Subvolume mountpoint")}: ', str(editing.mountpoint) if editing else '').run() + + if not mountpoint: + return None + + options = self._prompt_options(editing) + + subvolume = SubvolumeModification(Path(name), Path(mountpoint)) + subvolume.compress = 'compress' in options + subvolume.nodatacow = 'nodatacow' in options + + return subvolume + + def handle_action( + self, + action: str, + entry: Optional[SubvolumeModification], + data: List[SubvolumeModification] + ) -> List[SubvolumeModification]: + if action == self._actions[0]: # add + new_subvolume = self._add_subvolume() + + if new_subvolume is not None: + # in case a user with the same username as an existing user + # was created we'll replace the existing one + data = [d for d in data if d.name != new_subvolume.name] + data += [new_subvolume] + elif entry is not None: + if action == self._actions[1]: # edit subvolume + new_subvolume = self._add_subvolume(entry) + + if new_subvolume is not None: + # we'll remove the original subvolume and add the modified version + data = [d for d in data if d.name != entry.name and d.name != new_subvolume.name] + data += [new_subvolume] + elif action == self._actions[2]: # delete + data = [d for d in data if d != entry] + + return data diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py deleted file mode 100644 index 5809c073..00000000 --- a/archinstall/lib/disk/user_guides.py +++ /dev/null @@ -1,240 +0,0 @@ -from __future__ import annotations -import logging -from typing import Optional, Dict, Any, List, TYPE_CHECKING - -# https://stackoverflow.com/a/39757388/929999 -from ..models.subvolume import Subvolume - -if TYPE_CHECKING: - from .blockdevice import BlockDevice - _: Any - -from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to -from ..hardware import has_uefi -from ..output import log -from ..menu import Menu - - -def suggest_single_disk_layout(block_device :BlockDevice, - default_filesystem :Optional[str] = None, - advanced_options :bool = False) -> Dict[str, Any]: - - if not default_filesystem: - from ..user_interaction import ask_for_main_filesystem_format - default_filesystem = ask_for_main_filesystem_format(advanced_options) - - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB - using_subvolumes = False - using_home_partition = False - compression = False - - if default_filesystem == 'btrfs': - prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - using_subvolumes = choice.value == Menu.yes() - - prompt = str(_('Would you like to use BTRFS compression?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - compression = choice.value == Menu.yes() - - layout = { - block_device.path : { - "wipe" : True, - "partitions" : [] - } - } - - # Used for reference: https://wiki.archlinux.org/title/partitioning - - # 2 MiB is unallocated for GRUB on BIOS. Potentially unneeded for - # other bootloaders? - - # TODO: On BIOS, /boot partition is only needed if the drive will - # be encrypted, otherwise it is not recommended. We should probably - # add a check for whether the drive will be encrypted or not. - layout[block_device.path]['partitions'].append({ - # Boot - "type" : "primary", - "start" : "3MiB", - "size" : "203MiB", - "boot" : True, - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/boot", - "filesystem" : { - "format" : "fat32" - } - }) - - # Increase the UEFI partition if UEFI is detected. - # Also re-align the start to 1MiB since we don't need the first sectors - # like we do in MBR layouts where the boot loader is installed traditionally. - if has_uefi(): - layout[block_device.path]['partitions'][-1]['start'] = '1MiB' - layout[block_device.path]['partitions'][-1]['size'] = '512MiB' - - layout[block_device.path]['partitions'].append({ - # Root - "type" : "primary", - "start" : "206MiB", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/" if not using_subvolumes else None, - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - - if has_uefi(): - layout[block_device.path]['partitions'][-1]['start'] = '513MiB' - - if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: - prompt = str(_('Would you like to create a separate partition for /home?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - using_home_partition = choice.value == Menu.yes() - - # Set a size for / (/root) - if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition: - # We'll use subvolumes - # Or the disk size is too small to allow for a separate /home - # Or the user doesn't want to create a separate partition for /home - layout[block_device.path]['partitions'][-1]['size'] = '100%' - else: - layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GiB" - - if default_filesystem == 'btrfs' and using_subvolumes: - # if input('Do you want to use a recommended structure? (Y/n): ').strip().lower() in ('', 'y', 'yes'): - # https://btrfs.wiki.kernel.org/index.php/FAQ - # https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash - # https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh - layout[block_device.path]['partitions'][1]['btrfs'] = { - 'subvolumes': [ - Subvolume('@', '/'), - Subvolume('@home', '/home'), - Subvolume('@log', '/var/log'), - Subvolume('@pkg', '/var/cache/pacman/pkg'), - Subvolume('@.snapshots', '/.snapshots') - ] - } - elif using_home_partition: - # If we don't want to use subvolumes, - # But we want to be able to re-use data between re-installs.. - # A second partition for /home would be nice if we have the space for it - layout[block_device.path]['partitions'].append({ - # Home - "type" : "primary", - "start" : f"{min(block_device.size, 20)}GiB", - "size" : "100%", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/home", - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - - return layout - - -def suggest_multi_disk_layout(block_devices :List[BlockDevice], default_filesystem :Optional[str] = None, advanced_options :bool = False): - - if not default_filesystem: - from ..user_interaction import ask_for_main_filesystem_format - default_filesystem = ask_for_main_filesystem_format(advanced_options) - - # Not really a rock solid foundation of information to stand on, but it's a start: - # https://www.reddit.com/r/btrfs/comments/m287gp/partition_strategy_for_two_physical_disks/ - # https://www.reddit.com/r/btrfs/comments/9us4hr/what_is_your_btrfs_partitionsubvolumes_scheme/ - - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB - ARCH_LINUX_INSTALLED_SIZE = 20 # GiB, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size? - - block_devices = sort_block_devices_based_on_performance(block_devices).keys() - - home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART) - root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device]) - - if home_device is None or root_device is None: - text = _('The selected drives do not have the minimum capacity required for an automatic suggestion\n') - text += _('Minimum capacity for /home partition: {}GB\n').format(MIN_SIZE_TO_ALLOW_HOME_PART) - text += _('Minimum capacity for Arch Linux partition: {}GB').format(ARCH_LINUX_INSTALLED_SIZE) - Menu(str(text), [str(_('Continue'))], skip=False).run() - return None - - compression = False - - if default_filesystem == 'btrfs': - # prompt = 'Would you like to use BTRFS subvolumes with a default structure?' - # choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() - # using_subvolumes = choice == 'yes' - - prompt = str(_('Would you like to use BTRFS compression?')) - choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() - compression = choice.value == Menu.yes() - - log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG) - - layout = { - root_device.path : { - "wipe" : True, - "partitions" : [] - }, - home_device.path : { - "wipe" : True, - "partitions" : [] - }, - } - - # TODO: Same deal as with the single disk layout, we should - # probably check if the drive will be encrypted. - layout[root_device.path]['partitions'].append({ - # Boot - "type" : "primary", - "start" : "3MiB", - "size" : "203MiB", - "boot" : True, - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/boot", - "filesystem" : { - "format" : "fat32" - } - }) - - if has_uefi(): - layout[root_device.path]['partitions'][-1]['start'] = '1MiB' - layout[root_device.path]['partitions'][-1]['size'] = '512MiB' - - layout[root_device.path]['partitions'].append({ - # Root - "type" : "primary", - "start" : "206MiB", - "size" : "100%", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/", - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - if has_uefi(): - layout[root_device.path]['partitions'][-1]['start'] = '513MiB' - - layout[home_device.path]['partitions'].append({ - # Home - "type" : "primary", - "start" : "1MiB", - "size" : "100%", - "encrypted" : False, - "wipe" : True, - "mountpoint" : "/home", - "filesystem" : { - "format" : default_filesystem, - "mount_options" : ["compress=zstd"] if compression else [] - } - }) - - return layout diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py deleted file mode 100644 index 076a8ba2..00000000 --- a/archinstall/lib/disk/validators.py +++ /dev/null @@ -1,48 +0,0 @@ -from typing import List - -def valid_parted_position(pos :str) -> bool: - if not len(pos): - return False - - if pos.isdigit(): - return True - - pos_lower = pos.lower() - - if (pos_lower.endswith('b') or pos_lower.endswith('s')) and pos[:-1].isdigit(): - return True - - if any(pos_lower.endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit() - for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']): - return True - - return False - - -def fs_types() -> List[str]: - # https://www.gnu.org/software/parted/manual/html_node/mkpart.html - # Above link doesn't agree with `man parted` /mkpart documentation: - """ - fs-type can - be one of "btrfs", "ext2", - "ext3", "ext4", "fat16", - "fat32", "hfs", "hfs+", - "linux-swap", "ntfs", "reis‐ - erfs", "udf", or "xfs". - """ - return [ - "btrfs", - "ext2", - "ext3", "ext4", # `man parted` allows these - "fat16", "fat32", - "hfs", "hfs+", # "hfsx", not included in `man parted` - "linux-swap", - "ntfs", - "reiserfs", - "udf", # "ufs", not included in `man parted` - "xfs", # `man parted` allows this - ] - - -def valid_fs_type(fstype :str) -> bool: - return fstype.lower() in fs_types() -- cgit v1.2.3-70-g09d2