Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorDaniel Girtler <girtler.daniel@gmail.com>2024-04-15 18:49:00 +1000
committerGitHub <noreply@github.com>2024-04-15 18:49:00 +1000
commitb470b16ec923260cfd9c5b9f2b88e0a39611b463 (patch)
tree25a32fd904f739e181a62a62451637bcf7cd6588 /archinstall/lib
parent7d9e9d8ba0bcba888ec46554f87dfc414c73f9c4 (diff)
LVM support (#2104)
* Submenu for disk configuration * Update * Add LVM manual config * PV selection * LVM volume menu * Update * Fix mypy * Update * Update * Update * Update * Update * Update * Update * Update * Update LVM * Update * Update * Btrfs support * Refactor * LVM on Luks * Luks on LVM * Update * LVM on Luks * Update * Update * mypy * Update * Fix bug with LuksOnLvm and Btrfs * Update * Update * Info -> Debug output
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk/__init__.py9
-rw-r--r--archinstall/lib/disk/device_handler.py264
-rw-r--r--archinstall/lib/disk/device_model.py408
-rw-r--r--archinstall/lib/disk/disk_menu.py140
-rw-r--r--archinstall/lib/disk/encryption_menu.py131
-rw-r--r--archinstall/lib/disk/fido.py8
-rw-r--r--archinstall/lib/disk/filesystem.py295
-rw-r--r--archinstall/lib/disk/partitioning_menu.py18
-rw-r--r--archinstall/lib/disk/subvolume_menu.py18
-rw-r--r--archinstall/lib/global_menu.py70
-rw-r--r--archinstall/lib/installer.py427
-rw-r--r--archinstall/lib/interactions/disk_conf.py134
-rw-r--r--archinstall/lib/interactions/manage_users_conf.py18
-rw-r--r--archinstall/lib/luks.py30
-rw-r--r--archinstall/lib/menu/abstract_menu.py93
-rw-r--r--archinstall/lib/menu/list_manager.py28
-rw-r--r--archinstall/lib/menu/menu.py8
-rw-r--r--archinstall/lib/menu/table_selection_menu.py4
-rw-r--r--archinstall/lib/mirrors.py15
19 files changed, 1709 insertions, 409 deletions
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py
index 24dafef5..7f881273 100644
--- a/archinstall/lib/disk/__init__.py
+++ b/archinstall/lib/disk/__init__.py
@@ -11,6 +11,11 @@ from .device_model import (
BDevice,
DiskLayoutType,
DiskLayoutConfiguration,
+ LvmLayoutType,
+ LvmConfiguration,
+ LvmVolumeGroup,
+ LvmVolume,
+ LvmVolumeStatus,
PartitionTable,
Unit,
Size,
@@ -30,7 +35,7 @@ from .device_model import (
CleanType,
get_lsblk_info,
get_all_lsblk_info,
- get_lsblk_by_mountpoint
+ get_lsblk_by_mountpoint,
)
from .encryption_menu import (
select_encryption_type,
@@ -39,3 +44,5 @@ from .encryption_menu import (
select_partitions_to_encrypt,
DiskEncryptionMenu,
)
+
+from .disk_menu import DiskLayoutConfigurationMenu
diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py
index 6e91ac2e..7ba70382 100644
--- a/archinstall/lib/disk/device_handler.py
+++ b/archinstall/lib/disk/device_handler.py
@@ -3,8 +3,9 @@ from __future__ import annotations
import json
import os
import logging
+import time
from pathlib import Path
-from typing import List, Dict, Any, Optional, TYPE_CHECKING
+from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal, Iterable
from parted import ( # type: ignore
Disk, Geometry, FileSystem,
@@ -17,11 +18,12 @@ from .device_model import (
BDevice, _DeviceInfo, _PartitionInfo,
FilesystemType, Unit, PartitionTable,
ModificationStatus, get_lsblk_info, LsblkInfo,
- _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption
+ _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption, LvmVolumeGroup, LvmVolume, Size, LvmGroupInfo,
+ SectorSize, LvmVolumeInfo, LvmPVInfo, SubvolumeModification, BtrfsMountOption
)
from ..exceptions import DiskError, UnknownFilesystemFormat
-from ..general import SysCommand, SysCallError, JSON
+from ..general import SysCommand, SysCallError, JSON, SysCommandWorker
from ..luks import Luks2
from ..output import debug, error, info, warn, log
from ..utils.util import is_subpath
@@ -189,7 +191,7 @@ class DeviceHandler(object):
return subvol_infos
- def _perform_formatting(
+ def format(
self,
fs_type: FilesystemType,
path: Path,
@@ -234,7 +236,7 @@ class DeviceHandler(object):
options += additional_parted_options
options_str = ' '.join(options)
- info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
+ debug(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
try:
SysCommand(f"/usr/bin/{command} {options_str} {path}")
@@ -243,7 +245,33 @@ class DeviceHandler(object):
error(msg)
raise DiskError(msg) from err
- def _perform_enc_formatting(
+ def encrypt(
+ self,
+ dev_path: Path,
+ mapper_name: Optional[str],
+ enc_password: str,
+ lock_after_create: bool = True
+ ) -> Luks2:
+ luks_handler = Luks2(
+ dev_path,
+ mapper_name=mapper_name,
+ password=enc_password
+ )
+
+ key_file = luks_handler.encrypt()
+
+ luks_handler.unlock(key_file=key_file)
+
+ if not luks_handler.mapper_dev:
+ raise DiskError('Failed to unlock luks device')
+
+ if lock_after_create:
+ debug(f'luks2 locking device: {dev_path}')
+ luks_handler.lock()
+
+ return luks_handler
+
+ def format_encrypted(
self,
dev_path: Path,
mapper_name: Optional[str],
@@ -258,71 +286,160 @@ class DeviceHandler(object):
key_file = luks_handler.encrypt()
- debug(f'Unlocking luks2 device: {dev_path}')
luks_handler.unlock(key_file=key_file)
if not luks_handler.mapper_dev:
raise DiskError('Failed to unlock luks device')
info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
- self._perform_formatting(fs_type, luks_handler.mapper_dev)
+ self.format(fs_type, luks_handler.mapper_dev)
info(f'luks2 locking device: {dev_path}')
luks_handler.lock()
- def _validate_partitions(self, partitions: List[PartitionModification]):
- checks = {
- # verify that all partitions have a path set (which implies that they have been created)
- lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
- # crypto luks is not a valid file system type
- lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError('Crypto luks cannot be set as a filesystem type'),
- # file system type must be set
- lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
- }
-
- for check, exc in checks.items():
- found = next(filter(check, partitions), None)
- if found is not None:
- raise exc
-
- def format(
+ def _lvm_info(
self,
- device_mod: DeviceModification,
- enc_conf: Optional['DiskEncryption'] = None
- ):
- """
- Format can be given an overriding path, for instance /dev/null to test
- the formatting functionality and in essence the support for the given filesystem.
- """
+ cmd: str,
+ info_type: Literal['lv', 'vg', 'pvseg']
+ ) -> Optional[Any]:
+ raw_info = SysCommand(cmd).decode().split('\n')
- # only verify partitions that are being created or modified
- create_or_modify_parts = [p for p in device_mod.partitions if p.is_create_or_modify()]
+ # for whatever reason the output sometimes contains
+ # "File descriptor X leaked leaked on vgs invocation
+ data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw])
- self._validate_partitions(create_or_modify_parts)
+ debug(f'LVM info: {data}')
- # make sure all devices are unmounted
- self._umount_all_existing(device_mod.device_path)
-
- for part_mod in create_or_modify_parts:
- # partition will be encrypted
- if enc_conf is not None and part_mod in enc_conf.partitions:
- self._perform_enc_formatting(
- part_mod.safe_dev_path,
- part_mod.mapper_name,
- part_mod.safe_fs_type,
- enc_conf
- )
- else:
- self._perform_formatting(part_mod.safe_fs_type, part_mod.safe_dev_path)
+ reports = json.loads(data)
+
+ for report in reports['report']:
+ if len(report[info_type]) != 1:
+ raise ValueError(f'Report does not contain any entry')
- # synchronize with udev before using lsblk
- SysCommand('udevadm settle')
+ entry = report[info_type][0]
- lsblk_info = self._fetch_part_info(part_mod.safe_dev_path)
+ match info_type:
+ case 'pvseg':
+ return LvmPVInfo(
+ pv_name=Path(entry['pv_name']),
+ lv_name=entry['lv_name'],
+ vg_name=entry['vg_name'],
+ )
+ case 'lv':
+ return LvmVolumeInfo(
+ lv_name=entry['lv_name'],
+ vg_name=entry['vg_name'],
+ lv_size=Size(int(entry[f'lv_size'][:-1]), Unit.B, SectorSize.default())
+ )
+ case 'vg':
+ return LvmGroupInfo(
+ vg_uuid=entry['vg_uuid'],
+ vg_size=Size(int(entry[f'vg_size'][:-1]), Unit.B, SectorSize.default())
+ )
+
+ return None
- part_mod.partn = lsblk_info.partn
- part_mod.partuuid = lsblk_info.partuuid
- part_mod.uuid = lsblk_info.uuid
+ def _lvm_info_with_retry(self, cmd: str, info_type: Literal['lv', 'vg', 'pvseg']) -> Optional[Any]:
+ attempts = 3
+
+ for attempt_nr in range(attempts):
+ try:
+ return self._lvm_info(cmd, info_type)
+ except ValueError:
+ time.sleep(attempt_nr + 1)
+
+ raise ValueError(f'Failed to fetch {info_type} information')
+
+ def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]:
+ cmd = (
+ 'lvs --reportformat json '
+ '--unit B '
+ f'-S lv_name={lv_name}'
+ )
+
+ return self._lvm_info_with_retry(cmd, 'lv')
+
+ def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]:
+ cmd = (
+ 'vgs --reportformat json '
+ '--unit B '
+ '-o vg_name,vg_uuid,vg_size '
+ f'-S vg_name={vg_name}'
+ )
+
+ return self._lvm_info_with_retry(cmd, 'vg')
+
+ def lvm_pvseg_info(self, vg_name: str, lv_name: str) -> Optional[LvmPVInfo]:
+ cmd = (
+ 'pvs '
+ '--segments -o+lv_name,vg_name '
+ f'-S vg_name={vg_name},lv_name={lv_name} '
+ '--reportformat json '
+ )
+
+ return self._lvm_info_with_retry(cmd, 'pvseg')
+
+ def lvm_vol_change(self, vol: LvmVolume, activate: bool):
+ active_flag = 'y' if activate else 'n'
+ cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}'
+
+ debug(f'lvchange volume: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_export_vg(self, vg: LvmVolumeGroup):
+ cmd = f'vgexport {vg.name}'
+
+ debug(f'vgexport: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_import_vg(self, vg: LvmVolumeGroup):
+ cmd = f'vgimport {vg.name}'
+
+ debug(f'vgimport: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_vol_reduce(self, vol_path: Path, amount: Size):
+ val = amount.format_size(Unit.B, include_unit=False)
+ cmd = f'lvreduce -L -{val}B {vol_path}'
+
+ debug(f'Reducing LVM volume size: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_pv_create(self, pvs: Iterable[Path]):
+ cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs])
+ debug(f'Creating LVM PVS: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str):
+ pvs_str = ' '.join([str(pv) for pv in pvs])
+ cmd = f'vgcreate --yes {vg_name} {pvs_str}'
+
+ debug(f'Creating LVM group: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None):
+ if offset is not None:
+ length = volume.length - offset
+ else:
+ length = volume.length
+
+ length_str = length.format_size(Unit.B, include_unit=False)
+ cmd = f'lvcreate --yes -L {length_str}B {vg_name} -n {volume.name}'
+
+ debug(f'Creating volume: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ volume.vg_name = vg_name
+ volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}')
def _setup_partition(
self,
@@ -385,7 +502,7 @@ class DeviceHandler(object):
# the partition has a path now that it has been added
part_mod.dev_path = Path(partition.path)
- def _fetch_part_info(self, path: Path) -> LsblkInfo:
+ def fetch_part_info(self, path: Path) -> LsblkInfo:
lsblk_info = get_lsblk_info(path)
if not lsblk_info.partn:
@@ -404,6 +521,37 @@ class DeviceHandler(object):
return lsblk_info
+ def create_lvm_btrfs_subvolumes(
+ self,
+ path: Path,
+ btrfs_subvols: List[SubvolumeModification],
+ mount_options: List[str]
+ ):
+ info(f'Creating subvolumes: {path}')
+
+ self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
+
+ for sub_vol in btrfs_subvols:
+ debug(f'Creating subvolume: {sub_vol.name}')
+
+ subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
+
+ SysCommand(f"btrfs subvolume create {subvol_path}")
+
+ if BtrfsMountOption.nodatacow.value in mount_options:
+ try:
+ SysCommand(f'chattr +C {subvol_path}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}')
+
+ if BtrfsMountOption.compress.value in mount_options:
+ try:
+ SysCommand(f'chattr +c {subvol_path}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}')
+
+ self.umount(path)
+
def create_btrfs_volumes(
self,
part_mod: PartitionModification,
@@ -468,8 +616,8 @@ class DeviceHandler(object):
return luks_handler
- def _umount_all_existing(self, device_path: Path):
- info(f'Unmounting all existing partitions: {device_path}')
+ def umount_all_existing(self, device_path: Path):
+ debug(f'Unmounting all existing partitions: {device_path}')
existing_partitions = self._devices[device_path].partition_infos
@@ -498,7 +646,7 @@ class DeviceHandler(object):
raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions')
# make sure all devices are unmounted
- self._umount_all_existing(modification.device_path)
+ self.umount_all_existing(modification.device_path)
# WARNING: the entire device will be wiped and all data lost
if modification.wipe:
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
index fe96203c..1cd3d674 100644
--- a/archinstall/lib/disk/device_model.py
+++ b/archinstall/lib/disk/device_model.py
@@ -41,6 +41,8 @@ class DiskLayoutType(Enum):
class DiskLayoutConfiguration:
config_type: DiskLayoutType
device_modifications: List[DeviceModification] = field(default_factory=list)
+ lvm_config: Optional[LvmConfiguration] = None
+
# used for pre-mounted config
mountpoint: Optional[Path] = None
@@ -51,13 +53,18 @@ class DiskLayoutConfiguration:
'mountpoint': str(self.mountpoint)
}
else:
- return {
+ config: Dict[str, Any] = {
'config_type': self.config_type.value,
- 'device_modifications': [mod.json() for mod in self.device_modifications]
+ 'device_modifications': [mod.json() for mod in self.device_modifications],
}
+ if self.lvm_config:
+ config['lvm_config'] = self.lvm_config.json()
+
+ return config
+
@classmethod
- def parse_arg(cls, disk_config: Dict[str, List[Dict[str, Any]]]) -> Optional[DiskLayoutConfiguration]:
+ def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]:
from .device_handler import device_handler
device_modifications: List[DeviceModification] = []
@@ -124,6 +131,10 @@ class DiskLayoutConfiguration:
device_modification.partitions = device_partitions
device_modifications.append(device_modification)
+ # Parse LVM configuration from settings
+ if (lvm_arg := disk_config.get('lvm_config', None)) is not None:
+ config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config)
+
return config
@@ -133,24 +144,24 @@ class PartitionTable(Enum):
class Unit(Enum):
- B = 1 # byte
- kB = 1000**1 # kilobyte
- MB = 1000**2 # megabyte
- GB = 1000**3 # gigabyte
- TB = 1000**4 # terabyte
- PB = 1000**5 # petabyte
- EB = 1000**6 # exabyte
- ZB = 1000**7 # zettabyte
- YB = 1000**8 # yottabyte
-
- KiB = 1024**1 # kibibyte
- MiB = 1024**2 # mebibyte
- GiB = 1024**3 # gibibyte
- TiB = 1024**4 # tebibyte
- PiB = 1024**5 # pebibyte
- EiB = 1024**6 # exbibyte
- ZiB = 1024**7 # zebibyte
- YiB = 1024**8 # yobibyte
+ B = 1 # byte
+ kB = 1000 ** 1 # kilobyte
+ MB = 1000 ** 2 # megabyte
+ GB = 1000 ** 3 # gigabyte
+ TB = 1000 ** 4 # terabyte
+ PB = 1000 ** 5 # petabyte
+ EB = 1000 ** 6 # exabyte
+ ZB = 1000 ** 7 # zettabyte
+ YB = 1000 ** 8 # yottabyte
+
+ KiB = 1024 ** 1 # kibibyte
+ MiB = 1024 ** 2 # mebibyte
+ GiB = 1024 ** 3 # gibibyte
+ TiB = 1024 ** 4 # tebibyte
+ PiB = 1024 ** 5 # pebibyte
+ EiB = 1024 ** 6 # exbibyte
+ ZiB = 1024 ** 7 # zebibyte
+ YiB = 1024 ** 8 # yobibyte
sectors = 'sectors' # size in sector
@@ -575,7 +586,7 @@ class PartitionFlag(Enum):
Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183
"""
Boot = _ped.PARTITION_BOOT
- XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
+ XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
ESP = _ped.PARTITION_ESP
@@ -658,6 +669,10 @@ class PartitionModification:
flags: List[PartitionFlag] = field(default_factory=list)
btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+ # only set when modification was created from an existing
+ # partition info object to be able to reference it back
+ part_info: Optional[_PartitionInfo] = None
+
# only set if the device was created or exists
dev_path: Optional[Path] = None
partn: Optional[int] = None
@@ -724,7 +739,8 @@ class PartitionModification:
uuid=partition_info.uuid,
flags=partition_info.flags,
mountpoint=mountpoint,
- btrfs_subvols=subvol_mods
+ btrfs_subvols=subvol_mods,
+ part_info=partition_info
)
@property
@@ -832,6 +848,270 @@ class PartitionModification:
return part_mod
+class LvmLayoutType(Enum):
+ Default = 'default'
+
+ # Manual = 'manual_lvm'
+
+ def display_msg(self) -> str:
+ match self:
+ case LvmLayoutType.Default:
+ return str(_('Default layout'))
+ # case LvmLayoutType.Manual:
+ # return str(_('Manual configuration'))
+
+ raise ValueError(f'Unknown type: {self}')
+
+
+@dataclass
+class LvmVolumeGroup:
+ name: str
+ pvs: List[PartitionModification]
+ volumes: List[LvmVolume] = field(default_factory=list)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'lvm_pvs': [p.obj_id for p in self.pvs],
+ 'volumes': [vol.json() for vol in self.volumes]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmVolumeGroup(
+ arg['name'],
+ lvm_pvs,
+ [LvmVolume.parse_arg(vol) for vol in arg['volumes']]
+ )
+
+ def contains_lv(self, lv: LvmVolume) -> bool:
+ return lv in self.volumes
+
+
+class LvmVolumeStatus(Enum):
+ Exist = 'existing'
+ Modify = 'modify'
+ Delete = 'delete'
+ Create = 'create'
+
+
+@dataclass
+class LvmVolume:
+ status: LvmVolumeStatus
+ name: str
+ fs_type: FilesystemType
+ length: Size
+ mountpoint: Optional[Path]
+ mount_options: List[str] = field(default_factory=list)
+ btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+
+ # volume group name
+ vg_name: Optional[str] = None
+ # mapper device path /dev/<vg>/<vol>
+ dev_path: Optional[Path] = None
+
+ def __post_init__(self):
+ # needed to use the object as a dictionary key due to hash func
+ if not hasattr(self, '_obj_id'):
+ self._obj_id = uuid.uuid4()
+
+ def __hash__(self):
+ return hash(self._obj_id)
+
+ @property
+ def obj_id(self) -> str:
+ if hasattr(self, '_obj_id'):
+ return str(self._obj_id)
+ return ''
+
+ @property
+ def mapper_name(self) -> Optional[str]:
+ if self.dev_path:
+ return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}'
+ return None
+
+ @property
+ def mapper_path(self) -> Path:
+ if self.mapper_name:
+ return Path(f'/dev/mapper/{self.mapper_name}')
+
+ raise ValueError('No mapper path set')
+
+ @property
+ def safe_dev_path(self) -> Path:
+ if self.dev_path:
+ return self.dev_path
+ raise ValueError('No device path for volume defined')
+
+ @property
+ def safe_fs_type(self) -> FilesystemType:
+ if self.fs_type is None:
+ raise ValueError('File system type is not set')
+ return self.fs_type
+
+ @property
+ def relative_mountpoint(self) -> Path:
+ """
+ Will return the relative path based on the anchor
+ e.g. Path('/mnt/test') -> Path('mnt/test')
+ """
+ if self.mountpoint is not None:
+ return self.mountpoint.relative_to(self.mountpoint.anchor)
+
+ raise ValueError('Mountpoint is not specified')
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any]) -> LvmVolume:
+ volume = LvmVolume(
+ status=LvmVolumeStatus(arg['status']),
+ name=arg['name'],
+ fs_type=FilesystemType(arg['fs_type']),
+ length=Size.parse_args(arg['length']),
+ mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None,
+ mount_options=arg.get('mount_options', []),
+ btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', []))
+ )
+
+ setattr(volume, '_obj_id', arg['obj_id'])
+
+ return volume
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'obj_id': self.obj_id,
+ 'status': self.status.value,
+ 'name': self.name,
+ 'fs_type': self.fs_type.value,
+ 'length': self.length.json(),
+ 'mountpoint': str(self.mountpoint) if self.mountpoint else None,
+ 'mount_options': self.mount_options,
+ 'btrfs': [vol.json() for vol in self.btrfs_subvols]
+ }
+
+ def table_data(self) -> Dict[str, Any]:
+ part_mod = {
+ 'Type': self.status.value,
+ 'Name': self.name,
+ 'Size': self.length.format_highest(),
+ 'FS type': self.fs_type.value,
+ 'Mountpoint': str(self.mountpoint) if self.mountpoint else '',
+ 'Mount options': ', '.join(self.mount_options),
+ 'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol')
+ }
+ return part_mod
+
+ def is_modify(self) -> bool:
+ return self.status == LvmVolumeStatus.Modify
+
+ def exists(self) -> bool:
+ return self.status == LvmVolumeStatus.Exist
+
+ def is_exists_or_modify(self) -> bool:
+ return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify]
+
+ def is_root(self) -> bool:
+ if self.mountpoint is not None:
+ return Path('/') == self.mountpoint
+ else:
+ for subvol in self.btrfs_subvols:
+ if subvol.is_root():
+ return True
+
+ return False
+
+
+@dataclass
+class LvmGroupInfo:
+ vg_size: Size
+ vg_uuid: str
+
+
+@dataclass
+class LvmVolumeInfo:
+ lv_name: str
+ vg_name: str
+ lv_size: Size
+
+
+@dataclass
+class LvmPVInfo:
+ pv_name: Path
+ lv_name: str
+ vg_name: str
+
+
+@dataclass
+class LvmConfiguration:
+ config_type: LvmLayoutType
+ vol_groups: List[LvmVolumeGroup]
+
+ def __post_init__(self):
+ # make sure all volume groups have unique PVs
+ pvs = []
+ for group in self.vol_groups:
+ for pv in group.pvs:
+ if pv in pvs:
+ raise ValueError('A PV cannot be used in multiple volume groups')
+ pvs.append(pv)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'config_type': self.config_type.value,
+ 'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmConfiguration(
+ config_type=LvmLayoutType(arg['config_type']),
+ vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']],
+ )
+
+ def get_all_pvs(self) -> List[PartitionModification]:
+ pvs = []
+ for vg in self.vol_groups:
+ pvs += vg.pvs
+
+ return pvs
+
+ def get_all_volumes(self) -> List[LvmVolume]:
+ volumes = []
+
+ for vg in self.vol_groups:
+ volumes += vg.volumes
+
+ return volumes
+
+ def get_root_volume(self) -> Optional[LvmVolume]:
+ for vg in self.vol_groups:
+ filtered = next(filter(lambda x: x.is_root(), vg.volumes), None)
+ if filtered:
+ return filtered
+
+ return None
+
+
+# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str:
+# """
+# Find the LUKS superblock UUID for the device that
+# contains the given logical volume
+# """
+# for vg in self.vol_groups:
+# if vg.contains_lv(lv):
+
+
@dataclass
class DeviceModification:
device: BDevice
@@ -885,11 +1165,16 @@ class DeviceModification:
class EncryptionType(Enum):
NoEncryption = "no_encryption"
Luks = "luks"
+ LvmOnLuks = 'lvm_on_luks'
+ LuksOnLvm = 'luks_on_lvm'
@classmethod
def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']:
return {
- 'Luks': EncryptionType.Luks
+ str(_('No Encryption')): EncryptionType.NoEncryption,
+ str(_('LUKS')): EncryptionType.Luks,
+ str(_('LVM on LUKS')): EncryptionType.LvmOnLuks,
+ str(_('LUKS on LVM')): EncryptionType.LuksOnLvm
}
@classmethod
@@ -906,18 +1191,31 @@ class EncryptionType(Enum):
@dataclass
class DiskEncryption:
- encryption_type: EncryptionType = EncryptionType.Luks
+ encryption_type: EncryptionType = EncryptionType.NoEncryption
encryption_password: str = ''
partitions: List[PartitionModification] = field(default_factory=list)
+ lvm_volumes: List[LvmVolume] = field(default_factory=list)
hsm_device: Optional[Fido2Device] = None
- def should_generate_encryption_file(self, part_mod: PartitionModification) -> bool:
- return part_mod in self.partitions and part_mod.mountpoint != Path('/')
+ def __post_init__(self):
+ if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions:
+ raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined')
+
+ if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes:
+ raise ValueError('LuksOnLvm encryption require LMV volumes to be defined')
+
+ def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool:
+ if isinstance(dev, PartitionModification):
+ return dev in self.partitions and dev.mountpoint != Path('/')
+ elif isinstance(dev, LvmVolume):
+ return dev in self.lvm_volumes and dev.mountpoint != Path('/')
+ return False
def json(self) -> Dict[str, Any]:
obj: Dict[str, Any] = {
'encryption_type': self.encryption_type.value,
- 'partitions': [p.obj_id for p in self.partitions]
+ 'partitions': [p.obj_id for p in self.partitions],
+ 'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes]
}
if self.hsm_device:
@@ -926,22 +1224,46 @@ class DiskEncryption:
return obj
@classmethod
+ def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool:
+ partitions = []
+
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ partitions.append(part)
+
+ if len(partitions) > 2: # assume one boot and at least 2 additional
+ if disk_config.lvm_config:
+ return False
+
+ return True
+
+ @classmethod
def parse_arg(
cls,
disk_config: DiskLayoutConfiguration,
arg: Dict[str, Any],
password: str = ''
- ) -> 'DiskEncryption':
+ ) -> Optional['DiskEncryption']:
+ if not cls.validate_enc(disk_config):
+ return None
+
enc_partitions = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.obj_id in arg.get('partitions', []):
enc_partitions.append(part)
+ volumes = []
+ if disk_config.lvm_config:
+ for vol in disk_config.lvm_config.get_all_volumes():
+ if vol.obj_id in arg.get('lvm_volumes', []):
+ volumes.append(vol)
+
enc = DiskEncryption(
EncryptionType(arg['encryption_type']),
password,
- enc_partitions
+ enc_partitions,
+ volumes
)
if hsm := arg.get('hsm_device', None):
@@ -992,7 +1314,7 @@ class LsblkInfo:
tran: Optional[str] = None
partn: Optional[int] = None
partuuid: Optional[str] = None
- parttype :Optional[str] = None
+ parttype: Optional[str] = None
uuid: Optional[str] = None
fstype: Optional[str] = None
fsver: Optional[str] = None
@@ -1017,7 +1339,7 @@ class LsblkInfo:
'tran': self.tran,
'partn': self.partn,
'partuuid': self.partuuid,
- 'parttype' : self.parttype,
+ 'parttype': self.parttype,
'uuid': self.uuid,
'fstype': self.fstype,
'fsver': self.fsver,
@@ -1102,13 +1424,24 @@ def _clean_field(name: str, clean_type: CleanType) -> str:
return name.replace('_percentage', '%').replace('_', '-')
-def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[LsblkInfo]:
+def _fetch_lsblk_info(
+ dev_path: Optional[Union[Path, str]] = None,
+ reverse: bool = False,
+ full_dev_path: bool = False,
+ retry: int = 3
+) -> List[LsblkInfo]:
fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()]
cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)]
if dev_path:
cmd.append(str(dev_path))
+ if reverse:
+ cmd.append('--inverse')
+
+ if full_dev_path:
+ cmd.append('--paths')
+
try:
result = SysCommand(cmd).decode()
except SysCallError as err:
@@ -1132,8 +1465,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[Lsblk
return [LsblkInfo.from_json(device) for device in blockdevices]
-def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
- if infos := _fetch_lsblk_info(dev_path):
+def get_lsblk_info(
+ dev_path: Union[Path, str],
+ reverse: bool = False,
+ full_dev_path: bool = False
+) -> LsblkInfo:
+ if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path):
return infos[0]
raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"')
@@ -1142,6 +1479,7 @@ def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
def get_all_lsblk_info() -> List[LsblkInfo]:
return _fetch_lsblk_info()
+
def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]:
def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]:
devices = []
diff --git a/archinstall/lib/disk/disk_menu.py b/archinstall/lib/disk/disk_menu.py
new file mode 100644
index 00000000..a7d9ccc3
--- /dev/null
+++ b/archinstall/lib/disk/disk_menu.py
@@ -0,0 +1,140 @@
+from typing import Dict, Optional, Any, TYPE_CHECKING, List
+
+from . import DiskLayoutConfiguration, DiskLayoutType
+from .device_model import LvmConfiguration
+from ..disk import (
+ DeviceModification
+)
+from ..interactions import select_disk_config
+from ..interactions.disk_conf import select_lvm_config
+from ..menu import (
+ Selector,
+ AbstractSubMenu
+)
+from ..output import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class DiskLayoutConfigurationMenu(AbstractSubMenu):
+ def __init__(
+ self,
+ disk_layout_config: Optional[DiskLayoutConfiguration],
+ data_store: Dict[str, Any],
+ advanced: bool = False
+ ):
+ self._disk_layout_config = disk_layout_config
+ self._advanced = advanced
+
+ super().__init__(data_store=data_store, preview_size=0.5)
+
+ def setup_selection_menu_options(self):
+ self._menu_options['disk_config'] = \
+ Selector(
+ _('Partitioning'),
+ lambda x: self._select_disk_layout_config(x),
+ display_func=lambda x: self._display_disk_layout(x),
+ preview_func=self._prev_disk_layouts,
+ default=self._disk_layout_config,
+ enabled=True
+ )
+ self._menu_options['lvm_config'] = \
+ Selector(
+ _('Logical Volume Management (LVM)'),
+ lambda x: self._select_lvm_config(x),
+ display_func=lambda x: self.defined_text if x else '',
+ preview_func=self._prev_lvm_config,
+ default=self._disk_layout_config.lvm_config if self._disk_layout_config else None,
+ dependencies=[self._check_dep_lvm],
+ enabled=True
+ )
+
+ def run(self, allow_reset: bool = True) -> Optional[DiskLayoutConfiguration]:
+ super().run(allow_reset=allow_reset)
+
+ disk_layout_config: Optional[DiskLayoutConfiguration] = self._data_store.get('disk_config', None)
+
+ if disk_layout_config:
+ disk_layout_config.lvm_config = self._data_store.get('lvm_config', None)
+
+ return disk_layout_config
+
+ def _check_dep_lvm(self) -> bool:
+ disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_layout_conf and disk_layout_conf.config_type == DiskLayoutType.Default:
+ return True
+
+ return False
+
+ def _select_disk_layout_config(
+ self,
+ preset: Optional[DiskLayoutConfiguration]
+ ) -> Optional[DiskLayoutConfiguration]:
+ disk_config = select_disk_config(preset, advanced_option=self._advanced)
+
+ if disk_config != preset:
+ self._menu_options['lvm_config'].set_current_selection(None)
+
+ return disk_config
+
+ def _select_lvm_config(self, preset: Optional[LvmConfiguration]) -> Optional[LvmConfiguration]:
+ disk_config: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+ if disk_config:
+ return select_lvm_config(disk_config, preset=preset)
+ return preset
+
+ def _display_disk_layout(self, current_value: Optional[DiskLayoutConfiguration] = None) -> str:
+ if current_value:
+ return current_value.config_type.display_msg()
+ return ''
+
+ def _prev_disk_layouts(self) -> Optional[str]:
+ disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_layout_conf:
+ device_mods: List[DeviceModification] = \
+ list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications))
+
+ if device_mods:
+ output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg())
+ output_btrfs = ''
+
+ for mod in device_mods:
+ # create partition table
+ partition_table = FormattedOutput.as_table(mod.partitions)
+
+ output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n'
+ output_partition += partition_table + '\n'
+
+ # create btrfs table
+ btrfs_partitions = list(
+ filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions)
+ )
+ for partition in btrfs_partitions:
+ output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n'
+
+ output = output_partition + output_btrfs
+ return output.rstrip()
+
+ return None
+
+ def _prev_lvm_config(self) -> Optional[str]:
+ lvm_config: Optional[LvmConfiguration] = self._menu_options['lvm_config'].current_selection
+
+ if lvm_config:
+ output = '{}: {}\n'.format(str(_('Configuration')), lvm_config.config_type.display_msg())
+
+ for vol_gp in lvm_config.vol_groups:
+ pv_table = FormattedOutput.as_table(vol_gp.pvs)
+ output += '{}:\n{}'.format(str(_('Physical volumes')), pv_table)
+
+ output += f'\nVolume Group: {vol_gp.name}'
+
+ lvm_volumes = FormattedOutput.as_table(vol_gp.volumes)
+ output += '\n\n{}:\n{}'.format(str(_('Volumes')), lvm_volumes)
+
+ return output
+
+ return None
diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py
index c3a1c32f..b0e292ce 100644
--- a/archinstall/lib/disk/encryption_menu.py
+++ b/archinstall/lib/disk/encryption_menu.py
@@ -1,6 +1,7 @@
from pathlib import Path
from typing import Dict, Optional, Any, TYPE_CHECKING, List
+from . import LvmConfiguration, LvmVolume
from ..disk import (
DeviceModification,
DiskLayoutConfiguration,
@@ -40,31 +41,41 @@ class DiskEncryptionMenu(AbstractSubMenu):
super().__init__(data_store=data_store)
def setup_selection_menu_options(self):
+ self._menu_options['encryption_type'] = \
+ Selector(
+ _('Encryption type'),
+ func=lambda preset: select_encryption_type(self._disk_config, preset),
+ display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
+ default=self._preset.encryption_type,
+ enabled=True,
+ )
self._menu_options['encryption_password'] = \
Selector(
_('Encryption password'),
lambda x: select_encrypted_password(),
+ dependencies=[self._check_dep_enc_type],
display_func=lambda x: secret(x) if x else '',
default=self._preset.encryption_password,
enabled=True
)
- self._menu_options['encryption_type'] = \
- Selector(
- _('Encryption type'),
- func=lambda preset: select_encryption_type(preset),
- display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
- dependencies=['encryption_password'],
- default=self._preset.encryption_type,
- enabled=True
- )
self._menu_options['partitions'] = \
Selector(
_('Partitions'),
func=lambda preset: select_partitions_to_encrypt(self._disk_config.device_modifications, preset),
display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None,
- dependencies=['encryption_password'],
+ dependencies=[self._check_dep_partitions],
default=self._preset.partitions,
- preview_func=self._prev_disk_layouts,
+ preview_func=self._prev_partitions,
+ enabled=True
+ )
+ self._menu_options['lvm_vols'] = \
+ Selector(
+ _('LVM volumes'),
+ func=lambda preset: self._select_lvm_vols(preset),
+ display_func=lambda x: f'{len(x)} {_("LVM volumes")}' if x else None,
+ dependencies=[self._check_dep_lvm_vols],
+ default=self._preset.lvm_volumes,
+ preview_func=self._prev_lvm_vols,
enabled=True
)
self._menu_options['HSM'] = \
@@ -73,19 +84,54 @@ class DiskEncryptionMenu(AbstractSubMenu):
func=lambda preset: select_hsm(preset),
display_func=lambda x: self._display_hsm(x),
preview_func=self._prev_hsm,
- dependencies=['encryption_password'],
+ dependencies=[self._check_dep_enc_type],
default=self._preset.hsm_device,
enabled=True
)
+ def _select_lvm_vols(self, preset: List[LvmVolume]) -> List[LvmVolume]:
+ if self._disk_config.lvm_config:
+ return select_lvm_vols_to_encrypt(self._disk_config.lvm_config, preset=preset)
+ return []
+
+ def _check_dep_enc_type(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type != EncryptionType.NoEncryption:
+ return True
+ return False
+
+ def _check_dep_partitions(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks]:
+ return True
+ return False
+
+ def _check_dep_lvm_vols(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type == EncryptionType.LuksOnLvm:
+ return True
+ return False
+
def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]:
super().run(allow_reset=allow_reset)
- if self._data_store.get('encryption_password', None):
+ enc_type = self._data_store.get('encryption_type', None)
+ enc_password = self._data_store.get('encryption_password', None)
+ enc_partitions = self._data_store.get('partitions', None)
+ enc_lvm_vols = self._data_store.get('lvm_vols', None)
+
+ if enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and enc_partitions:
+ enc_lvm_vols = []
+
+ if enc_type == EncryptionType.LuksOnLvm:
+ enc_partitions = []
+
+ if enc_type != EncryptionType.NoEncryption and enc_password and (enc_partitions or enc_lvm_vols):
return DiskEncryption(
- encryption_password=self._data_store.get('encryption_password', None),
- encryption_type=self._data_store['encryption_type'],
- partitions=self._data_store.get('partitions', None),
+ encryption_password=enc_password,
+ encryption_type=enc_type,
+ partitions=enc_partitions,
+ lvm_volumes=enc_lvm_vols,
hsm_device=self._data_store.get('HSM', None)
)
@@ -97,7 +143,7 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
- def _prev_disk_layouts(self) -> Optional[str]:
+ def _prev_partitions(self) -> Optional[str]:
partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection
if partitions:
output = str(_('Partitions to be encrypted')) + '\n'
@@ -106,6 +152,15 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
+ def _prev_lvm_vols(self) -> Optional[str]:
+ volumes: Optional[List[PartitionModification]] = self._menu_options['lvm_vols'].current_selection
+ if volumes:
+ output = str(_('LVM volumes to be encrypted')) + '\n'
+ output += FormattedOutput.as_table(volumes)
+ return output.rstrip()
+
+ return None
+
def _prev_hsm(self) -> Optional[str]:
try:
Fido2.get_fido2_devices()
@@ -123,13 +178,19 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
-def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]:
+def select_encryption_type(disk_config: DiskLayoutConfiguration, preset: EncryptionType) -> Optional[EncryptionType]:
title = str(_('Select disk encryption option'))
- options = [
- EncryptionType.type_to_text(EncryptionType.Luks)
- ]
+
+ if disk_config.lvm_config:
+ options = [
+ EncryptionType.type_to_text(EncryptionType.LvmOnLuks),
+ EncryptionType.type_to_text(EncryptionType.LuksOnLvm)
+ ]
+ else:
+ options = [EncryptionType.type_to_text(EncryptionType.Luks)]
preset_value = EncryptionType.type_to_text(preset)
+
choice = Menu(title, options, preset_values=preset_value).run()
match choice.type_:
@@ -197,3 +258,31 @@ def select_partitions_to_encrypt(
case MenuSelectionType.Selection:
return choice.multi_value
return []
+
+
+def select_lvm_vols_to_encrypt(
+ lvm_config: LvmConfiguration,
+ preset: List[LvmVolume]
+) -> List[LvmVolume]:
+ volumes: List[LvmVolume] = lvm_config.get_all_volumes()
+
+ if volumes:
+ title = str(_('Select which LVM volumes to encrypt'))
+ partition_table = FormattedOutput.as_table(volumes)
+
+ choice = TableMenu(
+ title,
+ table_data=(volumes, partition_table),
+ preset=preset,
+ multi=True
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Reset:
+ return []
+ case MenuSelectionType.Skip:
+ return preset
+ case MenuSelectionType.Selection:
+ return choice.multi_value
+
+ return []
diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py
index 49904c17..5a139534 100644
--- a/archinstall/lib/disk/fido.py
+++ b/archinstall/lib/disk/fido.py
@@ -4,7 +4,7 @@ import getpass
from pathlib import Path
from typing import List
-from .device_model import PartitionModification, Fido2Device
+from .device_model import Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
from ..output import error, info
from ..exceptions import SysCallError
@@ -72,16 +72,16 @@ class Fido2:
def fido2_enroll(
cls,
hsm_device: Fido2Device,
- part_mod: PartitionModification,
+ dev_path: Path,
password: str
):
- worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {part_mod.dev_path}", peek_output=True)
+ worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True)
pw_inputted = False
pin_inputted = False
while worker.is_alive():
if pw_inputted is False:
- if bytes(f"please enter current passphrase for disk {part_mod.dev_path}", 'UTF-8') in worker._trace_log.lower():
+ if bytes(f"please enter current passphrase for disk {dev_path}", 'UTF-8') in worker._trace_log.lower():
worker.write(bytes(password, 'UTF-8'))
pw_inputted = True
elif pin_inputted is False:
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 9c6e6d35..5c11896e 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -3,13 +3,21 @@ from __future__ import annotations
import signal
import sys
import time
-from typing import Any, Optional, TYPE_CHECKING
+from pathlib import Path
+from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set
-from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption
from .device_handler import device_handler
+from .device_model import (
+ DiskLayoutConfiguration, DiskLayoutType, PartitionTable,
+ FilesystemType, DiskEncryption, LvmVolumeGroup,
+ Size, Unit, SectorSize, PartitionModification, EncryptionType,
+ LvmVolume, LvmConfiguration
+)
from ..hardware import SysInfo
-from ..output import debug
+from ..luks import Luks2
from ..menu import Menu
+from ..output import debug, info
+from ..general import SysCommand
if TYPE_CHECKING:
_: Any
@@ -52,13 +60,288 @@ class FilesystemHandler:
for mod in device_mods:
device_handler.partition(mod, partition_table=partition_table)
- device_handler.format(mod, enc_conf=self._enc_config)
- for part_mod in mod.partitions:
- if part_mod.is_create_or_modify():
+ if self._disk_config.lvm_config:
+ for mod in device_mods:
+ if boot_part := mod.get_boot_partition():
+ debug(f'Formatting boot partition: {boot_part.dev_path}')
+ self._format_partitions(
+ [boot_part],
+ mod.device_path
+ )
+
+ self.perform_lvm_operations()
+ else:
+ for mod in device_mods:
+ self._format_partitions(
+ mod.partitions,
+ mod.device_path
+ )
+
+ for part_mod in mod.partitions:
if part_mod.fs_type == FilesystemType.Btrfs:
device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config)
+ def _format_partitions(
+ self,
+ partitions: List[PartitionModification],
+ device_path: Path
+ ):
+ """
+ Format can be given an overriding path, for instance /dev/null to test
+ the formatting functionality and in essence the support for the given filesystem.
+ """
+
+ # don't touch existing partitions
+ create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()]
+
+ self._validate_partitions(create_or_modify_parts)
+
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(device_path)
+
+ for part_mod in create_or_modify_parts:
+ # partition will be encrypted
+ if self._enc_config is not None and part_mod in self._enc_config.partitions:
+ device_handler.format_encrypted(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ part_mod.safe_fs_type,
+ self._enc_config
+ )
+ else:
+ device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path)
+
+ # synchronize with udev before using lsblk
+ SysCommand('udevadm settle')
+
+ lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path)
+
+ part_mod.partn = lsblk_info.partn
+ part_mod.partuuid = lsblk_info.partuuid
+ part_mod.uuid = lsblk_info.uuid
+
+ def _validate_partitions(self, partitions: List[PartitionModification]):
+ checks = {
+ # verify that all partitions have a path set (which implies that they have been created)
+ lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
+ # crypto luks is not a valid file system type
+ lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError(
+ 'Crypto luks cannot be set as a filesystem type'),
+ # file system type must be set
+ lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
+ }
+
+ for check, exc in checks.items():
+ found = next(filter(check, partitions), None)
+ if found is not None:
+ raise exc
+
+ def perform_lvm_operations(self):
+ info('Setting up LVM config...')
+
+ if not self._disk_config.lvm_config:
+ return
+
+ if self._enc_config:
+ self._setup_lvm_encrypted(
+ self._disk_config.lvm_config,
+ self._enc_config
+ )
+ else:
+ self._setup_lvm(self._disk_config.lvm_config)
+ self._format_lvm_vols(self._disk_config.lvm_config)
+
+ def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption):
+ if enc_config.encryption_type == EncryptionType.LvmOnLuks:
+ enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False)
+
+ self._setup_lvm(lvm_config, enc_mods)
+ self._format_lvm_vols(lvm_config)
+
+ # export the lvm group safely otherwise the Luks cannot be closed
+ self._safely_close_lvm(lvm_config)
+
+ for luks in enc_mods.values():
+ luks.lock()
+ elif enc_config.encryption_type == EncryptionType.LuksOnLvm:
+ self._setup_lvm(lvm_config)
+ enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False)
+ self._format_lvm_vols(lvm_config, enc_vols)
+
+ for luks in enc_vols.values():
+ luks.lock()
+
+ self._safely_close_lvm(lvm_config)
+
+ def _safely_close_lvm(self, lvm_config: LvmConfiguration):
+ for vg in lvm_config.vol_groups:
+ for vol in vg.volumes:
+ device_handler.lvm_vol_change(vol, False)
+
+ device_handler.lvm_export_vg(vg)
+
+ def _setup_lvm(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ self._lvm_create_pvs(lvm_config, enc_mods)
+
+ for vg in lvm_config.vol_groups:
+ pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods)
+
+ device_handler.lvm_vg_create(pv_dev_paths, vg.name)
+
+ # figure out what the actual available size in the group is
+ vg_info = device_handler.lvm_group_info(vg.name)
+
+ if not vg_info:
+ raise ValueError('Unable to fetch VG info')
+
+ # the actual available LVM Group size will be smaller than the
+ # total PVs size due to reserved metadata storage etc.
+ # so we'll have a look at the total avail. size, check the delta
+ # to the desired sizes and subtract some equally from the actually
+ # created volume
+ avail_size = vg_info.vg_size
+ desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default()))
+
+ delta = desired_size - avail_size
+ max_vol_offset = delta.convert(Unit.B)
+
+ max_vol = max(vg.volumes, key=lambda x: x.length)
+
+ for lv in vg.volumes:
+ offset = max_vol_offset if lv == max_vol else None
+
+ debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}')
+ device_handler.lvm_vol_create(vg.name, lv, offset)
+
+ while True:
+ debug('Fetching LVM volume info')
+ lv_info = device_handler.lvm_vol_info(lv.name)
+ if lv_info is not None:
+ break
+
+ time.sleep(1)
+
+ self._lvm_vol_handle_e2scrub(vg)
+
+ def _format_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+ ):
+ for vol in lvm_config.get_all_volumes():
+ if enc_vol := enc_vols.get(vol, None):
+ if not enc_vol.mapper_dev:
+ raise ValueError('No mapper device defined')
+ path = enc_vol.mapper_dev
+ else:
+ path = vol.safe_dev_path
+
+ # wait a bit otherwise the mkfs will fail as it can't
+ # find the mapper device yet
+ device_handler.format(vol.fs_type, path)
+
+ if vol.fs_type == FilesystemType.Btrfs:
+ device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options)
+
+ def _lvm_create_pvs(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ pv_paths: Set[Path] = set()
+
+ for vg in lvm_config.vol_groups:
+ pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods)
+
+ device_handler.lvm_pv_create(pv_paths)
+
+ def _get_all_pv_dev_paths(
+ self,
+ pvs: List[PartitionModification],
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ) -> Set[Path]:
+ pv_paths: Set[Path] = set()
+
+ for pv in pvs:
+ if enc_pv := enc_mods.get(pv, None):
+ if mapper := enc_pv.mapper_dev:
+ pv_paths.add(mapper)
+ else:
+ pv_paths.add(pv.safe_dev_path)
+
+ return pv_paths
+
+ def _encrypt_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[LvmVolume, Luks2]:
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+
+ for vol in lvm_config.get_all_volumes():
+ if vol in enc_config.lvm_volumes:
+ luks_handler = device_handler.encrypt(
+ vol.safe_dev_path,
+ vol.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create
+ )
+
+ enc_vols[vol] = luks_handler
+
+ return enc_vols
+
+ def _encrypt_partitions(
+ self,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[PartitionModification, Luks2]:
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+
+ for mod in self._disk_config.device_modifications:
+ partitions = mod.partitions
+
+ # don't touch existing partitions
+ filtered_part = [p for p in partitions if not p.exists()]
+
+ self._validate_partitions(filtered_part)
+
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(mod.device_path)
+
+ enc_mods = {}
+
+ for part_mod in filtered_part:
+ if part_mod in enc_config.partitions:
+ luks_handler = device_handler.encrypt(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create=lock_after_create
+ )
+
+ enc_mods[part_mod] = luks_handler
+
+ return enc_mods
+
+ def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup):
+ # from arch wiki:
+ # If a logical volume will be formatted with ext4, leave at least 256 MiB
+ # free space in the volume group to allow using e2scrub
+ if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]):
+ largest_vol = max(vol_gp.volumes, key=lambda x: x.length)
+
+ device_handler.lvm_vol_reduce(
+ largest_vol.safe_dev_path,
+ Size(256, Unit.MiB, SectorSize.default())
+ )
+
def _do_countdown(self) -> bool:
SIG_TRIGGER = False
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py
index 823605e3..330f61a3 100644
--- a/archinstall/lib/disk/partitioning_menu.py
+++ b/archinstall/lib/disk/partitioning_menu.py
@@ -2,7 +2,7 @@ from __future__ import annotations
import re
from pathlib import Path
-from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple
+from typing import Any, TYPE_CHECKING, List, Optional, Tuple
from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \
ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption
@@ -38,21 +38,6 @@ class PartitioningList(ListManager):
display_actions = list(self._actions.values())
super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:])
- def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]:
- table = FormattedOutput.as_table(data)
- rows = table.split('\n')
-
- # these are the header rows of the table and do not map to any User obviously
- # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
- # the selectable rows so the header has to be aligned
- display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
-
- for row, user in zip(rows[2:], data):
- row = row.replace('|', '\\|')
- display_data[row] = user
-
- return display_data
-
def selected_action_display(self, partition: PartitionModification) -> str:
return str(_('Partition'))
@@ -258,7 +243,6 @@ class PartitioningList(ListManager):
while True:
value = TextInput(prompt).run().strip()
size: Optional[Size] = None
-
if not value:
size = default
else:
diff --git a/archinstall/lib/disk/subvolume_menu.py b/archinstall/lib/disk/subvolume_menu.py
index 48afa829..ea77149d 100644
--- a/archinstall/lib/disk/subvolume_menu.py
+++ b/archinstall/lib/disk/subvolume_menu.py
@@ -1,9 +1,8 @@
from pathlib import Path
-from typing import Dict, List, Optional, Any, TYPE_CHECKING
+from typing import List, Optional, Any, TYPE_CHECKING
from .device_model import SubvolumeModification
from ..menu import TextInput, ListManager
-from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -18,21 +17,6 @@ class SubvolumeMenu(ListManager):
]
super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:])
- def reformat(self, data: List[SubvolumeModification]) -> Dict[str, Optional[SubvolumeModification]]:
- table = FormattedOutput.as_table(data)
- rows = table.split('\n')
-
- # these are the header rows of the table and do not map to any User obviously
- # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
- # the selectable rows so the header has to be aligned
- display_data: Dict[str, Optional[SubvolumeModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
-
- for row, subvol in zip(rows[2:], data):
- row = row.replace('|', '\\|')
- display_data[row] = subvol
-
- return display_data
-
def selected_action_display(self, subvolume: SubvolumeModification) -> str:
return str(subvolume.name)
diff --git a/archinstall/lib/global_menu.py b/archinstall/lib/global_menu.py
index e65915db..1b5e779b 100644
--- a/archinstall/lib/global_menu.py
+++ b/archinstall/lib/global_menu.py
@@ -14,7 +14,6 @@ from .models.audio_configuration import Audio, AudioConfiguration
from .models.users import User
from .output import FormattedOutput
from .profile.profile_menu import ProfileConfiguration
-from .storage import storage
from .configuration import save_config
from .interactions import add_number_of_parallel_downloads
from .interactions import ask_additional_packages_to_install
@@ -30,7 +29,6 @@ from .interactions import select_additional_repositories
from .interactions import select_kernel
from .utils.util import format_cols
from .interactions import ask_ntp
-from .interactions.disk_conf import select_disk_config
if TYPE_CHECKING:
_: Any
@@ -38,7 +36,6 @@ if TYPE_CHECKING:
class GlobalMenu(AbstractMenu):
def __init__(self, data_store: Dict[str, Any]):
- self._defined_text = str(_('Defined'))
super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3)
def setup_selection_menu_options(self):
@@ -54,20 +51,20 @@ class GlobalMenu(AbstractMenu):
_('Locales'),
lambda preset: self._locale_selection(preset),
preview_func=self._prev_locale,
- display_func=lambda x: self._defined_text if x else '')
+ display_func=lambda x: self.defined_text if x else '')
self._menu_options['mirror_config'] = \
Selector(
_('Mirrors'),
lambda preset: self._mirror_configuration(preset),
- display_func=lambda x: self._defined_text if x else '',
+ display_func=lambda x: self.defined_text if x else '',
preview_func=self._prev_mirror_config
)
self._menu_options['disk_config'] = \
Selector(
_('Disk configuration'),
lambda preset: self._select_disk_config(preset),
- preview_func=self._prev_disk_layouts,
- display_func=lambda x: self._display_disk_layout(x),
+ preview_func=self._prev_disk_config,
+ display_func=lambda x: self.defined_text if x else '',
)
self._menu_options['disk_encryption'] = \
Selector(
@@ -75,7 +72,8 @@ class GlobalMenu(AbstractMenu):
lambda preset: self._disk_encryption(preset),
preview_func=self._prev_disk_encryption,
display_func=lambda x: self._display_disk_encryption(x),
- dependencies=['disk_config'])
+ dependencies=['disk_config']
+ )
self._menu_options['swap'] = \
Selector(
_('Swap'),
@@ -140,7 +138,7 @@ class GlobalMenu(AbstractMenu):
Selector(
_('Additional packages'),
lambda preset: ask_additional_packages_to_install(preset),
- display_func=lambda x: self._defined_text if x else '',
+ display_func=lambda x: self.defined_text if x else '',
preview_func=self._prev_additional_pkgs,
default=[])
self._menu_options['additional-repositories'] = \
@@ -247,14 +245,17 @@ class GlobalMenu(AbstractMenu):
return config.type.display_msg()
def _disk_encryption(self, preset: Optional[disk.DiskEncryption]) -> Optional[disk.DiskEncryption]:
- mods: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+ disk_config: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
- if not mods:
+ if not disk_config:
# this should not happen as the encryption menu has the disk_config as dependency
raise ValueError('No disk layout specified')
+ if not disk.DiskEncryption.validate_enc(disk_config):
+ return None
+
data_store: Dict[str, Any] = {}
- disk_encryption = disk.DiskEncryptionMenu(mods, data_store, preset=preset).run()
+ disk_encryption = disk.DiskEncryptionMenu(disk_config, data_store, preset=preset).run()
return disk_encryption
def _locale_selection(self, preset: LocaleConfiguration) -> LocaleConfiguration:
@@ -287,44 +288,35 @@ class GlobalMenu(AbstractMenu):
return format_cols(packages, None)
return None
- def _prev_disk_layouts(self) -> Optional[str]:
+ def _prev_disk_config(self) -> Optional[str]:
selector = self._menu_options['disk_config']
disk_layout_conf: Optional[disk.DiskLayoutConfiguration] = selector.current_selection
+ output = ''
if disk_layout_conf:
- device_mods: List[disk.DeviceModification] = \
- list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications))
-
- if device_mods:
- output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg())
- output_btrfs = ''
+ output += str(_('Configuration type: {}')).format(disk_layout_conf.config_type.display_msg())
- for mod in device_mods:
- # create partition table
- partition_table = FormattedOutput.as_table(mod.partitions)
-
- output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n'
- output_partition += partition_table + '\n'
-
- # create btrfs table
- btrfs_partitions = list(
- filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions)
- )
- for partition in btrfs_partitions:
- output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n'
+ if disk_layout_conf.lvm_config:
+ output += '\n{}: {}'.format(str(_('LVM configuration type')), disk_layout_conf.lvm_config.config_type.display_msg())
- output = output_partition + output_btrfs
- return output.rstrip()
+ if output:
+ return output
return None
- def _display_disk_layout(self, current_value: Optional[disk.DiskLayoutConfiguration] = None) -> str:
+ def _display_disk_config(self, current_value: Optional[disk.DiskLayoutConfiguration] = None) -> str:
if current_value:
return current_value.config_type.display_msg()
return ''
def _prev_disk_encryption(self) -> Optional[str]:
+ disk_config: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_config and not disk.DiskEncryption.validate_enc(disk_config):
+ return str(_('LVM disk encryption with more than 2 partitions is currently not supported'))
+
encryption: Optional[disk.DiskEncryption] = self._menu_options['disk_encryption'].current_selection
+
if encryption:
enc_type = disk.EncryptionType.type_to_text(encryption.encryption_type)
output = str(_('Encryption type')) + f': {enc_type}\n'
@@ -332,6 +324,8 @@ class GlobalMenu(AbstractMenu):
if encryption.partitions:
output += 'Partitions: {} selected'.format(len(encryption.partitions)) + '\n'
+ elif encryption.lvm_volumes:
+ output += 'LVM volumes: {} selected'.format(len(encryption.lvm_volumes)) + '\n'
if encryption.hsm_device:
output += f'HSM: {encryption.hsm_device.manufacturer}'
@@ -425,10 +419,8 @@ class GlobalMenu(AbstractMenu):
self,
preset: Optional[disk.DiskLayoutConfiguration] = None
) -> Optional[disk.DiskLayoutConfiguration]:
- disk_config = select_disk_config(
- preset,
- storage['arguments'].get('advanced', False)
- )
+ data_store: Dict[str, Any] = {}
+ disk_config = disk.DiskLayoutConfigurationMenu(preset, data_store).run()
if disk_config != preset:
self._menu_options['disk_encryption'].set_current_selection(None)
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 37121118..8292a3be 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -52,7 +52,7 @@ class Installer:
`Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
"""
- self.base_packages = base_packages or __packages__[:3]
+ self._base_packages = base_packages or __packages__[:3]
self.kernels = kernels or ['linux']
self._disk_config = disk_config
@@ -64,11 +64,11 @@ class Installer:
self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
for kernel in self.kernels:
- self.base_packages.append(kernel)
+ self._base_packages.append(kernel)
# If using accessibility tools in the live environment, append those to the packages list
if accessibility_tools_in_use():
- self.base_packages.extend(__accessibility_packages__)
+ self._base_packages.extend(__accessibility_packages__)
self.post_base_install: List[Callable] = []
@@ -90,6 +90,8 @@ class Installer:
self._fstab_entries: List[str] = []
self._zram_enabled = False
+ self._disable_fstrim = False
+
self.pacman = Pacman(self.target, storage['arguments'].get('silent', False))
def __enter__(self) -> 'Installer':
@@ -198,31 +200,71 @@ class Installer:
self._verify_service_stop()
def mount_ordered_layout(self):
- info('Mounting partitions in order')
+ debug('Mounting ordered layout')
+
+ luks_handlers: Dict[Any, Luks2] = {}
+
+ match self._disk_encryption.encryption_type:
+ case disk.EncryptionType.NoEncryption:
+ self._mount_lvm_layout()
+ case disk.EncryptionType.Luks:
+ luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions)
+ case disk.EncryptionType.LvmOnLuks:
+ luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions)
+ self._import_lvm()
+ self._mount_lvm_layout(luks_handlers)
+ case disk.EncryptionType.LuksOnLvm:
+ self._import_lvm()
+ luks_handlers = self._prepare_luks_lvm(self._disk_encryption.lvm_volumes)
+ self._mount_lvm_layout(luks_handlers)
+
+ # mount all regular partitions
+ self._mount_partition_layout(luks_handlers)
+
+ def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]):
+ debug('Mounting partition layout')
+
+ # do not mount any PVs part of the LVM configuration
+ pvs = []
+ if self._disk_config.lvm_config:
+ pvs = self._disk_config.lvm_config.get_all_pvs()
for mod in self._disk_config.device_modifications:
+ not_pv_part_mods = list(filter(lambda x: x not in pvs, mod.partitions))
+
# partitions have to mounted in the right order on btrfs the mountpoint will
# be empty as the actual subvolumes are getting mounted instead so we'll use
# '/' just for sorting
- sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/'))
-
- enc_partitions = []
- if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption:
- enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions))
-
- # attempt to decrypt all luks partitions
- luks_handlers = self._prepare_luks_partitions(enc_partitions)
+ sorted_part_mods = sorted(not_pv_part_mods, key=lambda x: x.mountpoint or Path('/'))
for part_mod in sorted_part_mods:
if luks_handler := luks_handlers.get(part_mod):
- # mount encrypted partition
self._mount_luks_partition(part_mod, luks_handler)
else:
- # partition is not encrypted
self._mount_partition(part_mod)
- def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[
- disk.PartitionModification, Luks2]:
+ def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}):
+ lvm_config = self._disk_config.lvm_config
+
+ if not lvm_config:
+ debug('No lvm config defined to be mounted')
+ return
+
+ debug('Mounting LVM layout')
+
+ for vg in lvm_config.vol_groups:
+ sorted_vol = sorted(vg.volumes, key=lambda x: x.mountpoint or Path('/'))
+
+ for vol in sorted_vol:
+ if luks_handler := luks_handlers.get(vol):
+ self._mount_luks_volume(vol, luks_handler)
+ else:
+ self._mount_lvm_vol(vol)
+
+ def _prepare_luks_partitions(
+ self,
+ partitions: List[disk.PartitionModification]
+ ) -> Dict[disk.PartitionModification, Luks2]:
return {
part_mod: disk.device_handler.unlock_luks2_dev(
part_mod.dev_path,
@@ -233,6 +275,33 @@ class Installer:
if part_mod.mapper_name and part_mod.dev_path
}
+ def _import_lvm(self):
+ lvm_config = self._disk_config.lvm_config
+
+ if not lvm_config:
+ debug('No lvm config defined to be imported')
+ return
+
+ for vg in lvm_config.vol_groups:
+ disk.device_handler.lvm_import_vg(vg)
+
+ for vol in vg.volumes:
+ disk.device_handler.lvm_vol_change(vol, True)
+
+ def _prepare_luks_lvm(
+ self,
+ lvm_volumes: List[disk.LvmVolume]
+ ) -> Dict[disk.LvmVolume, Luks2]:
+ return {
+ vol: disk.device_handler.unlock_luks2_dev(
+ vol.dev_path,
+ vol.mapper_name,
+ self._disk_encryption.encryption_password
+ )
+ for vol in lvm_volumes
+ if vol.mapper_name and vol.dev_path
+ }
+
def _mount_partition(self, part_mod: disk.PartitionModification):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
if part_mod.mountpoint and part_mod.dev_path:
@@ -246,14 +315,32 @@ class Installer:
part_mod.mount_options
)
+ def _mount_lvm_vol(self, volume: disk.LvmVolume):
+ if volume.fs_type != disk.FilesystemType.Btrfs:
+ if volume.mountpoint and volume.dev_path:
+ target = self.target / volume.relative_mountpoint
+ disk.device_handler.mount(volume.dev_path, target, options=volume.mount_options)
+
+ if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path:
+ self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options)
+
def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2):
- # it would be none if it's btrfs as the subvolumes will have the mountpoints defined
- if part_mod.mountpoint and luks_handler.mapper_dev:
- target = self.target / part_mod.relative_mountpoint
- disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
+ if part_mod.fs_type != disk.FilesystemType.Btrfs:
+ if part_mod.mountpoint and luks_handler.mapper_dev:
+ target = self.target / part_mod.relative_mountpoint
+ disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
- self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols)
+ self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options)
+
+ def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2):
+ if volume.fs_type != disk.FilesystemType.Btrfs:
+ if volume.mountpoint and luks_handler.mapper_dev:
+ target = self.target / volume.relative_mountpoint
+ disk.device_handler.mount(luks_handler.mapper_dev, target, options=volume.mount_options)
+
+ if volume.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
+ self._mount_btrfs_subvol(luks_handler.mapper_dev, volume.btrfs_subvols, volume.mount_options)
def _mount_btrfs_subvol(
self,
@@ -262,13 +349,23 @@ class Installer:
mount_options: List[str] = []
):
for subvol in subvolumes:
- disk.device_handler.mount(
- dev_path,
- self.target / subvol.relative_mountpoint,
- options=mount_options + [f'subvol={subvol.name}']
- )
+ mountpoint = self.target / subvol.relative_mountpoint
+ mount_options = mount_options + [f'subvol={subvol.name}']
+ disk.device_handler.mount(dev_path, mountpoint, options=mount_options)
def generate_key_files(self):
+ match self._disk_encryption.encryption_type:
+ case disk.EncryptionType.Luks:
+ self._generate_key_files_partitions()
+ case disk.EncryptionType.LuksOnLvm:
+ self._generate_key_file_lvm_volumes()
+ case disk.EncryptionType.LvmOnLuks:
+ # currently LvmOnLuks only supports a single
+ # partitioning layout (boot + partition)
+ # so we won't need any keyfile generation atm
+ pass
+
+ def _generate_key_files_partitions(self):
for part_mod in self._disk_encryption.partitions:
gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod)
@@ -279,14 +376,36 @@ class Installer:
)
if gen_enc_file and not part_mod.is_root():
- info(f'Creating key-file: {part_mod.dev_path}')
+ debug(f'Creating key-file: {part_mod.dev_path}')
luks_handler.create_keyfile(self.target)
if part_mod.is_root() and not gen_enc_file:
if self._disk_encryption.hsm_device:
disk.Fido2.fido2_enroll(
self._disk_encryption.hsm_device,
- part_mod,
+ part_mod.safe_dev_path,
+ self._disk_encryption.encryption_password
+ )
+
+ def _generate_key_file_lvm_volumes(self):
+ for vol in self._disk_encryption.lvm_volumes:
+ gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol)
+
+ luks_handler = Luks2(
+ vol.safe_dev_path,
+ mapper_name=vol.mapper_name,
+ password=self._disk_encryption.encryption_password
+ )
+
+ if gen_enc_file and not vol.is_root():
+ info(f'Creating key-file: {vol.dev_path}')
+ luks_handler.create_keyfile(self.target)
+
+ if vol.is_root() and not gen_enc_file:
+ if self._disk_encryption.hsm_device:
+ disk.Fido2.fido2_enroll(
+ self._disk_encryption.hsm_device,
+ vol.safe_dev_path,
self._disk_encryption.encryption_password
)
@@ -393,7 +512,7 @@ class Installer:
for entry in self._fstab_entries:
fp.write(f'{entry}\n')
- def set_hostname(self, hostname: str, *args: str, **kwargs: str) -> None:
+ def set_hostname(self, hostname: str):
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
@@ -444,7 +563,7 @@ class Installer:
(self.target / 'etc/locale.conf').write_text(f'LANG={lang_value}\n')
return True
- def set_timezone(self, zone: str, *args: str, **kwargs: str) -> bool:
+ def set_timezone(self, zone: str) -> bool:
if not zone:
return True
if not len(zone):
@@ -532,7 +651,7 @@ class Installer:
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
- self.base_packages.append('iwd')
+ self._base_packages.append('iwd')
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
@@ -608,51 +727,98 @@ class Installer:
return vendor.get_ucode()
return None
- def minimal_installation(
- self,
- testing: bool = False,
- multilib: bool = False,
- mkinitcpio: bool = True,
- hostname: str = 'archinstall',
- locale_config: LocaleConfiguration = LocaleConfiguration.default()
- ):
- _disable_fstrim = False
+ def _handle_partition_installation(self):
+ pvs = []
+ if self._disk_config.lvm_config:
+ pvs = self._disk_config.lvm_config.get_all_pvs()
+
for mod in self._disk_config.device_modifications:
for part in mod.partitions:
- if part.fs_type is not None:
- if (pkg := part.fs_type.installation_pkg) is not None:
- self.base_packages.append(pkg)
- if (module := part.fs_type.installation_module) is not None:
+ if part in pvs or part.fs_type is None:
+ continue
+
+ if (pkg := part.fs_type.installation_pkg) is not None:
+ self._base_packages.append(pkg)
+ if (module := part.fs_type.installation_module) is not None:
+ self._modules.append(module)
+ if (binary := part.fs_type.installation_binary) is not None:
+ self._binaries.append(binary)
+
+ # https://github.com/archlinux/archinstall/issues/1837
+ if part.fs_type.fs_type_mount == 'btrfs':
+ self._disable_fstrim = True
+
+ # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
+ if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
+ if 'fsck' in self._hooks:
+ self._hooks.remove('fsck')
+
+ if part in self._disk_encryption.partitions:
+ if self._disk_encryption.hsm_device:
+ # Required by mkinitcpio to add support for fido2-device options
+ self.pacman.strap('libfido2')
+
+ if 'sd-encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
+ else:
+ if 'encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
+
+ def _handle_lvm_installation(self):
+ if not self._disk_config.lvm_config:
+ return
+
+ self.add_additional_packages('lvm2')
+ self._hooks.insert(self._hooks.index('filesystems') - 1, 'lvm2')
+
+ for vg in self._disk_config.lvm_config.vol_groups:
+ for vol in vg.volumes:
+ if vol.fs_type is not None:
+ if (pkg := vol.fs_type.installation_pkg) is not None:
+ self._base_packages.append(pkg)
+ if (module := vol.fs_type.installation_module) is not None:
self._modules.append(module)
- if (binary := part.fs_type.installation_binary) is not None:
+ if (binary := vol.fs_type.installation_binary) is not None:
self._binaries.append(binary)
- # https://github.com/archlinux/archinstall/issues/1837
- if part.fs_type.fs_type_mount == 'btrfs':
- _disable_fstrim = True
+ if vol.fs_type.fs_type_mount == 'btrfs':
+ self._disable_fstrim = True
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
- if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
+ if vol.fs_type.fs_type_mount == 'ntfs3' and vol.mountpoint == self.target:
if 'fsck' in self._hooks:
self._hooks.remove('fsck')
- if part in self._disk_encryption.partitions:
- if self._disk_encryption.hsm_device:
- # Required by mkinitcpio to add support for fido2-device options
- self.pacman.strap('libfido2')
+ if self._disk_encryption.encryption_type in [disk.EncryptionType.LvmOnLuks, disk.EncryptionType.LuksOnLvm]:
+ if self._disk_encryption.hsm_device:
+ # Required by mkinitcpio to add support for fido2-device options
+ self.pacman.strap('libfido2')
+
+ if 'sd-encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('lvm2') - 1, 'sd-encrypt')
+ else:
+ if 'encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('lvm2') - 1, 'encrypt')
- if 'sd-encrypt' not in self._hooks:
- self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
- else:
- if 'encrypt' not in self._hooks:
- self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
+ def minimal_installation(
+ self,
+ testing: bool = False,
+ multilib: bool = False,
+ mkinitcpio: bool = True,
+ hostname: str = 'archinstall',
+ locale_config: LocaleConfiguration = LocaleConfiguration.default()
+ ):
+ if self._disk_config.lvm_config:
+ self._handle_lvm_installation()
+ else:
+ self._handle_partition_installation()
if not SysInfo.has_uefi():
- self.base_packages.append('grub')
+ self._base_packages.append('grub')
if ucode := self._get_microcode():
(self.target / 'boot' / ucode).unlink(missing_ok=True)
- self.base_packages.append(ucode.stem)
+ self._base_packages.append(ucode.stem)
else:
debug('Archinstall will not install any ucode.')
@@ -673,7 +839,7 @@ class Installer:
pacman_conf.apply()
- self.pacman.strap(self.base_packages)
+ self.pacman.strap(self._base_packages)
self.helper_flags['base-strapped'] = True
pacman_conf.persist()
@@ -685,7 +851,7 @@ class Installer:
# https://github.com/archlinux/archinstall/issues/880
# https://github.com/archlinux/archinstall/issues/1837
# https://github.com/archlinux/archinstall/issues/1841
- if not _disable_fstrim:
+ if not self._disable_fstrim:
self.enable_periodic_trim()
# TODO: Support locale and timezone
@@ -742,13 +908,24 @@ class Installer:
return boot
return None
- def _get_root_partition(self) -> Optional[disk.PartitionModification]:
- for mod in self._disk_config.device_modifications:
- if root := mod.get_root_partition():
- return root
+ def _get_root(self) -> Optional[disk.PartitionModification | disk.LvmVolume]:
+ if self._disk_config.lvm_config:
+ return self._disk_config.lvm_config.get_root_volume()
+ else:
+ for mod in self._disk_config.device_modifications:
+ if root := mod.get_root_partition():
+ return root
return None
- def _get_kernel_params(
+ def _get_luks_uuid_from_mapper_dev(self, mapper_dev_path: Path) -> str:
+ lsblk_info = disk.get_lsblk_info(mapper_dev_path, reverse=True, full_dev_path=True)
+
+ if not lsblk_info.children or not lsblk_info.children[0].uuid:
+ raise ValueError('Unable to determine UUID of luks superblock')
+
+ return lsblk_info.children[0].uuid
+
+ def _get_kernel_params_partition(
self,
root_partition: disk.PartitionModification,
id_root: bool = True,
@@ -784,20 +961,74 @@ class Installer:
debug(f'Identifying root partition by UUID: {root_partition.uuid}')
kernel_parameters.append(f'root=UUID={root_partition.uuid}')
+ return kernel_parameters
+
+ def _get_kernel_params_lvm(
+ self,
+ lvm: disk.LvmVolume
+ ) -> List[str]:
+ kernel_parameters = []
+
+ match self._disk_encryption.encryption_type:
+ case disk.EncryptionType.LvmOnLuks:
+ if not lvm.vg_name:
+ raise ValueError(f'Unable to determine VG name for {lvm.name}')
+
+ pv_seg_info = disk.device_handler.lvm_pvseg_info(lvm.vg_name, lvm.name)
+
+ if not pv_seg_info:
+ raise ValueError(f'Unable to determine PV segment info for {lvm.vg_name}/{lvm.name}')
+
+ uuid = self._get_luks_uuid_from_mapper_dev(pv_seg_info.pv_name)
+
+ if self._disk_encryption.hsm_device:
+ debug(f'LvmOnLuks, encrypted root partition, HSM, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'rd.luks.name={uuid}=cryptlvm root={lvm.safe_dev_path}')
+ else:
+ debug(f'LvmOnLuks, encrypted root partition, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'cryptdevice=UUID={uuid}:cryptlvm root={lvm.safe_dev_path}')
+ case disk.EncryptionType.LuksOnLvm:
+ uuid = self._get_luks_uuid_from_mapper_dev(lvm.mapper_path)
+
+ if self._disk_encryption.hsm_device:
+ debug(f'LuksOnLvm, encrypted root partition, HSM, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'rd.luks.name={uuid}=root root=/dev/mapper/root')
+ else:
+ debug(f'LuksOnLvm, encrypted root partition, identifying by UUID: {uuid}')
+ kernel_parameters.append(f'cryptdevice=UUID={uuid}:root root=/dev/mapper/root')
+ case disk.EncryptionType.NoEncryption:
+ debug(f'Identifying root lvm by mapper device: {lvm.dev_path}')
+ kernel_parameters.append(f'root={lvm.safe_dev_path}')
+
+ return kernel_parameters
+
+ def _get_kernel_params(
+ self,
+ root: disk.PartitionModification | disk.LvmVolume,
+ id_root: bool = True,
+ partuuid: bool = True
+ ) -> List[str]:
+ kernel_parameters = []
+
+ if isinstance(root, disk.LvmVolume):
+ kernel_parameters = self._get_kernel_params_lvm(root)
+ else:
+ kernel_parameters = self._get_kernel_params_partition(root, id_root, partuuid)
+
# Zswap should be disabled when using zram.
# https://github.com/archlinux/archinstall/issues/881
if self._zram_enabled:
kernel_parameters.append('zswap.enabled=0')
if id_root:
- for sub_vol in root_partition.btrfs_subvols:
+ for sub_vol in root.btrfs_subvols:
if sub_vol.is_root():
kernel_parameters.append(f'rootflags=subvol={sub_vol.name}')
break
kernel_parameters.append('rw')
- kernel_parameters.append(f'rootfstype={root_partition.safe_fs_type.fs_type_mount}')
+ kernel_parameters.append(f'rootfstype={root.safe_fs_type.fs_type_mount}')
kernel_parameters.extend(self._kernel_params)
debug(f'kernel parameters: {" ".join(kernel_parameters)}')
@@ -807,10 +1038,12 @@ class Installer:
def _add_systemd_bootloader(
self,
boot_partition: disk.PartitionModification,
- root_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification],
uki_enabled: bool = False
):
+ debug('Installing systemd bootloader')
+
self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi():
@@ -882,7 +1115,7 @@ class Installer:
f'# Created on: {self.init_time}'
)
- options = 'options ' + ' '.join(self._get_kernel_params(root_partition))
+ options = 'options ' + ' '.join(self._get_kernel_params(root))
for kernel in self.kernels:
for variant in ("", "-fallback"):
@@ -904,15 +1137,17 @@ class Installer:
def _add_grub_bootloader(
self,
boot_partition: disk.PartitionModification,
- root_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification]
):
+ debug('Installing grub bootloader')
+
self.pacman.strap('grub') # no need?
grub_default = self.target / 'etc/default/grub'
config = grub_default.read_text()
- kernel_parameters = ' '.join(self._get_kernel_params(root_partition, False, False))
+ kernel_parameters = ' '.join(self._get_kernel_params(root, False, False))
config = re.sub(r'(GRUB_CMDLINE_LINUX=")("\n)', rf'\1{kernel_parameters}\2', config, 1)
grub_default.write_text(config)
@@ -934,7 +1169,7 @@ class Installer:
info(f"GRUB EFI partition: {efi_partition.dev_path}")
- self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
+ self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
boot_dir_arg = []
if boot_partition.mountpoint and boot_partition.mountpoint != boot_dir:
@@ -988,8 +1223,10 @@ class Installer:
self,
boot_partition: disk.PartitionModification,
efi_partition: Optional[disk.PartitionModification],
- root_partition: disk.PartitionModification
+ root: disk.PartitionModification | disk.LvmVolume
):
+ debug('Installing limine bootloader')
+
self.pacman.strap('limine')
info(f"Limine boot partition: {boot_partition.dev_path}")
@@ -1052,7 +1289,7 @@ Exec = /bin/sh -c "{hook_command}"
hook_path = hooks_dir / '99-limine.hook'
hook_path.write_text(hook_contents)
- kernel_params = ' '.join(self._get_kernel_params(root_partition))
+ kernel_params = ' '.join(self._get_kernel_params(root))
config_contents = 'TIMEOUT=5\n'
for kernel in self.kernels:
@@ -1075,9 +1312,11 @@ Exec = /bin/sh -c "{hook_command}"
def _add_efistub_bootloader(
self,
boot_partition: disk.PartitionModification,
- root_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
uki_enabled: bool = False
):
+ debug('Installing efistub bootloader')
+
self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi():
@@ -1092,7 +1331,7 @@ Exec = /bin/sh -c "{hook_command}"
entries = (
'initrd=/initramfs-{kernel}.img',
- *self._get_kernel_params(root_partition)
+ *self._get_kernel_params(root)
)
cmdline = [' '.join(entries)]
@@ -1122,7 +1361,7 @@ Exec = /bin/sh -c "{hook_command}"
def _config_uki(
self,
- root_partition: disk.PartitionModification,
+ root: disk.PartitionModification | disk.LvmVolume,
efi_partition: Optional[disk.PartitionModification]
):
if not efi_partition or not efi_partition.mountpoint:
@@ -1130,7 +1369,7 @@ Exec = /bin/sh -c "{hook_command}"
# Set up kernel command line
with open(self.target / 'etc/kernel/cmdline', 'w') as cmdline:
- kernel_parameters = self._get_kernel_params(root_partition)
+ kernel_parameters = self._get_kernel_params(root)
cmdline.write(' '.join(kernel_parameters) + '\n')
diff_mountpoint = None
@@ -1191,37 +1430,33 @@ Exec = /bin/sh -c "{hook_command}"
efi_partition = self._get_efi_partition()
boot_partition = self._get_boot_partition()
- root_partition = self._get_root_partition()
+ root = self._get_root()
if boot_partition is None:
raise ValueError(f'Could not detect boot at mountpoint {self.target}')
- if root_partition is None:
+ if root is None:
raise ValueError(f'Could not detect root at mountpoint {self.target}')
info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}')
if uki_enabled:
- self._config_uki(root_partition, efi_partition)
+ self._config_uki(root, efi_partition)
match bootloader:
case Bootloader.Systemd:
- self._add_systemd_bootloader(boot_partition, root_partition, efi_partition, uki_enabled)
+ self._add_systemd_bootloader(boot_partition, root, efi_partition, uki_enabled)
case Bootloader.Grub:
- self._add_grub_bootloader(boot_partition, root_partition, efi_partition)
+ self._add_grub_bootloader(boot_partition, root, efi_partition)
case Bootloader.Efistub:
- self._add_efistub_bootloader(boot_partition, root_partition, uki_enabled)
+ self._add_efistub_bootloader(boot_partition, root, uki_enabled)
case Bootloader.Limine:
- self._add_limine_bootloader(boot_partition, efi_partition, root_partition)
+ self._add_limine_bootloader(boot_partition, efi_partition, root)
def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
return self.pacman.strap(packages)
- def _enable_users(self, service: str, users: List[User]):
- for user in users:
- self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username)
-
- def enable_sudo(self, entity: str, group :bool = False):
+ def enable_sudo(self, entity: str, group: bool = False):
info(f'Enabling sudo permissions for {entity}')
sudoers_dir = f"{self.target}/etc/sudoers.d"
@@ -1237,7 +1472,7 @@ Exec = /bin/sh -c "{hook_command}"
# We count how many files are there already so we know which number to prefix the file with
num_of_rules_already = len(os.listdir(sudoers_dir))
- file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc
+ file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc
# Guarantees that entity str does not contain invalid characters for a linux file name:
# \ / : * ? " < > |
@@ -1293,7 +1528,7 @@ Exec = /bin/sh -c "{hook_command}"
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
- def user_set_pw(self, user :str, password :str) -> bool:
+ def user_set_pw(self, user: str, password: str) -> bool:
info(f'Setting password for {user}')
if user == 'root':
@@ -1310,7 +1545,7 @@ Exec = /bin/sh -c "{hook_command}"
except SysCallError:
return False
- def user_set_shell(self, user :str, shell :str) -> bool:
+ def user_set_shell(self, user: str, shell: str) -> bool:
info(f'Setting shell for {user} to {shell}')
try:
@@ -1319,7 +1554,7 @@ Exec = /bin/sh -c "{hook_command}"
except SysCallError:
return False
- def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
+ def chown(self, owner: str, path: str, options: List[str] = []) -> bool:
cleaned_path = path.replace('\'', '\\\'')
try:
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'")
diff --git a/archinstall/lib/interactions/disk_conf.py b/archinstall/lib/interactions/disk_conf.py
index 9d0042d6..f80af9ca 100644
--- a/archinstall/lib/interactions/disk_conf.py
+++ b/archinstall/lib/interactions/disk_conf.py
@@ -58,7 +58,7 @@ def select_devices(preset: List[disk.BDevice] = []) -> List[disk.BDevice]:
case MenuSelectionType.Reset: return []
case MenuSelectionType.Skip: return preset
case MenuSelectionType.Selection:
- selected_device_info: List[disk._DeviceInfo] = choice.value # type: ignore
+ selected_device_info: List[disk._DeviceInfo] = choice.single_value
selected_devices = []
for device in devices:
@@ -73,7 +73,6 @@ def get_default_partition_layout(
filesystem_type: Optional[disk.FilesystemType] = None,
advanced_option: bool = False
) -> List[disk.DeviceModification]:
-
if len(devices) == 1:
device_modification = suggest_single_disk_layout(
devices[0],
@@ -133,7 +132,7 @@ def select_disk_config(
case MenuSelectionType.Reset: return None
case MenuSelectionType.Selection:
if choice.single_value == pre_mount_mode:
- output = "You will use whatever drive-setup is mounted at the specified directory\n"
+ output = 'You will use whatever drive-setup is mounted at the specified directory\n'
output += "WARNING: Archinstall won't check the suitability of this setup\n"
try:
@@ -151,7 +150,6 @@ def select_disk_config(
)
preset_devices = [mod.device for mod in preset.device_modifications] if preset else []
-
devices = select_devices(preset_devices)
if not devices:
@@ -177,6 +175,36 @@ def select_disk_config(
return None
+def select_lvm_config(
+ disk_config: disk.DiskLayoutConfiguration,
+ preset: Optional[disk.LvmConfiguration] = None,
+) -> Optional[disk.LvmConfiguration]:
+ default_mode = disk.LvmLayoutType.Default.display_msg()
+
+ options = [default_mode]
+
+ preset_value = preset.config_type.display_msg() if preset else None
+ warning = str(_('Are you sure you want to reset this setting?'))
+
+ choice = Menu(
+ _('Select a LVM option'),
+ options,
+ allow_reset=True,
+ allow_reset_warning_msg=warning,
+ sort=False,
+ preview_size=0.2,
+ preset_values=preset_value
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Skip: return preset
+ case MenuSelectionType.Reset: return None
+ case MenuSelectionType.Selection:
+ if choice.single_value == default_mode:
+ return suggest_lvm_layout(disk_config)
+ return preset
+
+
def _boot_partition(sector_size: disk.SectorSize, using_gpt: bool) -> disk.PartitionModification:
flags = [disk.PartitionFlag.Boot]
if using_gpt:
@@ -199,7 +227,7 @@ def _boot_partition(sector_size: disk.SectorSize, using_gpt: bool) -> disk.Parti
)
-def select_main_filesystem_format(advanced_options=False) -> disk.FilesystemType:
+def select_main_filesystem_format(advanced_options: bool = False) -> disk.FilesystemType:
options = {
'btrfs': disk.FilesystemType.Btrfs,
'ext4': disk.FilesystemType.Ext4,
@@ -250,7 +278,6 @@ def suggest_single_disk_layout(
prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
using_subvolumes = choice.value == Menu.yes()
-
mount_options = select_mount_options()
device_modification = disk.DeviceModification(device, wipe=True)
@@ -288,7 +315,11 @@ def suggest_single_disk_layout(
root_start = boot_partition.start + boot_partition.length
# Set a size for / (/root)
- if using_subvolumes or device_size_gib < min_size_to_allow_home_part or not using_home_partition:
+ if (
+ using_subvolumes
+ or device_size_gib < min_size_to_allow_home_part
+ or not using_home_partition
+ ):
root_length = device.device_info.total_size - root_start
else:
root_length = min(device.device_info.total_size, root_partition_size)
@@ -305,6 +336,7 @@ def suggest_single_disk_layout(
fs_type=filesystem_type,
mount_options=mount_options
)
+
device_modification.add_partition(root_partition)
if using_subvolumes:
@@ -388,9 +420,9 @@ def suggest_multi_disk_layout(
device_paths = ', '.join([str(d.device_info.path) for d in devices])
- debug(f"Suggesting multi-disk-layout for devices: {device_paths}")
- debug(f"/root: {root_device.device_info.path}")
- debug(f"/home: {home_device.device_info.path}")
+ debug(f'Suggesting multi-disk-layout for devices: {device_paths}')
+ debug(f'/root: {root_device.device_info.path}')
+ debug(f'/home: {home_device.device_info.path}')
root_device_modification = disk.DeviceModification(root_device, wipe=True)
home_device_modification = disk.DeviceModification(home_device, wipe=True)
@@ -444,3 +476,85 @@ def suggest_multi_disk_layout(
home_device_modification.add_partition(home_partition)
return [root_device_modification, home_device_modification]
+
+
+def suggest_lvm_layout(
+ disk_config: disk.DiskLayoutConfiguration,
+ filesystem_type: Optional[disk.FilesystemType] = None,
+ vg_grp_name: str = 'ArchinstallVg',
+) -> disk.LvmConfiguration:
+ if disk_config.config_type != disk.DiskLayoutType.Default:
+ raise ValueError('LVM suggested volumes are only available for default partitioning')
+
+ using_subvolumes = False
+ btrfs_subvols = []
+ home_volume = True
+ mount_options = []
+
+ if not filesystem_type:
+ filesystem_type = select_main_filesystem_format()
+
+ if filesystem_type == disk.FilesystemType.Btrfs:
+ prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ using_subvolumes = choice.value == Menu.yes()
+
+ mount_options = select_mount_options()
+
+ if using_subvolumes:
+ btrfs_subvols = [
+ disk.SubvolumeModification(Path('@'), Path('/')),
+ disk.SubvolumeModification(Path('@home'), Path('/home')),
+ disk.SubvolumeModification(Path('@log'), Path('/var/log')),
+ disk.SubvolumeModification(Path('@pkg'), Path('/var/cache/pacman/pkg')),
+ disk.SubvolumeModification(Path('@.snapshots'), Path('/.snapshots')),
+ ]
+
+ home_volume = False
+
+ boot_part: Optional[disk.PartitionModification] = None
+ other_part: List[disk.PartitionModification] = []
+
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.is_boot():
+ boot_part = part
+ else:
+ other_part.append(part)
+
+ if not boot_part:
+ raise ValueError('Unable to find boot partition in partition modifications')
+
+ total_vol_available = sum(
+ [p.length for p in other_part],
+ disk.Size(0, disk.Unit.B, disk.SectorSize.default()),
+ )
+ root_vol_size = disk.Size(20, disk.Unit.GiB, disk.SectorSize.default())
+ home_vol_size = total_vol_available - root_vol_size
+
+ lvm_vol_group = disk.LvmVolumeGroup(vg_grp_name, pvs=other_part, )
+
+ root_vol = disk.LvmVolume(
+ status=disk.LvmVolumeStatus.Create,
+ name='root',
+ fs_type=filesystem_type,
+ length=root_vol_size,
+ mountpoint=Path('/'),
+ btrfs_subvols=btrfs_subvols,
+ mount_options=mount_options
+ )
+
+ lvm_vol_group.volumes.append(root_vol)
+
+ if home_volume:
+ home_vol = disk.LvmVolume(
+ status=disk.LvmVolumeStatus.Create,
+ name='home',
+ fs_type=filesystem_type,
+ length=home_vol_size,
+ mountpoint=Path('/home'),
+ )
+
+ lvm_vol_group.volumes.append(home_vol)
+
+ return disk.LvmConfiguration(disk.LvmLayoutType.Default, [lvm_vol_group])
diff --git a/archinstall/lib/interactions/manage_users_conf.py b/archinstall/lib/interactions/manage_users_conf.py
index ca912283..886f85b6 100644
--- a/archinstall/lib/interactions/manage_users_conf.py
+++ b/archinstall/lib/interactions/manage_users_conf.py
@@ -1,12 +1,11 @@
from __future__ import annotations
import re
-from typing import Any, Dict, TYPE_CHECKING, List, Optional
+from typing import Any, TYPE_CHECKING, List, Optional
from .utils import get_password
from ..menu import Menu, ListManager
from ..models.users import User
-from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -26,21 +25,6 @@ class UserList(ListManager):
]
super().__init__(prompt, lusers, [self._actions[0]], self._actions[1:])
- def reformat(self, data: List[User]) -> Dict[str, Any]:
- table = FormattedOutput.as_table(data)
- rows = table.split('\n')
-
- # these are the header rows of the table and do not map to any User obviously
- # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
- # the selectable rows so the header has to be aligned
- display_data: Dict[str, Optional[User]] = {f' {rows[0]}': None, f' {rows[1]}': None}
-
- for row, user in zip(rows[2:], data):
- row = row.replace('|', '\\|')
- display_data[row] = user
-
- return display_data
-
def selected_action_display(self, user: User) -> str:
return user.username
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index c917420e..50e15cee 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -60,7 +60,7 @@ class Luks2:
iter_time: int = 10000,
key_file: Optional[Path] = None
) -> Path:
- info(f'Luks2 encrypting: {self.luks_dev_path}')
+ debug(f'Luks2 encrypting: {self.luks_dev_path}')
byte_password = self._password_bytes()
@@ -87,12 +87,15 @@ class Luks2:
'luksFormat', str(self.luks_dev_path),
])
+ debug(f'cryptsetup format: {cryptsetup_args}')
+
# Retry formatting the volume because archinstall can some times be too quick
# which generates a "Device /dev/sdX does not exist or access denied." between
# setting up partitions and us trying to encrypt it.
for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS'] + 1):
try:
- SysCommand(cryptsetup_args)
+ result = SysCommand(cryptsetup_args).decode()
+ debug(f'cryptsetup luksFormat output: {result}')
break
except SysCallError as err:
time.sleep(storage['DISK_TIMEOUTS'])
@@ -106,10 +109,13 @@ class Luks2:
self.lock()
# Then try again to set up the crypt-device
- SysCommand(cryptsetup_args)
+ result = SysCommand(cryptsetup_args).decode()
+ debug(f'cryptsetup luksFormat output: {result}')
else:
raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}')
+ self.key_file = key_file
+
return key_file
def _get_luks_uuid(self) -> str:
@@ -152,7 +158,15 @@ class Luks2:
while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025)
- SysCommand(f'/usr/bin/cryptsetup open {self.luks_dev_path} {self.mapper_name} --key-file {key_file} --type luks2')
+ result = SysCommand(
+ '/usr/bin/cryptsetup open '
+ f'{self.luks_dev_path} '
+ f'{self.mapper_name} '
+ f'--key-file {key_file} '
+ f'--type luks2'
+ ).decode()
+
+ debug(f'cryptsetup open output: {result}')
if not self.mapper_dev or not self.mapper_dev.is_symlink():
raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')
@@ -199,8 +213,8 @@ class Luks2:
key_file.parent.mkdir(parents=True, exist_ok=True)
- with open(key_file, "w") as keyfile:
- keyfile.write(generate_password(length=512))
+ pwd = generate_password(length=512)
+ key_file.write_text(pwd)
key_file.chmod(0o400)
@@ -208,7 +222,7 @@ class Luks2:
self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"])
def _add_key(self, key_file: Path):
- info(f'Adding additional key-file {key_file}')
+ debug(f'Adding additional key-file {key_file}')
command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'
worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'})
@@ -228,7 +242,7 @@ class Luks2:
key_file: Path,
options: List[str]
) -> None:
- info(f'Adding crypttab entry for key {key_file}')
+ debug(f'Adding crypttab entry for key {key_file}')
with open(crypttab_path, 'a') as crypttab:
opt = ','.join(options)
diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py
index 14db98ca..ee55f5c9 100644
--- a/archinstall/lib/menu/abstract_menu.py
+++ b/archinstall/lib/menu/abstract_menu.py
@@ -10,6 +10,7 @@ from ..translationhandler import TranslationHandler, Language
if TYPE_CHECKING:
_: Any
+
class Selector:
def __init__(
self,
@@ -68,42 +69,19 @@ class Selector:
:param no_store: A boolean which determines that the field should or shouldn't be stored in the data storage
:type no_store: bool
"""
- self._description = description
- self.func = func
self._display_func = display_func
- self._current_selection = default
+ self._no_store = no_store
+
+ self.description = description
+ self.func = func
+ self.current_selection = default
self.enabled = enabled
- self._dependencies = dependencies
- self._dependencies_not = dependencies_not
+ self.dependencies = dependencies
+ self.dependencies_not = dependencies_not
self.exec_func = exec_func
- self._preview_func = preview_func
+ self.preview_func = preview_func
self.mandatory = mandatory
- self._no_store = no_store
- self._default = default
-
- @property
- def default(self) -> Any:
- return self._default
-
- @property
- def description(self) -> str:
- return self._description
-
- @property
- def dependencies(self) -> List:
- return self._dependencies
-
- @property
- def dependencies_not(self) -> List:
- return self._dependencies_not
-
- @property
- def current_selection(self) -> Optional[Any]:
- return self._current_selection
-
- @property
- def preview_func(self):
- return self._preview_func
+ self.default = default
def do_store(self) -> bool:
return self._no_store is False
@@ -112,45 +90,45 @@ class Selector:
self.enabled = status
def update_description(self, description: str):
- self._description = description
+ self.description = description
def menu_text(self, padding: int = 0) -> str:
- if self._description == '': # special menu option for __separator__
+ if self.description == '': # special menu option for __separator__
return ''
current = ''
if self._display_func:
- current = self._display_func(self._current_selection)
+ current = self._display_func(self.current_selection)
else:
- if self._current_selection is not None:
- current = str(self._current_selection)
+ if self.current_selection is not None:
+ current = str(self.current_selection)
if current:
padding += 5
- description = unicode_ljust(str(self._description), padding, ' ')
+ description = unicode_ljust(str(self.description), padding, ' ')
current = current
else:
- description = self._description
+ description = self.description
current = ''
return f'{description} {current}'
def set_current_selection(self, current: Optional[Any]):
- self._current_selection = current
+ self.current_selection = current
def has_selection(self) -> bool:
- if not self._current_selection:
+ if not self.current_selection:
return False
return True
def get_selection(self) -> Any:
- return self._current_selection
+ return self.current_selection
def is_empty(self) -> bool:
- if self._current_selection is None:
+ if self.current_selection is None:
return True
- elif isinstance(self._current_selection, (str, list, dict)) and len(self._current_selection) == 0:
+ elif isinstance(self.current_selection, (str, list, dict)) and len(self.current_selection) == 0:
return True
return False
@@ -197,6 +175,8 @@ class AbstractMenu:
self._sync_all()
self._populate_default_values()
+ self.defined_text = str(_('Defined'))
+
@property
def last_choice(self):
return self._last_choice
@@ -382,9 +362,10 @@ class AbstractMenu:
result = None
if selector.func is not None:
- presel_val = self.option(config_name).get_selection()
- result = selector.func(presel_val)
+ cur_value = self.option(config_name).get_selection()
+ result = selector.func(cur_value)
self._menu_options[config_name].set_current_selection(result)
+
if selector.do_store():
self._data_store[config_name] = result
@@ -398,19 +379,23 @@ class AbstractMenu:
return True
def _verify_selection_enabled(self, selection_name: str) -> bool:
- """ general """
if selection := self._menu_options.get(selection_name, None):
if not selection.enabled:
return False
if len(selection.dependencies) > 0:
- for d in selection.dependencies:
- if not self._verify_selection_enabled(d) or self._menu_options[d].is_empty():
- return False
+ for dep in selection.dependencies:
+ if isinstance(dep, str):
+ if not self._verify_selection_enabled(dep) or self._menu_options[dep].is_empty():
+ return False
+ elif callable(dep): # callable dependency eval
+ return dep()
+ else:
+ raise ValueError(f'Unsupported dependency: {selection_name}')
if len(selection.dependencies_not) > 0:
- for d in selection.dependencies_not:
- if not self._menu_options[d].is_empty():
+ for dep in selection.dependencies_not:
+ if not self._menu_options[dep].is_empty():
return False
return True
@@ -454,8 +439,8 @@ class AbstractMenu:
class AbstractSubMenu(AbstractMenu):
- def __init__(self, data_store: Dict[str, Any] = {}):
- super().__init__(data_store=data_store)
+ def __init__(self, data_store: Dict[str, Any] = {}, preview_size: float = 0.2):
+ super().__init__(data_store=data_store, preview_size=preview_size)
self._menu_options['__separator__'] = Selector('')
self._menu_options['back'] = \
diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py
index 54fb6a1b..de18791c 100644
--- a/archinstall/lib/menu/list_manager.py
+++ b/archinstall/lib/menu/list_manager.py
@@ -3,6 +3,7 @@ from os import system
from typing import Any, TYPE_CHECKING, Dict, Optional, Tuple, List
from .menu import Menu
+from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -127,18 +128,29 @@ class ListManager:
if choice.value and choice.value != self._cancel_action:
self._data = self.handle_action(choice.value, entry, self._data)
- def selected_action_display(self, selection: Any) -> str:
+ def reformat(self, data: List[Any]) -> Dict[str, Optional[Any]]:
"""
- this will return the value to be displayed in the
- "Select an action for '{}'" string
+ Default implementation of the table to be displayed.
+ Override if any custom formatting is needed
"""
- raise NotImplementedError('Please implement me in the child class')
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
- def reformat(self, data: List[Any]) -> Dict[str, Optional[Any]]:
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data: Dict[str, Optional[Any]] = {f' {rows[0]}': None, f' {rows[1]}': None}
+
+ for row, entry in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = entry
+
+ return display_data
+
+ def selected_action_display(self, selection: Any) -> str:
"""
- this should return a dictionary of display string to actual data entry
- mapping; if the value for a given display string is None it will be used
- in the header value (useful when displaying tables)
+ this will return the value to be displayed in the
+ "Select an action for '{}'" string
"""
raise NotImplementedError('Please implement me in the child class')
diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py
index f14b855d..38301d3a 100644
--- a/archinstall/lib/menu/menu.py
+++ b/archinstall/lib/menu/menu.py
@@ -66,7 +66,7 @@ class Menu(TerminalMenu):
sort: bool = True,
preset_values: Optional[Union[str, List[str]]] = None,
cursor_index: Optional[int] = None,
- preview_command: Optional[Callable] = None,
+ preview_command: Optional[Callable[[Any], str | None]] = None,
preview_size: float = 0.0,
preview_title: str = 'Info',
header: Union[List[str], str] = [],
@@ -228,7 +228,11 @@ class Menu(TerminalMenu):
default_str = str(_('(default)'))
return f'{self._default_option} {default_str}'
- def _show_preview(self, preview_command: Optional[Callable], selection: str) -> Optional[str]:
+ def _show_preview(
+ self,
+ preview_command: Optional[Callable[[Any], str | None]],
+ selection: str
+ ) -> Optional[str]:
if selection == self.back():
return None
diff --git a/archinstall/lib/menu/table_selection_menu.py b/archinstall/lib/menu/table_selection_menu.py
index 4cff7216..fec6ae59 100644
--- a/archinstall/lib/menu/table_selection_menu.py
+++ b/archinstall/lib/menu/table_selection_menu.py
@@ -19,6 +19,7 @@ class TableMenu(Menu):
preview_size: float = 0.0,
allow_reset: bool = True,
allow_reset_warning_msg: Optional[str] = None,
+ skip: bool = True
):
"""
param title: Text that will be displayed above the menu
@@ -81,7 +82,8 @@ class TableMenu(Menu):
preview_title=preview_title,
extra_bottom_space=extra_bottom_space,
allow_reset=allow_reset,
- allow_reset_warning_msg=allow_reset_warning_msg
+ allow_reset_warning_msg=allow_reset_warning_msg,
+ skip=skip
)
def _preset_values(self, preset: List[Any]) -> List[str]:
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index 61f3c568..c9094669 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -121,21 +121,6 @@ class CustomMirrorList(ListManager):
]
super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:])
- def reformat(self, data: List[CustomMirror]) -> Dict[str, Any]:
- table = FormattedOutput.as_table(data)
- rows = table.split('\n')
-
- # these are the header rows of the table and do not map to any User obviously
- # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
- # the selectable rows so the header has to be aligned
- display_data: Dict[str, Optional[CustomMirror]] = {f' {rows[0]}': None, f' {rows[1]}': None}
-
- for row, user in zip(rows[2:], data):
- row = row.replace('|', '\\|')
- display_data[row] = user
-
- return display_data
-
def selected_action_display(self, mirror: CustomMirror) -> str:
return mirror.name