Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2022-05-28 10:36:38 +0200
committerAndreas Baumann <mail@andreasbaumann.cc>2022-05-28 10:36:38 +0200
commitfaf925de1882be722d2994d697a802918282e509 (patch)
tree4856c76b10b36e94875ce3c9add961960bb23bf0 /archinstall/lib
parent3801bee921d22e23435c781c469d9ec0adfa00bd (diff)
parent78449f75bc44f0e2b03cb9d909b9b78e4f7ca4c8 (diff)
Merge branch 'upstreamMaster'
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/configuration.py29
-rw-r--r--archinstall/lib/disk/__init__.py2
-rw-r--r--archinstall/lib/disk/blockdevice.py104
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py (renamed from archinstall/lib/disk/btrfs.py)92
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py132
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py116
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolume.py191
-rw-r--r--archinstall/lib/disk/filesystem.py101
-rw-r--r--archinstall/lib/disk/helpers.py31
-rw-r--r--archinstall/lib/disk/mapperdev.py13
-rw-r--r--archinstall/lib/disk/partition.py167
-rw-r--r--archinstall/lib/disk/user_guides.py36
-rw-r--r--archinstall/lib/exceptions.py4
-rw-r--r--archinstall/lib/general.py66
-rw-r--r--archinstall/lib/hardware.py2
-rw-r--r--archinstall/lib/hsm/__init__.py4
-rw-r--r--archinstall/lib/hsm/fido.py57
-rw-r--r--archinstall/lib/installer.py204
-rw-r--r--archinstall/lib/luks.py16
-rw-r--r--archinstall/lib/menu/global_menu.py264
-rw-r--r--archinstall/lib/menu/list_manager.py97
-rw-r--r--archinstall/lib/menu/menu.py159
-rw-r--r--archinstall/lib/menu/selection_menu.py121
-rw-r--r--archinstall/lib/menu/simple_menu.py15
-rw-r--r--archinstall/lib/models/network_configuration.py5
-rw-r--r--archinstall/lib/models/users.py77
-rw-r--r--archinstall/lib/output.py44
-rw-r--r--archinstall/lib/plugins.py2
-rw-r--r--archinstall/lib/profiles.py8
-rw-r--r--archinstall/lib/storage.py2
-rw-r--r--archinstall/lib/systemd.py13
-rw-r--r--archinstall/lib/translation.py25
-rw-r--r--archinstall/lib/udev/__init__.py1
-rw-r--r--archinstall/lib/udev/udevadm.py17
-rw-r--r--archinstall/lib/user_interaction/__init__.py2
-rw-r--r--archinstall/lib/user_interaction/disk_conf.py51
-rw-r--r--archinstall/lib/user_interaction/general_conf.py138
-rw-r--r--archinstall/lib/user_interaction/locale_conf.py35
-rw-r--r--archinstall/lib/user_interaction/manage_users_conf.py177
-rw-r--r--archinstall/lib/user_interaction/network_conf.py51
-rw-r--r--archinstall/lib/user_interaction/partitioning_conf.py171
-rw-r--r--archinstall/lib/user_interaction/save_conf.py27
-rw-r--r--archinstall/lib/user_interaction/subvolume_config.py36
-rw-r--r--archinstall/lib/user_interaction/system_conf.py126
-rw-r--r--archinstall/lib/user_interaction/utils.py16
45 files changed, 2160 insertions, 887 deletions
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py
index c971768f..510f7103 100644
--- a/archinstall/lib/configuration.py
+++ b/archinstall/lib/configuration.py
@@ -1,12 +1,23 @@
import json
import logging
-from pathlib import Path
+import pathlib
from typing import Optional, Dict
from .storage import storage
from .general import JSON, UNSAFE_JSON
from .output import log
-
+from .exceptions import RequirementError
+from .hsm import get_fido2_devices
+
+def configuration_sanity_check():
+ if storage['arguments'].get('HSM'):
+ if not get_fido2_devices():
+ raise RequirementError(
+ f"In order to use HSM to pair with the disk encryption,"
+ + f" one needs to be accessible through /dev/hidraw* and support"
+ + f" the FIDO2 protocol. You can check this by running"
+ + f" 'systemd-cryptenroll --fido2-device=list'."
+ )
class ConfigurationOutput:
def __init__(self, config: Dict):
@@ -21,12 +32,12 @@ class ConfigurationOutput:
self._user_credentials = {}
self._disk_layout = None
self._user_config = {}
- self._default_save_path = Path(storage.get('LOG_PATH', '.'))
+ self._default_save_path = pathlib.Path(storage.get('LOG_PATH', '.'))
self._user_config_file = 'user_configuration.json'
self._user_creds_file = "user_credentials.json"
self._disk_layout_file = "user_disk_layout.json"
- self._sensitive = ['!users', '!superusers', '!encryption-password']
+ self._sensitive = ['!users', '!encryption-password']
self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run']
self._process_config()
@@ -84,7 +95,7 @@ class ConfigurationOutput:
print()
- def _is_valid_path(self, dest_path :Path) -> bool:
+ def _is_valid_path(self, dest_path :pathlib.Path) -> bool:
if (not dest_path.exists()) or not (dest_path.is_dir()):
log(
'Destination directory {} does not exist or is not a directory,\n Configuration files can not be saved'.format(dest_path.resolve()),
@@ -93,26 +104,26 @@ class ConfigurationOutput:
return False
return True
- def save_user_config(self, dest_path :Path = None):
+ def save_user_config(self, dest_path :pathlib.Path = None):
if self._is_valid_path(dest_path):
with open(dest_path / self._user_config_file, 'w') as config_file:
config_file.write(self.user_config_to_json())
- def save_user_creds(self, dest_path :Path = None):
+ def save_user_creds(self, dest_path :pathlib.Path = None):
if self._is_valid_path(dest_path):
if user_creds := self.user_credentials_to_json():
target = dest_path / self._user_creds_file
with open(target, 'w') as config_file:
config_file.write(user_creds)
- def save_disk_layout(self, dest_path :Path = None):
+ def save_disk_layout(self, dest_path :pathlib.Path = None):
if self._is_valid_path(dest_path):
if disk_layout := self.disk_layout_to_json():
target = dest_path / self._disk_layout_file
with target.open('w') as config_file:
config_file.write(disk_layout)
- def save(self, dest_path :Path = None):
+ def save(self, dest_path :pathlib.Path = None):
if not dest_path:
dest_path = self._default_save_path
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py
index bb6eb815..352d04b9 100644
--- a/archinstall/lib/disk/__init__.py
+++ b/archinstall/lib/disk/__init__.py
@@ -4,4 +4,4 @@ from .blockdevice import BlockDevice
from .filesystem import Filesystem, MBR, GPT
from .partition import *
from .user_guides import *
-from .validators import *
+from .validators import * \ No newline at end of file
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 206c3b7e..c7b69205 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -3,6 +3,7 @@ import os
import json
import logging
import time
+from functools import cached_property
from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -32,7 +33,29 @@ class BlockDevice:
# I'm placing the encryption password on a BlockDevice level.
def __repr__(self, *args :str, **kwargs :str) -> str:
- return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
+ return self._str_repr
+
+ @cached_property
+ def _str_repr(self) -> str:
+ return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})"
+
+ @cached_property
+ def display_info(self) -> str:
+ columns = {
+ str(_('Device')): self.device_or_backfile,
+ str(_('Size')): f'{self._safe_size}GB',
+ str(_('Free space')): f'{self._safe_free_space}',
+ str(_('Bus-type')): f'{self.bus_type}'
+ }
+
+ padding = max([len(k) for k in columns.keys()])
+
+ pretty = ''
+ for k, v in columns.items():
+ k = k.ljust(padding, ' ')
+ pretty += f'{k} = {v}\n'
+
+ return pretty.rstrip()
def __iter__(self) -> Iterator[Partition]:
for partition in self.partitions:
@@ -74,7 +97,7 @@ class BlockDevice:
for device in output['blockdevices']:
return device['pttype']
- @property
+ @cached_property
def device_or_backfile(self) -> str:
"""
Returns the actual device-endpoint of the BlockDevice.
@@ -157,7 +180,7 @@ class BlockDevice:
from .filesystem import GPT
return GPT
- @property
+ @cached_property
def uuid(self) -> str:
log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
@@ -167,7 +190,19 @@ class BlockDevice:
"""
return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
- @property
+ @cached_property
+ def _safe_size(self) -> float:
+ from .helpers import convert_size_to_gb
+
+ try:
+ output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
+ except SysCallError:
+ return -1.0
+
+ for device in output['blockdevices']:
+ return convert_size_to_gb(device['size'])
+
+ @cached_property
def size(self) -> float:
from .helpers import convert_size_to_gb
@@ -176,22 +211,29 @@ class BlockDevice:
for device in output['blockdevices']:
return convert_size_to_gb(device['size'])
- @property
+ @cached_property
def bus_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
- @property
+ @cached_property
def spinning(self) -> bool:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
- @property
- def free_space(self) -> Tuple[str, str, str]:
+ @cached_property
+ def _safe_free_space(self) -> Tuple[str, ...]:
+ try:
+ return '+'.join(part[2] for part in self.free_space)
+ except SysCallError:
+ return '?'
+
+ @cached_property
+ def free_space(self) -> Tuple[str, ...]:
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
@@ -204,7 +246,7 @@ class BlockDevice:
except SysCallError as error:
log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
- @property
+ @cached_property
def largest_free_space(self) -> List[str]:
info = []
for space_info in self.free_space:
@@ -216,7 +258,7 @@ class BlockDevice:
info = space_info
return info
- @property
+ @cached_property
def first_free_sector(self) -> str:
if info := self.largest_free_space:
start = info[0]
@@ -224,7 +266,7 @@ class BlockDevice:
start = '512MB'
return start
- @property
+ @cached_property
def first_end_sector(self) -> str:
if info := self.largest_free_space:
end = info[1]
@@ -247,19 +289,27 @@ class BlockDevice:
def flush_cache(self) -> None:
self.part_cache = {}
- def get_partition(self, uuid :str) -> Partition:
- count = 0
- while count < 5:
- for partition_uuid, partition in self.partitions.items():
- if partition.uuid.lower() == uuid.lower():
- return partition
- else:
- log(f"uuid {uuid} not found. Waiting for {count +1} time",level=logging.DEBUG)
- time.sleep(float(storage['arguments'].get('disk-sleep', 0.2)))
- count += 1
- else:
- log(f"Could not find {uuid} in disk after 5 retries",level=logging.INFO)
- print(f"Cache: {self.part_cache}")
- print(f"Partitions: {self.partitions.items()}")
- print(f"UUID: {[uuid]}")
- raise DiskError(f"New partition {uuid} never showed up after adding new partition on {self}")
+ def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition:
+ if not uuid and not partuuid:
+ raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.")
+
+ for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)):
+ for partition_index, partition in self.partitions.items():
+ try:
+ if uuid and partition.uuid.lower() == uuid.lower():
+ return partition
+ elif partuuid and partition.part_uuid.lower() == partuuid.lower():
+ return partition
+ except DiskError as error:
+ # Most likely a blockdevice that doesn't support or use UUID's
+ # (like Microsoft recovery partition)
+ log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray")
+ pass
+
+ log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
+
+ log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
+ log(f"Cache: {self.part_cache}")
+ log(f"Partitions: {self.partitions.items()}")
+ raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.")
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs/__init__.py
index 33f59721..84b9c0f6 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -4,44 +4,25 @@ import glob
import logging
import re
from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
-from dataclasses import dataclass
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
- from ..installer import Installer
-from .helpers import get_mount_info
-from ..exceptions import DiskError
-from ..general import SysCommand
-from ..output import log
-from ..exceptions import SysCallError
-
-@dataclass
-class BtrfsSubvolume:
- target :str
- source :str
- fstype :str
- name :str
- options :str
- root :bool = False
-
-def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]:
- if '[' in struct['source']:
- subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1]
- struct['source'] = struct['source'].replace(f"[{subvolume}]", "")
- yield BtrfsSubvolume(
- target=struct['target'],
- source=struct['source'],
- fstype=struct['fstype'],
- name=subvolume,
- options=struct['options'],
- root=index == 0
- )
- index += 1
-
- for child in struct.get('children', []):
- for item in get_subvolumes_from_findmnt(child, index=index):
- yield item
- index += 1
+ from ...installer import Installer
+
+from .btrfs_helpers import (
+ subvolume_info_from_path as subvolume_info_from_path,
+ find_parent_subvolume as find_parent_subvolume,
+ setup_subvolumes as setup_subvolumes,
+ mount_subvolume as mount_subvolume
+)
+from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfspartition import BTRFSPartition as BTRFSPartition
+
+from ..helpers import get_mount_info
+from ...exceptions import DiskError, Deprecated
+from ...general import SysCommand
+from ...output import log
+from ...exceptions import SysCallError
def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
try:
@@ -57,42 +38,6 @@ def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
return result
-def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
- """
- This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name.
-
- @installation: archinstall.Installer instance
- @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
- @force: overrides the check for weither or not the subvolume mountpoint is empty or not
-
- This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure.
- Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name]
- """
- log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING)
- installation_mountpoint = installation.target
- if type(installation_mountpoint) == str:
- installation_mountpoint = pathlib.Path(installation_mountpoint)
- # Set up the required physical structure
- if type(subvolume_location) == str:
- subvolume_location = pathlib.Path(subvolume_location)
-
- target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)
-
- if not target.exists():
- target.mkdir(parents=True)
-
- if glob.glob(str(target / '*')) and force is False:
- raise DiskError(f"Cannot mount subvolume to {target} because it contains data (non-empty folder target)")
-
- log(f"Mounting {target} as a subvolume", level=logging.INFO)
- # Mount the logical volume to the physical structure
- mount_information, mountpoint_device_real_path = get_mount_info(target, traverse=True, return_real_path=True)
- if mountpoint_device_real_path == str(target):
- log(f"Unmounting non-subvolume {mount_information['source']} previously mounted at {target}")
- SysCommand(f"umount {mount_information['source']}")
-
- return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0
-
def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -132,13 +77,18 @@ def _has_option(option :str,options :list) -> bool:
"""
if not options:
return False
+
for item in options:
if option in item:
return True
+
return False
def manage_btrfs_subvolumes(installation :Installer,
partition :Dict[str, str],) -> list:
+
+ raise Deprecated("Use setup_subvolumes() instead.")
+
from copy import deepcopy
""" we do the magic with subvolumes in a centralized place
parameters:
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
new file mode 100644
index 00000000..d577d82b
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -0,0 +1,132 @@
+import pathlib
+import logging
+from typing import Optional
+
+from ...exceptions import SysCallError, DiskError
+from ...general import SysCommand
+from ...output import log
+from ..helpers import get_mount_info
+from .btrfssubvolume import BtrfsSubvolume
+
+
+def mount_subvolume(installation, device, name, subvolume_information):
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = name.lstrip('/')
+
+ # renormalize the right hand.
+ mountpoint = subvolume_information.get('mountpoint', None)
+ if not mountpoint:
+ return None
+
+ if type(mountpoint) == str:
+ mountpoint = pathlib.Path(mountpoint)
+
+ installation_target = installation.target
+ if type(installation_target) == str:
+ installation_target = pathlib.Path(installation_target)
+
+ mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
+ mountpoint.mkdir(parents=True, exist_ok=True)
+
+ mount_options = subvolume_information.get('options', [])
+ if not any('subvol=' in x for x in mount_options):
+ mount_options += [f'subvol={name}']
+
+ log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
+ SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
+
+
+def setup_subvolumes(installation, partition_dict):
+ """
+ Taken from: ..user_guides.py
+
+ partition['btrfs'] = {
+ "subvolumes" : {
+ "@": "/",
+ "@home": "/home",
+ "@log": "/var/log",
+ "@pkg": "/var/cache/pacman/pkg",
+ "@.snapshots": "/.snapshots"
+ }
+ }
+ """
+ log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
+ for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = name.lstrip('/')
+
+ # renormalize the right hand.
+ # mountpoint = None
+ subvol_options = []
+
+ match right_hand:
+ # case str(): # backwards-compatability
+ # mountpoint = right_hand
+ case dict():
+ # mountpoint = right_hand.get('mountpoint', None)
+ subvol_options = right_hand.get('options', [])
+
+ # We create the subvolume using the BTRFSPartition instance.
+ # That way we ensure not only easy access, but also accurate mount locations etc.
+ partition_dict['device_instance'].create_subvolume(name, installation=installation)
+
+ # Make the nodatacow processing now
+ # It will be the main cause of creation of subvolumes which are not to be mounted
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ if 'nodatacow' in subvol_options:
+ if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so nodatacow doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('nodatacow')]
+ # Make the compress processing now
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ # in this way only zstd compression is activaded
+ # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
+
+ if 'compress' in subvol_options:
+ if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
+ if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so compress doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('compress')]
+
+def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+ try:
+ subvolume_name = None
+ result = {}
+ for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
+ if index == 0:
+ subvolume_name = line.strip().decode('UTF-8')
+ continue
+
+ if b':' in line:
+ key, value = line.strip().decode('UTF-8').split(':', 1)
+
+ # A bit of a hack, until I figure out how @dataclass
+ # allows for hooking in a pre-processor to do this we have to do it here:
+ result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
+
+ return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
+
+ except SysCallError as error:
+ log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
+
+ return None
+
+def find_parent_subvolume(path :pathlib.Path, filters=[]):
+ # A root path cannot have a parent
+ if str(path) == '/':
+ return None
+
+ if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters):
+ if not (subvolume := subvolume_info_from_path(found_mount['target'])):
+ if found_mount['target'] == '/':
+ return None
+
+ return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+
+ return subvolume \ No newline at end of file
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
new file mode 100644
index 00000000..5020133d
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -0,0 +1,116 @@
+import glob
+import pathlib
+import logging
+from typing import Optional, TYPE_CHECKING
+
+from ...exceptions import DiskError
+from ...storage import storage
+from ...output import log
+from ...general import SysCommand
+from ..partition import Partition
+from ..helpers import findmnt
+from .btrfs_helpers import (
+ subvolume_info_from_path
+)
+
+if TYPE_CHECKING:
+ from ...installer import Installer
+ from .btrfssubvolume import BtrfsSubvolume
+
+class BTRFSPartition(Partition):
+ def __init__(self, *args, **kwargs):
+ Partition.__init__(self, *args, **kwargs)
+
+ def __repr__(self, *args :str, **kwargs :str) -> str:
+ mount_repr = ''
+ if self.mountpoint:
+ mount_repr = f", mounted={self.mountpoint}"
+ elif self.target_mountpoint:
+ mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+
+ if self._encrypted:
+ return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
+ else:
+ return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
+
+ @property
+ def subvolumes(self):
+ for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
+ if '[' in filesystem.get('source', ''):
+ yield subvolume_info_from_path(filesystem['target'])
+
+ def iterate_children(struct):
+ for child in struct.get('children', []):
+ if '[' in child.get('source', ''):
+ yield subvolume_info_from_path(child['target'])
+
+ for sub_child in iterate_children(child):
+ yield sub_child
+
+ for child in iterate_children(filesystem):
+ yield child
+
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ """
+ Subvolumes have to be created within a mountpoint.
+ This means we need to get the current installation target.
+ After we get it, we need to verify it is a btrfs subvolume filesystem.
+ Finally, the destination must be empty.
+ """
+
+ # Allow users to override the installation session
+ if not installation:
+ installation = storage.get('installation_session')
+
+ # Determain if the path given, is an absolute path or a releative path.
+ # We do this by checking if the path contains a known mountpoint.
+ if str(subvolume)[0] == '/':
+ if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
+ if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
+ # Path starts with a known mountpoint which isn't /
+ # Which means it's an absolut path to a mounted location.
+ pass
+ else:
+ # Since it's not an absolute position with a known start.
+ # We omit the anchor ('/' basically) and make sure it's appendable
+ # to the installation.target later
+ subvolume = subvolume.relative_to(subvolume.anchor)
+ # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is.
+
+ # If the subvolume is not absolute, then we do two checks:
+ # 1. Check if the partition itself is mounted somewhere, and use that as a root
+ # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs
+ # If both above fail, we need to warn the user that such setup is not supported.
+ if str(subvolume)[0] != '/':
+ if self.mountpoint is None and installation is None:
+ raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.")
+ elif self.mountpoint:
+ subvolume = self.mountpoint / subvolume
+ elif installation:
+ ongoing_installation_destination = installation.target
+ if type(ongoing_installation_destination) == str:
+ ongoing_installation_destination = pathlib.Path(ongoing_installation_destination)
+
+ subvolume = ongoing_installation_destination / subvolume
+
+ subvolume.parent.mkdir(parents=True, exist_ok=True)
+
+ # <!--
+ # We perform one more check from the given absolute position.
+ # And we traverse backwards in order to locate any if possible subvolumes above
+ # our new btrfs subvolume. This is because it needs to be mounted under it to properly
+ # function.
+ # if btrfs_parent := find_parent_subvolume(subvolume):
+ # print('Found parent:', btrfs_parent)
+ # -->
+
+ log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey")
+
+ if glob.glob(str(subvolume / '*')):
+ raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
+ elif subvolinfo := subvolume_info_from_path(subvolume):
+ raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+
+ SysCommand(f"btrfs subvolume create {subvolume}")
+
+ return subvolume_info_from_path(subvolume) \ No newline at end of file
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolume.py
new file mode 100644
index 00000000..a96e2a94
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfssubvolume.py
@@ -0,0 +1,191 @@
+import pathlib
+import datetime
+import logging
+import string
+import random
+import shutil
+from dataclasses import dataclass
+from typing import Optional, List# , TYPE_CHECKING
+from functools import cached_property
+
+# if TYPE_CHECKING:
+# from ..blockdevice import BlockDevice
+
+from ...exceptions import DiskError
+from ...general import SysCommand
+from ...output import log
+from ...storage import storage
+
+@dataclass
+class BtrfsSubvolume:
+ full_path :pathlib.Path
+ name :str
+ uuid :str
+ parent_uuid :str
+ creation_time :datetime.datetime
+ subvolume_id :int
+ generation :int
+ gen_at_creation :int
+ parent_id :int
+ top_level_id :int
+ send_transid :int
+ send_time :datetime.datetime
+ receive_transid :int
+ received_uuid :Optional[str] = None
+ flags :Optional[str] = None
+ receive_time :Optional[datetime.datetime] = None
+ snapshots :Optional[List] = None
+
+ def __post_init__(self):
+ self.full_path = pathlib.Path(self.full_path)
+
+ # Convert "-" entries to `None`
+ if self.parent_uuid == "-":
+ self.parent_uuid = None
+ if self.received_uuid == "-":
+ self.received_uuid = None
+ if self.flags == "-":
+ self.flags = None
+ if self.receive_time == "-":
+ self.receive_time = None
+ if self.snapshots == "":
+ self.snapshots = []
+
+ # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
+ self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time))
+ self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time))
+ if self.receive_time:
+ self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time))
+
+ @property
+ def parent_subvolume(self):
+ from .btrfs_helpers import find_parent_subvolume
+
+ return find_parent_subvolume(self.full_path)
+
+ @property
+ def root(self) -> bool:
+ from .btrfs_helpers import subvolume_info_from_path
+
+ # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
+ # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
+ # It would also be nice if it could use findmnt(self.full_path) and traverse backwards
+ # finding the last occurance of a subvolume which 'self' belongs to.
+ if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
+ return self.full_path == volume.full_path
+
+ return False
+
+ @cached_property
+ def partition(self):
+ from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
+ from ..partition import Partition
+ from ..blockdevice import BlockDevice
+ from ..mapperdev import MapperDev
+ from .btrfspartition import BTRFSPartition
+ from .btrfs_helpers import subvolume_info_from_path
+
+ try:
+ # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
+ if filesystem := findmnt(self.full_path).get('filesystems', []):
+ if source := filesystem[0].get('source', None):
+ # Strip away subvolume definitions from findmnt
+ if '[' in source:
+ source = source[:source.find('[')]
+
+ if filesystem[0].get('fstype', '') == 'btrfs':
+ return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
+ elif filesystem[0].get('source', '').startswith('/dev/mapper'):
+ return MapperDev(source)
+ else:
+ return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
+ except DiskError:
+ # Subvolume has never been mounted, we have no reliable way of finding where it is.
+ # But we have the UUID of the partition, and can begin looking for it by mounting
+ # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.
+
+ log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
+ for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
+ if type(instance) in (Partition, MapperDev):
+ we_mounted_it = False
+ detection_mountpoint = instance.mountpoint
+ if not detection_mountpoint:
+ if type(instance) == Partition and instance.encrypted:
+ # TODO: Perhaps support unlocking encrypted volumes?
+ # This will cause a lot of potential user interactions tho.
+ log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
+ continue
+
+ detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
+ detection_mountpoint.mkdir(parents=True, exist_ok=True)
+
+ instance.mount(str(detection_mountpoint))
+ we_mounted_it = True
+
+ if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
+ if subvolume := subvolume_info_from_path(filesystem[0]['target']):
+ if subvolume.uuid == self.uuid:
+ # The top level subvolume matched of ourselves,
+ # which means the instance we're iterating has the subvol we're looking for.
+ log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
+ return instance
+
+ def iterate_children(struct):
+ for child in struct.get('children', []):
+ if '[' in child.get('source', ''):
+ yield subvolume_info_from_path(child['target'])
+
+ for sub_child in iterate_children(child):
+ yield sub_child
+
+ for child in iterate_children(filesystem[0]):
+ if child.uuid == self.uuid:
+ # We found a child within the instance that has the subvol we're looking for.
+ log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
+ return instance
+
+ if we_mounted_it:
+ instance.unmount()
+ shutil.rmtree(detection_mountpoint)
+
+ @cached_property
+ def mount_options(self) -> Optional[List[str]]:
+ from ..helpers import findmnt
+
+ if filesystem := findmnt(self.full_path).get('filesystems', []):
+ return filesystem[0].get('options').split(',')
+
+ def convert_to_ISO_format(self, time_string):
+ time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
+ iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
+ return iso_string
+
+ def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
+ from ..helpers import findmnt
+
+ try:
+ if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
+ log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
+ SysCommand(f"umount {mountpoint}")
+ except DiskError:
+ # No previously mounted device at the mountpoint
+ pass
+
+ if not options:
+ options = []
+
+ try:
+ if include_previously_known_options and (cached_options := self.mount_options):
+ options += cached_options
+ except DiskError:
+ pass
+
+ if not any('subvol=' in x for x in options):
+ options += f'subvol={self.name}'
+
+ SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
+ log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")
+
+ def unmount(self, recurse :bool = True):
+ SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 8a531de0..f94b4b47 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -55,7 +55,7 @@ class Filesystem:
output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8'))
for device in output['blockdevices']:
- for index, partition in enumerate(device['children']):
+ for index, partition in enumerate(device.get('children', [])):
# But we'll use blkid to reliably grab the PARTUUID for that child device (partition)
partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip()
if partition_uuid.lower() == uuid.lower():
@@ -65,6 +65,7 @@ class Filesystem:
def load_layout(self, layout :Dict[str, Any]) -> None:
from ..luks import luks2
+ from .btrfs import BTRFSPartition
# If the layout tells us to wipe the drive, we do so
if layout.get('wipe', False):
@@ -76,27 +77,34 @@ class Filesystem:
raise KeyError(f"Could not create a MSDOS label on {self}")
self.blockdevice.flush_cache()
+ time.sleep(3)
prev_partition = None
# We then iterate the partitions in order
for partition in layout.get('partitions', []):
# We don't want to re-add an existing partition (those containing a UUID already)
if partition.get('wipe', False) and not partition.get('PARTUUID', None):
- print(_("Adding partition...."))
start = partition.get('start') or (
prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START)
partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
start=start,
end=partition.get('size', '100%'),
- partition_format=partition.get('filesystem', {}).get('format', 'btrfs'))
- # TODO: device_instance some times become None
- # print('Device instance:', partition['device_instance'])
-
- elif (partition_uuid := partition.get('PARTUUID')) and (partition_instance := self.blockdevice.get_partition(uuid=partition_uuid)):
- print(_("Re-using partition instance: {}").format(partition_instance))
- partition['device_instance'] = partition_instance
+ partition_format=partition.get('filesystem', {}).get('format', 'btrfs'),
+ skip_mklabel=layout.get('wipe', False) is not False)
+
+ elif (partition_uuid := partition.get('PARTUUID')):
+ # We try to deal with both UUID and PARTUUID of a partition when it's being re-used.
+ # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID')
+ # but for now, lets just attempt to deal with both.
+ try:
+ partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
+ except DiskError:
+ partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
+
+ log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
else:
- raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition.get('PARTUUID'))}).")
+ log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
+ continue
if partition.get('filesystem', {}).get('format', False):
@@ -137,20 +145,32 @@ class Filesystem:
while True:
partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
- print(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
+ log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
continue
break
unlocked_device.format(partition['filesystem']['format'], options=format_options)
+
elif partition.get('wipe', False):
if not partition['device_instance']:
raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
partition['device_instance'].format(partition['filesystem']['format'], options=format_options)
+ if partition['filesystem']['format'] == 'btrfs':
+ # We upgrade the device instance to a BTRFSPartition if we format it as such.
+ # This is so that we can gain access to more features than otherwise available in Partition()
+ partition['device_instance'] = BTRFSPartition(
+ partition['device_instance'].path,
+ block_device=partition['device_instance'].block_device,
+ encrypted=False,
+ filesystem='btrfs',
+ autodetect_filesystem=False
+ )
+
if partition.get('boot', False):
log(f"Marking partition {partition['device_instance']} as bootable.")
- self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
+ self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on')
prev_partition = partition
@@ -190,10 +210,27 @@ class Filesystem:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> Partition:
+ def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
- previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
+ if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
+ # If it's a completely empty drive, and we're about to add partitions to it
+ # we need to make sure there's a filesystem label.
+ if self.mode == GPT:
+ if not self.parted_mklabel(self.blockdevice.device, "gpt"):
+ raise KeyError(f"Could not create a GPT label on {self}")
+ elif self.mode == MBR:
+ if not self.parted_mklabel(self.blockdevice.device, "msdos"):
+ raise KeyError(f"Could not create a MSDOS label on {self}")
+
+ self.blockdevice.flush_cache()
+
+ previous_partuuids = []
+ for partition in self.blockdevice.partitions.values():
+ try:
+ previous_partuuids.append(partition.part_uuid)
+ except DiskError:
+ pass
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
@@ -207,36 +244,36 @@ class Filesystem:
log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG)
if self.parted(parted_string):
- count = 0
- while count < 10:
- new_uuid = None
- new_uuid_set = (previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()})
+ for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
+ self.partprobe()
- if len(new_uuid_set) > 0:
- new_uuid = new_uuid_set.pop()
+ new_partition_uuids = []
+ for partition in self.blockdevice.partitions.values():
+ try:
+ new_partition_uuids.append(partition.part_uuid)
+ except DiskError:
+ pass
+
+ new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
- if new_uuid:
+ if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
try:
- return self.blockdevice.get_partition(new_uuid)
+ return self.blockdevice.get_partition(partuuid=new_partuuid)
except Exception as err:
log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
- log(f'Partition set: {new_uuid_set}', level=logging.ERROR, fg="red")
- log(f'New UUID: {[new_uuid]}', level=logging.ERROR, fg="red")
+ log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
+ log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
raise err
else:
- count += 1
- log(f"Could not get UUID for partition. Waiting before retry attempt {count} of 10 ...",level=logging.DEBUG)
- time.sleep(float(storage['arguments'].get('disk-sleep', 0.2)))
- else:
- log("Add partition is exiting due to excessive wait time", level=logging.ERROR, fg="red")
- raise DiskError(f"New partition never showed up after adding new partition on {self}.")
+ log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
# TODO: This should never be able to happen
log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
- log(f"Previous partitions: {previous_partition_uuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {(previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
+ log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
+ log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
raise DiskError(f"Could not add partition using: {parted_string}")
def set_name(self, partition: int, name: str) -> bool:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 0799cd49..99856aad 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -291,11 +291,37 @@ def find_mountpoint(device_path :str) -> Dict[str, Any]:
except SysCallError:
return {}
-def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
+def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]:
+ for traversal in list(map(str, [str(path)] + list(path.parents))):
+ if traversal in ignore:
+ continue
+
+ try:
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
+ if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')):
+ return json.loads(output)
+
+ except SysCallError as error:
+ log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray")
+ pass
+
+ if not traverse:
+ break
+
+ raise DiskError(f"Could not get mount information for path {path}")
+
+
+def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]:
+ import traceback
+
+ log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}")
device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
+ if traversal in ignore:
+ continue
+
try:
log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
@@ -385,9 +411,8 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
def get_filesystem_type(path :str) -> Optional[str]:
- device_name, bind_name = split_bind_name(path)
try:
- return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip()
+ return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
except SysCallError:
return None
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 32e3ac9b..913dbc13 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -51,11 +51,11 @@ class MapperDev:
raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device")
@property
- def mountpoint(self) -> Optional[str]:
+ def mountpoint(self) -> Optional[pathlib.Path]:
try:
data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
for filesystem in data['filesystems']:
- return filesystem.get('target')
+ return pathlib.Path(filesystem.get('target'))
except SysCallError as error:
# Not mounted anywhere most likely
@@ -76,8 +76,9 @@ class MapperDev:
@property
def subvolumes(self) -> Iterator['BtrfsSubvolume']:
- from .btrfs import get_subvolumes_from_findmnt
-
+ from .btrfs import subvolume_info_from_path
+
for mountpoint in self.mount_information:
- for result in get_subvolumes_from_findmnt(mountpoint):
- yield result \ No newline at end of file
+ if target := mountpoint.get('target'):
+ if subvolume := subvolume_info_from_path(pathlib.Path(target)):
+ yield subvolume \ No newline at end of file
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index e7568258..73c88597 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -13,7 +13,8 @@ from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
-from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume
+from .btrfs.btrfs_helpers import subvolume_info_from_path
+from .btrfs.btrfssubvolume import BtrfsSubvolume
class Partition:
def __init__(self,
@@ -96,11 +97,11 @@ class Partition:
try:
data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
for filesystem in data['filesystems']:
- return filesystem.get('target')
+ return pathlib.Path(filesystem.get('target'))
except SysCallError as error:
# Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG)
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey")
pass
return None
@@ -184,7 +185,7 @@ class Partition:
return device['pttype']
@property
- def uuid(self) -> Optional[str]:
+ def part_uuid(self) -> Optional[str]:
"""
Returns the PARTUUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
@@ -197,6 +198,26 @@ class Partition:
time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
+ partuuid = self._safe_part_uuid
+ if partuuid:
+ return partuuid
+
+ raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
+
+ @property
+ def uuid(self) -> Optional[str]:
+ """
+ Returns the UUID as returned by lsblk for the **partition**.
+ This is more reliable than relying on /dev/disk/by-uuid as
+ it doesn't seam to be able to detect md raid partitions.
+ For bind mounts all the subvolumes share the same uuid
+ """
+ for i in range(storage['DISK_RETRY_ATTEMPTS']):
+ if not self.partprobe():
+ raise DiskError(f"Could not perform partprobe on {self.device_path}")
+
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * i)
+
partuuid = self._safe_uuid
if partuuid:
return partuuid
@@ -217,6 +238,28 @@ class Partition:
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
try:
+ return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
+ except SysCallError as error:
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
+ return None
+
+ log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}")
+
+ @property
+ def _safe_part_uuid(self) -> Optional[str]:
+ """
+ A near copy of self.uuid but without any delays.
+ This function should only be used where uuid is not crucial.
+ For instance when you want to get a __repr__ of the class.
+ """
+ if not self.partprobe():
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ return None
+
+ log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
+
+ try:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
if self.block_device.info.get('TYPE') == 'iso9660':
@@ -262,9 +305,26 @@ class Partition:
@property
def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ from .helpers import findmnt
+
+ def iterate_children_recursively(information):
+ for child in information.get('children', []):
+ if target := child.get('target'):
+ if subvolume := subvolume_info_from_path(pathlib.Path(target)):
+ yield subvolume
+
+ if child.get('children'):
+ for subchild in iterate_children_recursively(child):
+ yield subchild
+
for mountpoint in self.mount_information:
- for result in get_subvolumes_from_findmnt(mountpoint):
- yield result
+ if result := findmnt(pathlib.Path(mountpoint['target'])):
+ for filesystem in result.get('filesystems', []):
+ if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])):
+ yield subvolume
+
+ for child in iterate_children_recursively(filesystem):
+ yield child
def partprobe(self) -> bool:
try:
@@ -315,7 +375,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool:
+ def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -337,63 +397,71 @@ class Partition:
if log_formatting:
log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
- if filesystem == 'btrfs':
- options = ['-f'] + options
+ try:
+ if filesystem == 'btrfs':
+ options = ['-f'] + options
- if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
- raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self.filesystem = filesystem
+ if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
+ raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
+ self.filesystem = filesystem
- elif filesystem == 'vfat':
- options = ['-F32'] + options
+ elif filesystem == 'vfat':
+ options = ['-F32'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'ext4':
- options = ['-F'] + options
+ elif filesystem == 'ext4':
+ options = ['-F'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'ext2':
- options = ['-F'] + options
+ elif filesystem == 'ext2':
+ options = ['-F'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'ext2'
+ if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
+ self.filesystem = 'ext2'
- elif filesystem == 'xfs':
- options = ['-f'] + options
+ elif filesystem == 'xfs':
+ options = ['-f'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'f2fs':
- options = ['-f'] + options
+ elif filesystem == 'f2fs':
+ options = ['-f'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'ntfs3':
- options = ['-f'] + options
+ elif filesystem == 'ntfs3':
+ options = ['-f'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'crypto_LUKS':
- # from ..luks import luks2
- # encrypted_partition = luks2(self, None, None)
- # encrypted_partition.format(path)
- self.filesystem = filesystem
+ elif filesystem == 'crypto_LUKS':
+ # from ..luks import luks2
+ # encrypted_partition = luks2(self, None, None)
+ # encrypted_partition.format(path)
+ self.filesystem = filesystem
- else:
- raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
+ else:
+ raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
+ except SysCallError as error:
+ log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange")
+ if retry is True:
+ log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
+ time.sleep(storage.get('DISK_TIMEOUTS', 1))
+
+ return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
self.encrypted = True
@@ -413,6 +481,7 @@ class Partition:
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
+
if not fs:
if not self.filesystem:
raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 63ec1d9b..5fa6bfdc 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -5,6 +5,7 @@ from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .blockdevice import BlockDevice
+ _: Any
from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..hardware import has_uefi
@@ -26,13 +27,13 @@ def suggest_single_disk_layout(block_device :BlockDevice,
compression = False
if default_filesystem == 'btrfs':
- prompt = 'Would you like to use BTRFS subvolumes with a default structure?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- using_subvolumes = choice == 'yes'
+ prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ using_subvolumes = choice.value == Menu.yes()
- prompt = 'Would you like to use BTRFS compression?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- compression = choice == 'yes'
+ prompt = str(_('Would you like to use BTRFS compression?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ compression = choice.value == Menu.yes()
layout = {
block_device.path : {
@@ -87,9 +88,9 @@ def suggest_single_disk_layout(block_device :BlockDevice,
layout[block_device.path]['partitions'][-1]['start'] = '513MiB'
if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
- prompt = 'Would you like to create a separate partition for /home?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- using_home_partition = choice == 'yes'
+ prompt = str(_('Would you like to create a separate partition for /home?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ using_home_partition = choice.value == Menu.yes()
# Set a size for / (/root)
if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition:
@@ -138,9 +139,7 @@ def suggest_single_disk_layout(block_device :BlockDevice,
return layout
-def suggest_multi_disk_layout(block_devices :List[BlockDevice],
- default_filesystem :Optional[str] = None,
- advanced_options :bool = False) -> Dict[str, Any]:
+def suggest_multi_disk_layout(block_devices :List[BlockDevice], default_filesystem :Optional[str] = None, advanced_options :bool = False):
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
@@ -158,6 +157,13 @@ def suggest_multi_disk_layout(block_devices :List[BlockDevice],
home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART)
root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device])
+ if home_device is None or root_device is None:
+ text = _('The selected drives do not have the minimum capacity required for an automatic suggestion\n')
+ text += _('Minimum capacity for /home partition: {}GB\n').format(MIN_SIZE_TO_ALLOW_HOME_PART)
+ text += _('Minimum capacity for Arch Linux partition: {}GB').format(ARCH_LINUX_INSTALLED_SIZE)
+ Menu(str(text), [str(_('Continue'))], skip=False).run()
+ return None
+
compression = False
if default_filesystem == 'btrfs':
@@ -165,9 +171,9 @@ def suggest_multi_disk_layout(block_devices :List[BlockDevice],
# choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
# using_subvolumes = choice == 'yes'
- prompt = 'Would you like to use BTRFS compression?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- compression = choice == 'yes'
+ prompt = str(_('Would you like to use BTRFS compression?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ compression = choice.value == Menu.yes()
log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG)
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index f6f58151..a16faa3f 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -48,4 +48,8 @@ class PackageError(BaseException):
class TranslationError(BaseException):
+ pass
+
+
+class Deprecated(BaseException):
pass \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 174acb8a..b99e4a45 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -10,6 +10,9 @@ import string
import sys
import time
import re
+import urllib.parse
+import urllib.request
+import pathlib
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
@@ -132,6 +135,8 @@ class JsonEncoder:
return obj.isoformat()
elif isinstance(obj, (list, set, tuple)):
return [json.loads(json.dumps(item, cls=JSON)) for item in obj]
+ elif isinstance(obj, (pathlib.Path)):
+ return str(obj)
else:
return obj
@@ -352,14 +357,13 @@ class SysCommandWorker:
# only way to get the traceback without loosing it.
self.pid, self.child_fd = pty.fork()
- os.chdir(old_dir)
# https://stackoverflow.com/questions/4022600/python-pty-fork-how-does-it-work
if not self.pid:
try:
try:
with open(f"{storage['LOG_PATH']}/cmd_history.txt", "a") as cmd_log:
- cmd_log.write(f"{' '.join(self.cmd)}\n")
+ cmd_log.write(f"{self.cmd}\n")
except PermissionError:
pass
@@ -371,6 +375,9 @@ class SysCommandWorker:
log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red")
self.exit_code = 1
return False
+ else:
+ # Only parent process moves back to the original working directory
+ os.chdir(old_dir)
self.started = time.time()
self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP)
@@ -457,7 +464,14 @@ class SysCommand:
if self.session:
return self.session
- with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines) as session:
+ with SysCommandWorker(
+ self.cmd,
+ callbacks=self._callbacks,
+ peak_output=self.peak_output,
+ environment_vars=self.environment_vars,
+ remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines,
+ working_directory=self.working_directory) as session:
+
if not self.session:
self.session = session
@@ -523,32 +537,40 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No
log(execution_output)
os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
-def json_stream_to_structure(id : str, stream :str, target :dict) -> bool :
- """ Function to load a stream (file (as name) or valid JSON string into an existing dictionary
+def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool :
+ """
+ Function to load a stream (file (as name) or valid JSON string into an existing dictionary
Returns true if it could be done
Return false if operation could not be executed
- +id is just a parameter to get meaningful, but not so long messages
+ +configuration_identifier is just a parameter to get meaningful, but not so long messages
"""
- from pathlib import Path
- if Path(stream).exists():
- try:
- with open(Path(stream)) as fh:
- target.update(json.load(fh))
- except Exception as e:
- log(f"{id} = {stream} does not contain a valid JSON format: {e}",level=logging.ERROR)
- return False
+
+ parsed_url = urllib.parse.urlparse(stream)
+
+ if parsed_url.scheme: # The stream is in fact a URL that should be grabed
+ with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
+ target.update(json.loads(response.read()))
else:
- log(f"{id} = {stream} does not exists in the filesystem. Trying as JSON stream",level=logging.DEBUG)
- # NOTE: failure of this check doesn't make stream 'real' invalid JSON, just it first level entry is not an object (i.e. dict), so it is not a format we handle.
- if stream.strip().startswith('{') and stream.strip().endswith('}'):
+ if pathlib.Path(stream).exists():
try:
- target.update(json.loads(stream))
- except Exception as e:
- log(f" {id} Contains an invalid JSON format : {e}",level=logging.ERROR)
+ with pathlib.Path(stream).open() as fh:
+ target.update(json.load(fh))
+ except Exception as error:
+ log(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {error}", level=logging.ERROR, fg="red")
return False
else:
- log(f" {id} is neither a file nor is a JSON string:",level=logging.ERROR)
- return False
+ # NOTE: This is a rudimentary check if what we're trying parse is a dict structure.
+ # Which is the only structure we tolerate anyway.
+ if stream.strip().startswith('{') and stream.strip().endswith('}'):
+ try:
+ target.update(json.loads(stream))
+ except Exception as e:
+ log(f" {configuration_identifier} Contains an invalid JSON format : {e}",level=logging.ERROR, fg="red")
+ return False
+ else:
+ log(f" {configuration_identifier} is neither a file nor is a JSON string:",level=logging.ERROR, fg="red")
+ return False
+
return True
def secret(x :str):
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index f183b0d3..8400d338 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -55,7 +55,7 @@ AVAILABLE_GFX_DRIVERS = {
"mesa",
"xf86-video-intel"
],
- "Nvidia (open-source)": [
+ "Nvidia (open-source nouveau driver)": [
"mesa",
"xf86-video-nouveau",
"libva-mesa-driver"
diff --git a/archinstall/lib/hsm/__init__.py b/archinstall/lib/hsm/__init__.py
new file mode 100644
index 00000000..c0888b04
--- /dev/null
+++ b/archinstall/lib/hsm/__init__.py
@@ -0,0 +1,4 @@
+from .fido import (
+ get_fido2_devices,
+ fido2_enroll
+) \ No newline at end of file
diff --git a/archinstall/lib/hsm/fido.py b/archinstall/lib/hsm/fido.py
new file mode 100644
index 00000000..49f36957
--- /dev/null
+++ b/archinstall/lib/hsm/fido.py
@@ -0,0 +1,57 @@
+import typing
+import pathlib
+import getpass
+import logging
+from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
+from ..disk.partition import Partition
+from ..general import log
+
+def get_fido2_devices() -> typing.Dict[str, typing.Dict[str, str]]:
+ """
+ Uses systemd-cryptenroll to list the FIDO2 devices
+ connected that supports FIDO2.
+ Some devices might show up in udevadm as FIDO2 compliant
+ when they are in fact not.
+
+ The drawback of systemd-cryptenroll is that it uses human readable format.
+ That means we get this weird table like structure that is of no use.
+
+ So we'll look for `MANUFACTURER` and `PRODUCT`, we take their index
+ and we split each line based on those positions.
+ """
+ worker = clear_vt100_escape_codes(SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8'))
+
+ MANUFACTURER_POS = 0
+ PRODUCT_POS = 0
+ devices = {}
+ for line in worker.split('\r\n'):
+ if '/dev' not in line:
+ MANUFACTURER_POS = line.find('MANUFACTURER')
+ PRODUCT_POS = line.find('PRODUCT')
+ continue
+
+ path = line[:MANUFACTURER_POS].rstrip()
+ manufacturer = line[MANUFACTURER_POS:PRODUCT_POS].rstrip()
+ product = line[PRODUCT_POS:]
+
+ devices[path] = {
+ 'manufacturer' : manufacturer,
+ 'product' : product
+ }
+
+ return devices
+
+def fido2_enroll(hsm_device_path :pathlib.Path, partition :Partition, password :str) -> bool:
+ worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device_path} {partition.real_device}", peak_output=True)
+ pw_inputted = False
+ pin_inputted = False
+ while worker.is_alive():
+ if pw_inputted is False and bytes(f"please enter current passphrase for disk {partition.real_device}", 'UTF-8') in worker._trace_log.lower():
+ worker.write(bytes(password, 'UTF-8'))
+ pw_inputted = True
+
+ elif pin_inputted is False and bytes(f"please enter security token pin", 'UTF-8') in worker._trace_log.lower():
+ worker.write(bytes(getpass.getpass(" "), 'UTF-8'))
+ pin_inputted = True
+
+ log(f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.", level=logging.INFO, fg="yellow") \ No newline at end of file
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 8b77317a..b2cd6306 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -13,16 +13,17 @@ from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
-from .disk.helpers import get_mount_info
+from .disk.helpers import findmnt
from .mirrors import use_mirrors
from .plugins import plugins
from .storage import storage
# from .user_interaction import *
from .output import log
from .profiles import Profile
-from .disk.btrfs import manage_btrfs_subvolumes
from .disk.partition import get_mount_fs_type
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
+from .hsm import fido2_enroll
+from .models.users import User
if TYPE_CHECKING:
_: Any
@@ -126,7 +127,9 @@ class Installer:
self.MODULES = []
self.BINARIES = []
self.FILES = []
- self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
+ # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt
+ # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override.
+ self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"]
self.KERNEL_PARAMS = []
self._zram_enabled = False
@@ -230,21 +233,26 @@ class Installer:
def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
from .luks import luks2
+ from .disk.btrfs import setup_subvolumes, mount_subvolume
+
# set the partitions as a list not part of a tree (which we don't need anymore (i think)
list_part = []
list_luks_handles = []
for blockdevice in layouts:
list_part.extend(layouts[blockdevice]['partitions'])
+ # TODO: Implement a proper mount-queue system that does not depend on return values.
+ mount_queue = {}
+
# we manage the encrypted partititons
for partition in [entry for entry in list_part if entry.get('encrypted', False)]:
# open the luks device and all associate stuff
if not (password := partition.get('!password', None)):
raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}")
- # i change a bit the naming conventions for the loop device
loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
else:
loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
+
# note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device:
if partition.get('generate-encryption-key-file',False) and not self._has_root(partition):
@@ -252,38 +260,74 @@ class Installer:
# this way all the requesrs will be to the dm_crypt device and not to the physical partition
partition['device_instance'] = unlocked_device
+ if self._has_root(partition) and partition.get('generate-encryption-key-file', False) is False:
+ if storage['arguments'].get('HSM'):
+ hsm_device_path = storage['arguments']['HSM']
+ fido2_enroll(hsm_device_path, partition['device_instance'], password)
+
# we manage the btrfs partitions
- for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]:
- if partition.get('filesystem',{}).get('mount_options',[]):
- mount_options = ','.join(partition['filesystem']['mount_options'])
- self.mount(partition['device_instance'], "/", options=mount_options)
- else:
- self.mount(partition['device_instance'], "/")
- try:
- new_mountpoints = manage_btrfs_subvolumes(self,partition)
- except Exception as e:
- # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
+ if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
+ for partition in btrfs_subvolumes:
+ if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])):
+ self.mount(partition['device_instance'], "/", options=mount_options)
+ else:
+ self.mount(partition['device_instance'], "/")
+
+ setup_subvolumes(
+ installation=self,
+ partition_dict=partition
+ )
+
partition['device_instance'].unmount()
- raise e
- partition['device_instance'].unmount()
- if new_mountpoints:
- list_part.extend(new_mountpoints)
- # we mount. We need to sort by mountpoint to get a good working order
- for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']):
+ # We then handle any special cases, such as btrfs
+ if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]):
+ for partition_information in btrfs_subvolumes:
+ for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]):
+ btrfs_subvolume_information = {}
+
+ match mountpoint:
+ case str(): # backwards-compatability
+ btrfs_subvolume_information['mountpoint'] = mountpoint
+ btrfs_subvolume_information['options'] = []
+ case dict():
+ btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None)
+ btrfs_subvolume_information['options'] = mountpoint.get('options', [])
+ case _:
+ continue
+
+ if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'):
+ # We cache the mount call for later
+ mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \
+ name=name, \
+ subvolume_information=btrfs_subvolume_information: mount_subvolume(
+ installation=self,
+ device=device,
+ name=name,
+ subvolume_information=subvolume_information
+ )
+
+ # We mount ordinary partitions, and we sort them by the mountpoint
+ for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']):
mountpoint = partition['mountpoint']
log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
if partition.get('filesystem',{}).get('mount_options',[]):
mount_options = ','.join(partition['filesystem']['mount_options'])
- partition['device_instance'].mount(f"{self.target}{mountpoint}", options=mount_options)
+ mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}", options=mount_options: instance.mount(target, options=options)
else:
- partition['device_instance'].mount(f"{self.target}{mountpoint}")
+ mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target)
+
+ log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.INFO, fg="white")
+
+ # We mount everything by sorting on the mountpoint itself.
+ for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]):
+ frozen_func()
time.sleep(1)
try:
- get_mount_info(f"{self.target}{mountpoint}", traverse=False)
+ findmnt(pathlib.Path(f"{self.target}{mountpoint}"), traverse=False)
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
@@ -365,15 +409,28 @@ class Installer:
self.log(f'Installing packages: {packages}', level=logging.INFO)
- if (sync_mirrors := run_pacman('-Syy', default_cmd='/usr/bin/pacman')).exit_code == 0:
- if (pacstrap := SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf {self.target} {" ".join(packages)} --noconfirm', peak_output=True)).exit_code == 0:
- return True
- else:
- self.log(f'Could not strap in packages: {pacstrap}', level=logging.ERROR, fg="red")
- self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.ERROR, fg="red")
- raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
- else:
- self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
+ # TODO: We technically only need to run the -Syy once.
+ try:
+ run_pacman('-Syy', default_cmd='/usr/bin/pacman')
+ except SysCallError as error:
+ self.log(f'Could not sync a new package databse: {error}', level=logging.ERROR, fg="red")
+
+ if storage['arguments'].get('silent', False) is False:
+ if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
+ return self.pacstrap(*packages, **kwargs)
+
+ raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red")
+
+ try:
+ return SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf {self.target} {" ".join(packages)} --noconfirm', peak_output=True).exit_code == 0
+ except SysCallError as error:
+ self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red")
+
+ if storage['arguments'].get('silent', False) is False:
+ if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
+ return self.pacstrap(*packages, **kwargs)
+
+ raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
for plugin in plugins.values():
@@ -596,6 +653,15 @@ class Installer:
mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n")
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
+
+ if not storage['arguments']['HSM']:
+ # For now, if we don't use HSM we revert to the old
+ # way of setting up encryption hooks for mkinitcpio.
+ # This is purely for stability reasons, we're going away from this.
+ # * systemd -> udev
+ # * sd-vconsole -> keymap
+ self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS]
+
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0
@@ -630,8 +696,15 @@ class Installer:
self.HOOKS.remove('fsck')
if self.detect_encryption(partition):
- if 'encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
+ if storage['arguments']['HSM']:
+ # Required bby mkinitcpio to add support for fido2-device options
+ self.pacstrap('libfido2')
+
+ if 'sd-encrypt' not in self.HOOKS:
+ self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt')
+ else:
+ if 'encrypt' not in self.HOOKS:
+ self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
if not has_uefi():
self.base_packages.append('grub')
@@ -687,6 +760,14 @@ class Installer:
# TODO: Use python functions for this
SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
+ if storage['arguments']['HSM']:
+ # TODO:
+ # A bit of a hack, but we need to get vconsole.conf in there
+ # before running `mkinitcpio` because it expects it in HSM mode.
+ if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False:
+ with vconsole.open('w') as fh:
+ fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
+
self.mkinitcpio('-P')
self.helper_flags['base'] = True
@@ -731,7 +812,9 @@ class Installer:
# And in which case we should do some clean up.
# Install the boot loader
- if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0:
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install')
+ except SysCallError:
# Fallback, try creating the boot loader without touching the EFI variables
SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
@@ -777,7 +860,7 @@ class Installer:
elif vendor == "GenuineIntel":
entry.write("initrd /intel-ucode.img\n")
else:
- self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
+ self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG)
entry.write(f"initrd /initramfs-{kernel}.img\n")
# blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out.
@@ -801,11 +884,23 @@ class Installer:
if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
- entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}')
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG)
+
+ kernel_options = f"options"
+
+ if storage['arguments']['HSM']:
+ # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work
+ kernel_options += f" rd.luks.name={real_device.uuid}=luksdev"
+ # Note: tpm2-device and fido2-device don't play along very well:
+ # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645
+ kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no"
+ else:
+ kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev"
+
+ entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}')
else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
- entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}')
+ log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
+ entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}')
self.helper_flags['bootloader'] = "systemd"
@@ -881,7 +976,7 @@ class Installer:
elif vendor == "GenuineIntel":
kernel_parameters.append("initrd=\\intel-ucode.img")
else:
- self.log("unknow cpu vendor, not adding ucode to firmware boot entry")
+ self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.", level=logging.DEBUG)
kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img")
@@ -890,11 +985,11 @@ class Installer:
if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.part_uuid}'.", level=logging.DEBUG)
+ kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.part_uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+ log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG)
+ kernel_parameters.append(f'root=PARTUUID={root_partition.part_uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
@@ -921,10 +1016,14 @@ class Installer:
if plugin.on_add_bootloader(self):
return True
+ if type(self.target) == str:
+ self.target = pathlib.Path(self.target)
+
boot_partition = None
root_partition = None
for partition in self.partitions:
- if partition.mountpoint == os.path.join(self.target, 'boot'):
+ print(partition, [partition.mountpoint], [self.target])
+ if partition.mountpoint == self.target / 'boot':
boot_partition = partition
elif partition.mountpoint == self.target:
root_partition = partition
@@ -963,10 +1062,10 @@ class Installer:
if type(profile) == str:
profile = Profile(self, profile)
- self.log(f'Installing network profile {profile}', level=logging.INFO)
+ self.log(f'Installing archinstall profile {profile}', level=logging.INFO)
return profile.install()
- def enable_sudo(self, entity: str, group :bool = False) -> bool:
+ def enable_sudo(self, entity: str, group :bool = False):
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
sudoers_dir = f"{self.target}/etc/sudoers.d"
@@ -996,9 +1095,14 @@ class Installer:
# Guarantees sudoer conf file recommended perms
os.chmod(pathlib.Path(rule_file_name), 0o440)
- return True
+ def create_users(self, users: Union[User, List[User]]):
+ if not isinstance(users, list):
+ users = [users]
+
+ for user in users:
+ self.user_create(user.username, user.password, user.groups, user.sudo)
- def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None:
+ def user_create(self, user :str, password :Optional[str] = None, groups :Optional[List[str]] = None, sudo :bool = False) -> None:
if groups is None:
groups = []
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 710af01e..ac480b11 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -15,7 +15,10 @@ from .general import SysCommand, SysCommandWorker
from .output import log
from .exceptions import SysCallError, DiskError
from .storage import storage
+from .disk.helpers import get_filesystem_type
from .disk.mapperdev import MapperDev
+from .disk.btrfs import BTRFSPartition
+
class luks2:
def __init__(self,
@@ -149,7 +152,6 @@ class luks2:
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
:type mountpoint: str
"""
- from .disk import get_filesystem_type
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
@@ -162,14 +164,22 @@ class luks2:
if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}'
- unlocked_partition = Partition(
+ if (filesystem_type := get_filesystem_type(pathlib.Path(self.mapdev))) == 'btrfs':
+ return BTRFSPartition(
+ self.mapdev,
+ block_device=MapperDev(mountpoint).partition.block_device,
+ encrypted=True,
+ filesystem=filesystem_type,
+ autodetect_filesystem=False
+ )
+
+ return Partition(
self.mapdev,
block_device=MapperDev(mountpoint).partition.block_device,
encrypted=True,
filesystem=get_filesystem_type(self.mapdev),
autodetect_filesystem=False
)
- return unlocked_partition
def close(self, mountpoint :Optional[str] = None) -> bool:
if not mountpoint:
diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py
index afccd45b..5cb27cab 100644
--- a/archinstall/lib/menu/global_menu.py
+++ b/archinstall/lib/menu/global_menu.py
@@ -1,6 +1,8 @@
from __future__ import annotations
-from typing import Any, List, Optional, Union
+from typing import Any, List, Optional, Union, Dict, TYPE_CHECKING
+
+import archinstall
from ..menu import Menu
from ..menu.selection_menu import Selector, GeneralMenu
@@ -8,8 +10,7 @@ from ..general import SysCommand, secret
from ..hardware import has_uefi
from ..models import NetworkConfiguration
from ..storage import storage
-from ..output import log
-from ..profiles import is_desktop_profile
+from ..profiles import is_desktop_profile, Profile
from ..disk import encrypted_partitions
from ..user_interaction import get_password, ask_for_a_timezone, save_config
@@ -20,7 +21,6 @@ from ..user_interaction import ask_hostname
from ..user_interaction import ask_for_audio_selection
from ..user_interaction import ask_additional_packages_to_install
from ..user_interaction import ask_to_configure_network
-from ..user_interaction import ask_for_superuser_account
from ..user_interaction import ask_for_additional_users
from ..user_interaction import select_language
from ..user_interaction import select_mirror_regions
@@ -32,126 +32,147 @@ from ..user_interaction import select_encrypted_partitions
from ..user_interaction import select_harddrives
from ..user_interaction import select_profile
from ..user_interaction import select_additional_repositories
+from ..models.users import User
+from ..user_interaction.partitioning_conf import current_partition_layout
+from ..output import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
+
class GlobalMenu(GeneralMenu):
def __init__(self,data_store):
- super().__init__(data_store=data_store, auto_cursor=True)
+ super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3)
def _setup_selection_menu_options(self):
# archinstall.Language will not use preset values
self._menu_options['archinstall-language'] = \
Selector(
- _('Select Archinstall language'),
- lambda x: self._select_archinstall_language('English'),
+ _('Archinstall language'),
+ lambda x: self._select_archinstall_language(x),
default='English')
self._menu_options['keyboard-layout'] = \
- Selector(_('Select keyboard layout'), lambda preset: select_language('us',preset), default='us')
+ Selector(
+ _('Keyboard layout'),
+ lambda preset: select_language(preset),
+ default='us')
self._menu_options['mirror-region'] = \
Selector(
- _('Select mirror region'),
- select_mirror_regions,
+ _('Mirror region'),
+ lambda preset: select_mirror_regions(preset),
display_func=lambda x: list(x.keys()) if x else '[]',
default={})
self._menu_options['sys-language'] = \
- Selector(_('Select locale language'), lambda preset: select_locale_lang('en_US',preset), default='en_US')
+ Selector(
+ _('Locale language'),
+ lambda preset: select_locale_lang(preset),
+ default='en_US')
self._menu_options['sys-encoding'] = \
- Selector(_('Select locale encoding'), lambda preset: select_locale_enc('utf-8',preset), default='utf-8')
+ Selector(
+ _('Locale encoding'),
+ lambda preset: select_locale_enc(preset),
+ default='UTF-8')
self._menu_options['harddrives'] = \
Selector(
- _('Select harddrives'),
- self._select_harddrives)
+ _('Drive(s)'),
+ lambda preset: self._select_harddrives(preset),
+ display_func=lambda x: f'{len(x)} ' + str(_('Drive(s)')) if x is not None and len(x) > 0 else '',
+ preview_func=self._prev_harddrives,
+ )
self._menu_options['disk_layouts'] = \
Selector(
- _('Select disk layout'),
- lambda x: select_disk_layout(
+ _('Disk layout'),
+ lambda preset: select_disk_layout(
+ preset,
storage['arguments'].get('harddrives', []),
storage['arguments'].get('advanced', False)
),
+ preview_func=self._prev_disk_layouts,
+ display_func=lambda x: self._display_disk_layout(x),
dependencies=['harddrives'])
self._menu_options['!encryption-password'] = \
Selector(
- _('Set encryption password'),
+ _('Encryption password'),
lambda x: self._select_encrypted_password(),
display_func=lambda x: secret(x) if x else 'None',
dependencies=['harddrives'])
+ self._menu_options['HSM'] = Selector(
+ description=_('Use HSM to unlock encrypted drive'),
+ func=lambda preset: self._select_hsm(preset),
+ dependencies=['!encryption-password'],
+ default=None
+ )
self._menu_options['swap'] = \
Selector(
- _('Use swap'),
+ _('Swap'),
lambda preset: ask_for_swap(preset),
default=True)
self._menu_options['bootloader'] = \
Selector(
- _('Select bootloader'),
+ _('Bootloader'),
lambda preset: ask_for_bootloader(storage['arguments'].get('advanced', False),preset),
default="systemd-bootctl" if has_uefi() else "grub-install")
self._menu_options['hostname'] = \
Selector(
- _('Specify hostname'),
+ _('Hostname'),
ask_hostname,
default='archlinux')
# root password won't have preset value
self._menu_options['!root-password'] = \
Selector(
- _('Set root password'),
+ _('Root password'),
lambda preset:self._set_root_password(),
display_func=lambda x: secret(x) if x else 'None')
- self._menu_options['!superusers'] = \
- Selector(
- _('Specify superuser account'),
- lambda preset: self._create_superuser_account(),
- default={},
- exec_func=lambda n,v:self._users_resynch(),
- dependencies_not=['!root-password'],
- display_func=lambda x: self._display_superusers())
self._menu_options['!users'] = \
Selector(
- _('Specify user account'),
- lambda x: self._create_user_account(),
+ _('User account'),
+ lambda x: self._create_user_account(x),
default={},
- exec_func=lambda n,v:self._users_resynch(),
- display_func=lambda x: list(x.keys()) if x else '[]')
+ display_func=lambda x: f'{len(x)} {_("User(s)")}' if len(x) > 0 else None,
+ preview_func=self._prev_users)
self._menu_options['profile'] = \
Selector(
- _('Specify profile'),
- lambda x: self._select_profile(),
- display_func=lambda x: x if x else 'None')
+ _('Profile'),
+ lambda preset: self._select_profile(preset),
+ display_func=lambda x: x if x else 'None'
+ )
self._menu_options['audio'] = \
Selector(
- _('Select audio'),
+ _('Audio'),
lambda preset: ask_for_audio_selection(is_desktop_profile(storage['arguments'].get('profile', None)),preset),
display_func=lambda x: x if x else 'None',
default=None
)
self._menu_options['kernels'] = \
Selector(
- _('Select kernels'),
+ _('Kernels'),
lambda preset: select_kernel(preset),
default=['linux'])
self._menu_options['packages'] = \
Selector(
- _('Additional packages to install'),
+ _('Additional packages'),
# lambda x: ask_additional_packages_to_install(storage['arguments'].get('packages', None)),
ask_additional_packages_to_install,
default=[])
self._menu_options['additional-repositories'] = \
Selector(
- _('Additional repositories to enable'),
+ _('Optional repositories'),
select_additional_repositories,
default=[])
self._menu_options['nic'] = \
Selector(
- _('Configure network'),
+ _('Network configuration'),
ask_to_configure_network,
display_func=lambda x: self._prev_network_configuration(x),
default={})
self._menu_options['timezone'] = \
Selector(
- _('Select timezone'),
+ _('Timezone'),
lambda preset: ask_for_a_timezone(preset),
default='UTC')
self._menu_options['ntp'] = \
Selector(
- _('Set automatic time sync (NTP)'),
+ _('Automatic time sync (NTP)'),
lambda preset: self._select_ntp(preset),
default=True)
self._menu_options['__separator__'] = \
@@ -172,7 +193,7 @@ class GlobalMenu(GeneralMenu):
def _update_install_text(self, name :str = None, result :Any = None):
text = self._install_text()
- self._menu_options.get('install').update_description(text)
+ self._menu_options['install'].update_description(text)
def post_callback(self,name :str = None ,result :Any = None):
self._update_install_text(name, result)
@@ -182,14 +203,21 @@ class GlobalMenu(GeneralMenu):
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0:
- storage['arguments']['disk_layouts'] = select_encrypted_partitions(
- storage['arguments']['disk_layouts'], storage['arguments']['!encryption-password'])
+ for blockdevice in storage['arguments']['disk_layouts']:
+ for partition_index in select_encrypted_partitions(
+ title="Select which partitions to encrypt:",
+ partitions=storage['arguments']['disk_layouts'][blockdevice]['partitions']
+ ):
+
+ partition = storage['arguments']['disk_layouts'][blockdevice]['partitions'][partition_index]
+ partition['encrypted'] = True
+ partition['!password'] = storage['arguments']['!encryption-password']
def _install_text(self):
missing = len(self._missing_configs())
if missing > 0:
return _('Install ({} config(s) missing)').format(missing)
- return 'Install'
+ return _('Install')
def _prev_network_configuration(self, cur_value: Union[NetworkConfiguration, List[NetworkConfiguration]]) -> str:
if not cur_value:
@@ -201,6 +229,35 @@ class GlobalMenu(GeneralMenu):
else:
return str(cur_value)
+ def _prev_harddrives(self) -> Optional[str]:
+ selector = self._menu_options['harddrives']
+ if selector.has_selection():
+ drives = selector.current_selection
+ return '\n\n'.join([d.display_info for d in drives])
+ return None
+
+ def _prev_disk_layouts(self) -> Optional[str]:
+ selector = self._menu_options['disk_layouts']
+ if selector.has_selection():
+ layouts: Dict[str, Dict[str, Any]] = selector.current_selection
+
+ output = ''
+ for device, layout in layouts.items():
+ output += f'{_("Device")}: {device}\n\n'
+ output += current_partition_layout(layout['partitions'], with_title=False)
+ output += '\n\n'
+
+ return output.rstrip()
+
+ return None
+
+ def _display_disk_layout(self, current_value: Optional[Dict[str, Any]]) -> str:
+ if current_value:
+ total_partitions = [entry['partitions'] for entry in current_value.values()]
+ total_nr = sum([len(p) for p in total_partitions])
+ return f'{total_nr} {_("Partitions")}'
+ return ''
+
def _prev_install_missing_config(self) -> Optional[str]:
if missing := self._missing_configs():
text = str(_('Missing configurations:\n'))
@@ -209,31 +266,42 @@ class GlobalMenu(GeneralMenu):
return text[:-1] # remove last new line
return None
+ def _prev_users(self) -> Optional[str]:
+ selector = self._menu_options['!users']
+ if selector.has_selection():
+ users: List[User] = selector.current_selection
+ return FormattedOutput.as_table(users)
+ return None
+
def _missing_configs(self) -> List[str]:
def check(s):
return self._menu_options.get(s).has_selection()
+ def has_superuser() -> bool:
+ users = self._menu_options['!users'].current_selection
+ return any([u.sudo for u in users])
+
missing = []
if not check('bootloader'):
missing += ['Bootloader']
if not check('hostname'):
missing += ['Hostname']
- if not check('!root-password') and not check('!superusers'):
- missing += [str(_('Either root-password or at least 1 superuser must be specified'))]
+ if not check('!root-password') and not has_superuser():
+ missing += [str(_('Either root-password or at least 1 user with sudo privileges must be specified'))]
if not check('harddrives'):
missing += ['Hard drives']
if check('harddrives'):
- if not self._menu_options.get('harddrives').is_empty() and not check('disk_layouts'):
+ if not self._menu_options['harddrives'].is_empty() and not check('disk_layouts'):
missing += ['Disk layout']
return missing
- def _set_root_password(self):
+ def _set_root_password(self) -> Optional[str]:
prompt = str(_('Enter root password (leave blank to disable root): '))
password = get_password(prompt=prompt)
return password
- def _select_encrypted_password(self):
+ def _select_encrypted_password(self) -> Optional[str]:
if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))):
return passwd
else:
@@ -247,59 +315,75 @@ class GlobalMenu(GeneralMenu):
return ntp
- def _select_harddrives(self, old_harddrives : list) -> list:
- # old_haddrives = storage['arguments'].get('harddrives', [])
+ def _select_harddrives(self, old_harddrives : list) -> List:
harddrives = select_harddrives(old_harddrives)
- # in case the harddrives got changed we have to reset the disk layout as well
- if old_harddrives != harddrives:
- self._menu_options.get('disk_layouts').set_current_selection(None)
- storage['arguments']['disk_layouts'] = {}
-
- if not harddrives:
+ if len(harddrives) == 0:
prompt = _(
"You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n"
"WARNING: Archinstall won't check the suitability of this setup\n"
"Do you wish to continue?"
).format(storage['MOUNT_POINT'])
- choice = Menu(prompt, ['yes', 'no'], default_option='yes').run()
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run()
- if choice == 'no':
- exit(1)
+ if choice.value == Menu.no():
+ return self._select_harddrives(old_harddrives)
+
+ # in case the harddrives got changed we have to reset the disk layout as well
+ if old_harddrives != harddrives:
+ self._menu_options['disk_layouts'].set_current_selection(None)
+ storage['arguments']['disk_layouts'] = {}
return harddrives
- def _select_profile(self):
- profile = select_profile()
+ def _select_profile(self, preset):
+ profile = select_profile(preset)
+ ret = None
+
+ if profile is None:
+ if any([
+ archinstall.storage.get('profile_minimal', False),
+ archinstall.storage.get('_selected_servers', None),
+ archinstall.storage.get('_desktop_profile', None),
+ archinstall.arguments.get('desktop-environment', None),
+ archinstall.arguments.get('gfx_driver_packages', None)
+ ]):
+ return preset
+ else: # ctrl+c was actioned and all profile settings have been reset
+ return None
+
+ servers = archinstall.storage.get('_selected_servers', [])
+ desktop = archinstall.storage.get('_desktop_profile', None)
+ desktop_env = archinstall.arguments.get('desktop-environment', None)
+ gfx_driver = archinstall.arguments.get('gfx_driver_packages', None)
# Check the potentially selected profiles preparations to get early checks if some additional questions are needed.
if profile and profile.has_prep_function():
namespace = f'{profile.namespace}.py'
with profile.load_instructions(namespace=namespace) as imported:
- if not imported._prep_function():
- log(' * Profile\'s preparation requirements was not fulfilled.', fg='red')
- exit(1)
-
- return profile
-
- def _create_superuser_account(self):
- superusers = ask_for_superuser_account(str(_('Manage superuser accounts: ')))
- return superusers if superusers else None
-
- def _create_user_account(self):
- users = ask_for_additional_users(str(_('Manage ordinary user accounts: ')))
+ if imported._prep_function(servers=servers, desktop=desktop, desktop_env=desktop_env, gfx_driver=gfx_driver):
+ ret: Profile = profile
+
+ match ret.name:
+ case 'minimal':
+ reset = ['_selected_servers', '_desktop_profile', 'desktop-environment', 'gfx_driver_packages']
+ case 'server':
+ reset = ['_desktop_profile', 'desktop-environment']
+ case 'desktop':
+ reset = ['_selected_servers']
+ case 'xorg':
+ reset = ['_selected_servers', '_desktop_profile', 'desktop-environment']
+
+ for r in reset:
+ archinstall.storage[r] = None
+ else:
+ return self._select_profile(preset)
+ elif profile:
+ ret = profile
+
+ return ret
+
+ def _create_user_account(self, defined_users: List[User]) -> List[User]:
+ users = ask_for_additional_users(defined_users=defined_users)
return users
-
- def _display_superusers(self):
- superusers = self._data_store.get('!superusers', {})
-
- if self._menu_options.get('!root-password').has_selection():
- return list(superusers.keys()) if superusers else '[]'
- else:
- return list(superusers.keys()) if superusers else ''
-
- def _users_resynch(self):
- self.synch('!superusers')
- self.synch('!users')
- return False
diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py
index 4e6dffbe..cb567093 100644
--- a/archinstall/lib/menu/list_manager.py
+++ b/archinstall/lib/menu/list_manager.py
@@ -84,12 +84,12 @@ The contents in the base class of this methods serve for a very basic usage, and
```
"""
+import copy
+from os import system
+from typing import Union, Any, TYPE_CHECKING, Dict, Optional
from .text_input import TextInput
from .menu import Menu
-from os import system
-from copy import copy
-from typing import Union, Any, List, TYPE_CHECKING
if TYPE_CHECKING:
_: Any
@@ -144,82 +144,99 @@ class ListManager:
self.bottom_list = [self.confirm_action,self.cancel_action]
self.bottom_item = [self.cancel_action]
self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))]
- self.base_data = base_list
- self._data = copy(base_list) # as refs, changes are immediate
+ self._original_data = copy.deepcopy(base_list)
+ self._data = copy.deepcopy(base_list) # as refs, changes are immediate
# default values for the null case
- self.target = None
+ self.target: Optional[Any] = None
self.action = self._null_action
+
if len(self._data) == 0 and self._null_action:
- self.exec_action(self._data)
+ self._data = self.exec_action(self._data)
def run(self):
while True:
- self._data_formatted = self.reformat(self._data)
- options = self._data_formatted + [self.separator]
+ # this will return a dictionary with the key as the menu entry to be displayed
+ # and the value is the original value from the self._data container
+ data_formatted = self.reformat(self._data)
+ options = list(data_formatted.keys())
+ options.append(self.separator)
+
if self._default_action:
options += self._default_action
+
options += self.bottom_list
+
system('clear')
- target = Menu(self._prompt,
+
+ target = Menu(
+ self._prompt,
options,
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
header=self.header,
- skip_empty_entries=True).run()
+ skip_empty_entries=True,
+ skip=False
+ ).run()
- if not target or target in self.bottom_list:
+ if not target.value or target.value in self.bottom_list:
self.action = target
break
- if target and target == self.separator:
- continue
- if target and target in self._default_action:
- self.action = target
- target = None
+
+ if target.value and target.value in self._default_action:
+ self.action = target.value
self.target = None
- self.exec_action(self._data)
+ self._data = self.exec_action(self._data)
continue
+
if isinstance(self._data,dict):
- key = list(self._data.keys())[self._data_formatted.index(target)]
- self.target = {key: self._data[key]}
+ data_key = data_formatted[target.value]
+ key = self._data[data_key]
+ self.target = {data_key: key}
+ elif isinstance(self._data, list):
+ self.target = [d for d in self._data if d == data_formatted[target.value]][0]
else:
- self.target = self._data[self._data_formatted.index(target)]
+ self.target = self._data[data_formatted[target.value]]
+
# Possible enhacement. If run_actions returns false a message line indicating the failure
- self.run_actions(target)
+ self.run_actions(target.value)
- if not target or target == self.cancel_action: # TODO dubious
- return self.base_data # return the original list
+ if target.value == self.cancel_action: # TODO dubious
+ return self._original_data # return the original list
else:
return self._data
def run_actions(self,prompt_data=None):
options = self.action_list() + self.bottom_item
prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target)
- self.action = Menu(
+ choice = Menu(
prompt,
options,
sort=False,
clear_screen=False,
clear_menu_on_exit=False,
preset_values=self.bottom_item,
- show_search_hint=False).run()
- if not self.action or self.action == self.cancel_action:
- return False
- else:
- return self.exec_action(self._data)
+ show_search_hint=False
+ ).run()
+
+ self.action = choice.value
+
+ if self.action and self.action != self.cancel_action:
+ self._data = self.exec_action(self._data)
+
"""
The following methods are expected to be overwritten by the user if the needs of the list are beyond the simple case
"""
- def reformat(self, data: Any) -> List[Any]:
+ def reformat(self, data: Any) -> Dict[str, Any]:
"""
method to get the data in a format suitable to be shown
It is executed once for run loop and processes the whole self._data structure
"""
if isinstance(data,dict):
- return list(map(lambda x:f"{x} : {data[x]}",data))
+ return {f'{k}: {v}': k for k, v in data.items()}
else:
- return list(map(lambda x:str(x),data))
+ return {str(k): k for k in data}
def action_list(self):
"""
@@ -238,18 +255,18 @@ class ListManager:
# TODO guarantee unicity
if isinstance(self._data,list):
if self.action == str(_('Add')):
- self.target = TextInput(_('Add :'),None).run()
+ self.target = TextInput(_('Add: '),None).run()
self._data.append(self.target)
if self.action == str(_('Copy')):
while True:
- target = TextInput(_('Copy to :'),self.target).run()
+ target = TextInput(_('Copy to: '),self.target).run()
if target != self.target:
self._data.append(self.target)
break
elif self.action == str(_('Edit')):
tgt = self.target
idx = self._data.index(self.target)
- result = TextInput(_('Edite :'),tgt).run()
+ result = TextInput(_('Edit: '),tgt).run()
self._data[idx] = result
elif self.action == str(_('Delete')):
del self._data[self._data.index(self.target)]
@@ -261,8 +278,8 @@ class ListManager:
origkey = None
origval = None
if self.action == str(_('Add')):
- key = TextInput(_('Key :'),None).run()
- value = TextInput(_('Value :'),None).run()
+ key = TextInput(_('Key: '),None).run()
+ value = TextInput(_('Value: '),None).run()
self._data[key] = value
if self.action == str(_('Copy')):
while True:
@@ -271,7 +288,9 @@ class ListManager:
self._data[key] = origval
break
elif self.action == str(_('Edit')):
- value = TextInput(_(f'Edit {origkey} :'),origval).run()
+ value = TextInput(_('Edit {}: ').format(origkey), origval).run()
self._data[origkey] = value
elif self.action == str(_('Delete')):
del self._data[origkey]
+
+ return self._data
diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py
index d254e0f9..c34814eb 100644
--- a/archinstall/lib/menu/menu.py
+++ b/archinstall/lib/menu/menu.py
@@ -1,4 +1,6 @@
-from typing import Dict, List, Union, Any, TYPE_CHECKING
+from dataclasses import dataclass
+from enum import Enum, auto
+from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional
from archinstall.lib.menu.simple_menu import TerminalMenu
@@ -12,21 +14,49 @@ import logging
if TYPE_CHECKING:
_: Any
+
+class MenuSelectionType(Enum):
+ Selection = auto()
+ Esc = auto()
+ Ctrl_c = auto()
+
+
+@dataclass
+class MenuSelection:
+ type_: MenuSelectionType
+ value: Optional[Union[str, List[str]]] = None
+
+
class Menu(TerminalMenu):
+
+ @classmethod
+ def yes(cls):
+ return str(_('yes'))
+
+ @classmethod
+ def no(cls):
+ return str(_('no'))
+
+ @classmethod
+ def yes_no(cls):
+ return [cls.yes(), cls.no()]
+
def __init__(
self,
title :str,
p_options :Union[List[str], Dict[str, Any]],
skip :bool = True,
multi :bool = False,
- default_option :str = None,
+ default_option : Optional[str] = None,
sort :bool = True,
preset_values :Union[str, List[str]] = None,
- cursor_index :int = None,
+ cursor_index : Optional[int] = None,
preview_command=None,
preview_size=0.75,
preview_title='Info',
header :Union[List[str],str] = None,
+ explode_on_interrupt :bool = False,
+ explode_warning :str = '',
**kwargs
):
"""
@@ -66,9 +96,15 @@ class Menu(TerminalMenu):
:param preview_title: Title of the preview window
:type preview_title: str
- param: header one or more header lines for the menu
+ param header: one or more header lines for the menu
type param: string or list
+ param explode_on_interrupt: This will explicitly handle a ctrl+c instead and return that specific state
+ type param: bool
+
+ param explode_warning: If explode_on_interrupt is True and this is non-empty, there will be a warning with a user confirmation displayed
+ type param: str
+
:param kwargs : any SimpleTerminal parameter
"""
# we guarantee the inmutability of the options outside the class.
@@ -85,6 +121,8 @@ class Menu(TerminalMenu):
log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
raise RequirementError("Menu() requires an iterable as option.")
+ self._default_str = str(_('(default)'))
+
if isinstance(p_options,dict):
options = list(p_options.keys())
else:
@@ -103,27 +141,40 @@ class Menu(TerminalMenu):
if sort:
options = sorted(options)
- self.menu_options = options
- self.skip = skip
- self.default_option = default_option
- self.multi = multi
+ self._menu_options = options
+ self._skip = skip
+ self._default_option = default_option
+ self._multi = multi
+ self._explode_on_interrupt = explode_on_interrupt
+ self._explode_warning = explode_warning
+
menu_title = f'\n{title}\n\n'
+
if header:
- separator = '\n '
if not isinstance(header,(list,tuple)):
- header = [header,]
- if skip:
- menu_title += str(_("Use ESC to skip\n"))
- menu_title += separator + separator.join(header)
- elif skip:
- menu_title += str(_("Use ESC to skip\n\n"))
+ header = [header]
+ header = '\n'.join(header)
+ menu_title += f'\n{header}\n'
+
+ action_info = ''
+ if skip:
+ action_info += str(_("Use ESC to skip"))
+
+ if self._explode_on_interrupt:
+ if len(action_info) > 0:
+ action_info += '\n'
+ action_info += str(_('Use CTRL+C to reset current selection\n\n'))
+
+ menu_title += action_info
+
if default_option:
# if a default value was specified we move that one
# to the top of the list and mark it as default as well
- default = f'{default_option} (default)'
- self.menu_options = [default] + [o for o in self.menu_options if default_option != o]
+ default = f'{default_option} {self._default_str}'
+ self._menu_options = [default] + [o for o in self._menu_options if default_option != o]
+
+ self._preselection(preset_values,cursor_index)
- self.preselection(preset_values,cursor_index)
cursor = "> "
main_menu_cursor_style = ("fg_cyan", "bold")
main_menu_style = ("bg_blue", "fg_gray")
@@ -131,8 +182,9 @@ class Menu(TerminalMenu):
kwargs['clear_screen'] = kwargs.get('clear_screen',True)
kwargs['show_search_hint'] = kwargs.get('show_search_hint',True)
kwargs['cycle_cursor'] = kwargs.get('cycle_cursor',True)
+
super().__init__(
- menu_entries=self.menu_options,
+ menu_entries=self._menu_options,
title=menu_title,
menu_cursor=cursor,
menu_cursor_style=main_menu_cursor_style,
@@ -146,31 +198,46 @@ class Menu(TerminalMenu):
preview_command=preview_command,
preview_size=preview_size,
preview_title=preview_title,
+ explode_on_interrupt=self._explode_on_interrupt,
+ multi_select_select_on_accept=False,
**kwargs,
)
- def _show(self):
- idx = self.show()
+ def _show(self) -> MenuSelection:
+ try:
+ idx = self.show()
+ except KeyboardInterrupt:
+ return MenuSelection(type_=MenuSelectionType.Ctrl_c)
+
+ def check_default(elem):
+ if self._default_option is not None and f'{self._default_option} {self._default_str}' in elem:
+ return self._default_option
+ else:
+ return elem
+
if idx is not None:
if isinstance(idx, (list, tuple)):
- return [self.default_option if ' (default)' in self.menu_options[i] else self.menu_options[i] for i in idx]
+ results = []
+ for i in idx:
+ option = check_default(self._menu_options[i])
+ results.append(option)
+ return MenuSelection(type_=MenuSelectionType.Selection, value=results)
else:
- selected = self.menu_options[idx]
- if ' (default)' in selected and self.default_option:
- return self.default_option
- return selected
+ result = check_default(self._menu_options[idx])
+ return MenuSelection(type_=MenuSelectionType.Selection, value=result)
else:
- if self.default_option:
- if self.multi:
- return [self.default_option]
- else:
- return self.default_option
- return None
-
- def run(self):
+ return MenuSelection(type_=MenuSelectionType.Esc)
+
+ def run(self) -> MenuSelection:
ret = self._show()
- if ret is None and not self.skip:
+ if ret.type_ == MenuSelectionType.Ctrl_c:
+ if self._explode_on_interrupt and len(self._explode_warning) > 0:
+ response = Menu(self._explode_warning, Menu.yes_no(), skip=False).run()
+ if response.value == Menu.no():
+ return self.run()
+
+ if ret.type_ is not MenuSelectionType.Selection and not self._skip:
return self.run()
return ret
@@ -185,15 +252,15 @@ class Menu(TerminalMenu):
pos = self._menu_entries.index(value)
self.set_cursor_pos(pos)
- def preselection(self,preset_values :list = [],cursor_index :int = None):
+ def _preselection(self,preset_values :Union[str, List[str]] = [], cursor_index : Optional[int] = None):
def from_preset_to_cursor():
if preset_values:
# if the value is not extant return 0 as cursor index
try:
if isinstance(preset_values,str):
- self.cursor_index = self.menu_options.index(self.preset_values)
+ self.cursor_index = self._menu_options.index(self.preset_values)
else: # should return an error, but this is smoother
- self.cursor_index = self.menu_options.index(self.preset_values[0])
+ self.cursor_index = self._menu_options.index(self.preset_values[0])
except ValueError:
self.cursor_index = 0
@@ -203,13 +270,13 @@ class Menu(TerminalMenu):
return
self.preset_values = preset_values
- if self.default_option:
- if isinstance(preset_values,str) and self.default_option == preset_values:
- self.preset_values = f"{preset_values} (default)"
- elif isinstance(preset_values,(list,tuple)) and self.default_option in preset_values:
- idx = preset_values.index(self.default_option)
- self.preset_values[idx] = f"{preset_values[idx]} (default)"
- if cursor_index is None or not self.multi:
+ if self._default_option:
+ if isinstance(preset_values,str) and self._default_option == preset_values:
+ self.preset_values = f"{preset_values} {self._default_str}"
+ elif isinstance(preset_values,(list,tuple)) and self._default_option in preset_values:
+ idx = preset_values.index(self._default_option)
+ self.preset_values[idx] = f"{preset_values[idx]} {self._default_str}"
+ if cursor_index is None or not self._multi:
from_preset_to_cursor()
- if not self.multi: # Not supported by the infraestructure
+ if not self._multi: # Not supported by the infraestructure
self.preset_values = None
diff --git a/archinstall/lib/menu/selection_menu.py b/archinstall/lib/menu/selection_menu.py
index b2f99423..57e290f1 100644
--- a/archinstall/lib/menu/selection_menu.py
+++ b/archinstall/lib/menu/selection_menu.py
@@ -2,23 +2,27 @@ from __future__ import annotations
import logging
import sys
+import pathlib
from typing import Callable, Any, List, Iterator, Tuple, Optional, Dict, TYPE_CHECKING
-from .menu import Menu
+from .menu import Menu, MenuSelectionType
from ..locale_helpers import set_keyboard_language
from ..output import log
from ..translation import Translation
+from ..hsm.fido import get_fido2_devices
if TYPE_CHECKING:
_: Any
-def select_archinstall_language(default='English'):
+
+def select_archinstall_language(preset_value: str) -> Optional[Any]:
"""
copied from user_interaction/general_conf.py as a temporary measure
"""
- languages = Translation.get_all_names()
- language = Menu(_('Select Archinstall language'), languages, default_option=default).run()
- return language
+ languages = Translation.get_available_lang()
+ language = Menu(_('Archinstall language'), languages, preset_values=preset_value).run()
+ return language.value
+
class Selector:
def __init__(
@@ -91,6 +95,10 @@ class Selector:
self._no_store = no_store
@property
+ def description(self) -> str:
+ return self._description
+
+ @property
def dependencies(self) -> List:
return self._dependencies
@@ -115,7 +123,7 @@ class Selector:
def update_description(self, description :str):
self._description = description
- def menu_text(self) -> str:
+ def menu_text(self, padding: int = 0) -> str:
if self._description == '': # special menu option for __separator__
return ''
@@ -128,14 +136,14 @@ class Selector:
current = str(self._current_selection)
if current:
- padding = 35 - len(str(self._description))
- current = ' ' * padding + f'SET: {current}'
-
- return f'{self._description} {current}'
+ padding += 5
+ description = str(self._description).ljust(padding, ' ')
+ current = str(_('set: {}').format(current))
+ else:
+ description = self._description
+ current = ''
- @property
- def text(self):
- return self.menu_text()
+ return f'{description} {current}'
def set_current_selection(self, current :Optional[str]):
self._current_selection = current
@@ -262,8 +270,14 @@ class GeneralMenu:
return preview()
return None
+ def _get_menu_text_padding(self, entries: List[Selector]):
+ return max([len(str(selection.description)) for selection in entries])
+
def _find_selection(self, selection_name: str) -> Tuple[str, Selector]:
- option = [(k, v) for k, v in self._menu_options.items() if v.text.strip() == selection_name.strip()]
+ enabled_menus = self._menus_to_enable()
+ padding = self._get_menu_text_padding(list(enabled_menus.values()))
+ option = [(k, v) for k, v in self._menu_options.items() if v.menu_text(padding).strip() == selection_name.strip()]
+
if len(option) != 1:
raise ValueError(f'Selection not found: {selection_name}')
config_name = option[0][0]
@@ -275,14 +289,18 @@ class GeneralMenu:
# we synch all the options just in case
for item in self.list_options():
self.synch(item)
- self.post_callback # as all the values can vary i have to exec this callback
+
+ self.post_callback() # as all the values can vary i have to exec this callback
cursor_pos = None
+
while True:
# Before continuing, set the preferred keyboard layout/language in the current terminal.
# This will just help the user with the next following questions.
self._set_kb_language()
enabled_menus = self._menus_to_enable()
- menu_options = [m.text for m in enabled_menus.values()]
+
+ padding = self._get_menu_text_padding(list(enabled_menus.values()))
+ menu_options = [m.menu_text(padding) for m in enabled_menus.values()]
selection = Menu(
_('Set/Modify the below options'),
@@ -291,18 +309,31 @@ class GeneralMenu:
cursor_index=cursor_pos,
preview_command=self._preview_display,
preview_size=self.preview_size,
- skip_empty_entries=True
+ skip_empty_entries=True,
+ skip=False
).run()
- if selection and self.auto_cursor:
- cursor_pos = menu_options.index(selection) + 1 # before the strip otherwise fails
- if cursor_pos >= len(menu_options):
- cursor_pos = len(menu_options) - 1
- selection = selection.strip()
- if selection:
- # if this calls returns false, we exit the menu. We allow for an callback for special processing on realeasing control
- if not self._process_selection(selection):
- break
+ if selection.type_ == MenuSelectionType.Selection:
+ value = selection.value
+
+ if self.auto_cursor:
+ cursor_pos = menu_options.index(value) + 1 # before the strip otherwise fails
+
+ # in case the new position lands on a "placeholder" we'll skip them as well
+ while True:
+ if cursor_pos >= len(menu_options):
+ cursor_pos = 0
+ if len(menu_options[cursor_pos]) > 0:
+ break
+ cursor_pos += 1
+
+ value = value.strip()
+
+ # if this calls returns false, we exit the menu
+ # we allow for an callback for special processing on realeasing control
+ if not self._process_selection(value):
+ break
+
if not self.is_context_mgr:
self.__exit__()
@@ -423,15 +454,41 @@ class GeneralMenu:
def mandatory_overview(self) -> Tuple[int, int]:
mandatory_fields = 0
mandatory_waiting = 0
- for field in self._menu_options:
- option = self._menu_options[field]
+ for field, option in self._menu_options.items():
if option.is_mandatory():
mandatory_fields += 1
if not option.has_selection():
mandatory_waiting += 1
return mandatory_fields, mandatory_waiting
- def _select_archinstall_language(self, default_lang):
- language = select_archinstall_language(default_lang)
- self._translation.activate(language)
- return language
+ def _select_archinstall_language(self, preset_value: str) -> str:
+ language = select_archinstall_language(preset_value)
+ if language is not None:
+ self._translation.activate(language)
+ return language
+
+ return preset_value
+
+ def _select_hsm(self, preset :Optional[pathlib.Path] = None) -> Optional[pathlib.Path]:
+ title = _('Select which partitions to mark for formatting:')
+ title += '\n'
+
+ fido_devices = get_fido2_devices()
+
+ indexes = []
+ for index, path in enumerate(fido_devices.keys()):
+ title += f"{index}: {path} ({fido_devices[path]['manufacturer']} - {fido_devices[path]['product']})"
+ indexes.append(f"{index}|{fido_devices[path]['product']}")
+
+ title += '\n'
+
+ choice = Menu(title, indexes, multi=False).run()
+
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Selection:
+ selection: Any = choice.value
+ index = int(selection.split('|',1)[0])
+ return pathlib.Path(list(fido_devices.keys())[index])
+
+ return None
diff --git a/archinstall/lib/menu/simple_menu.py b/archinstall/lib/menu/simple_menu.py
index a0a241bd..947259eb 100644
--- a/archinstall/lib/menu/simple_menu.py
+++ b/archinstall/lib/menu/simple_menu.py
@@ -596,7 +596,8 @@ class TerminalMenu:
status_bar: Optional[Union[str, Iterable[str], Callable[[str], str]]] = None,
status_bar_below_preview: bool = DEFAULT_STATUS_BAR_BELOW_PREVIEW,
status_bar_style: Optional[Iterable[str]] = DEFAULT_STATUS_BAR_STYLE,
- title: Optional[Union[str, Iterable[str]]] = None
+ title: Optional[Union[str, Iterable[str]]] = None,
+ explode_on_interrupt: bool = False
):
def extract_shortcuts_menu_entries_and_preview_arguments(
entries: Iterable[str],
@@ -718,6 +719,7 @@ class TerminalMenu:
self._search_case_sensitive = search_case_sensitive
self._search_highlight_style = tuple(search_highlight_style) if search_highlight_style is not None else ()
self._search_key = search_key
+ self._explode_on_interrupt = explode_on_interrupt
self._shortcut_brackets_highlight_style = (
tuple(shortcut_brackets_highlight_style) if shortcut_brackets_highlight_style is not None else ()
)
@@ -1538,7 +1540,9 @@ class TerminalMenu:
# Only append `next_key` if it is a printable character and the first character is not the
# `search_start` key
self._search.search_text += next_key
- except KeyboardInterrupt:
+ except KeyboardInterrupt as e:
+ if self._explode_on_interrupt:
+ raise e
menu_was_interrupted = True
finally:
reset_signal_handling()
@@ -1842,6 +1846,12 @@ def get_argumentparser() -> argparse.ArgumentParser:
)
parser.add_argument("-t", "--title", action="store", dest="title", help="menu title")
parser.add_argument(
+ "--explode-on-interrupt",
+ action="store_true",
+ dest="explode_on_interrupt",
+ help="Instead of quitting the menu, this will raise the KeyboardInterrupt Exception",
+ )
+ parser.add_argument(
"-V", "--version", action="store_true", dest="print_version", help="print the version number and exit"
)
parser.add_argument("entries", action="store", nargs="*", help="the menu entries to show")
@@ -1971,6 +1981,7 @@ def main() -> None:
status_bar_below_preview=args.status_bar_below_preview,
status_bar_style=args.status_bar_style,
title=args.title,
+ explode_on_interrupt=args.explode_on_interrupt,
)
except (InvalidParameterCombinationError, InvalidStyleError, UnknownMenuEntryError) as e:
print(str(e), file=sys.stderr)
diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py
index 16136177..4f135da5 100644
--- a/archinstall/lib/models/network_configuration.py
+++ b/archinstall/lib/models/network_configuration.py
@@ -5,6 +5,7 @@ from enum import Enum
from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING
from ..output import log
+from ..storage import storage
if TYPE_CHECKING:
_: Any
@@ -77,7 +78,9 @@ class NetworkConfigurationHandler:
installation.copy_iso_network_config(
enable_services=True) # Sources the ISO network configuration to the install medium.
elif self._configuration.is_network_manager():
- installation.add_additional_packages("networkmanager")
+ installation.add_additional_packages(["networkmanager"])
+ if (profile := storage['arguments'].get('profile')) and profile.is_desktop_profile:
+ installation.add_additional_packages(["network-manager-applet"])
installation.enable_service('NetworkManager.service')
def _backwards_compability_config(self, config: Union[str,Dict[str, str]]) -> Union[List[NetworkConfiguration], NetworkConfiguration, None]:
diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py
new file mode 100644
index 00000000..6052b73a
--- /dev/null
+++ b/archinstall/lib/models/users.py
@@ -0,0 +1,77 @@
+from dataclasses import dataclass
+from typing import Dict, List, Union, Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ _: Any
+
+
+@dataclass
+class User:
+ username: str
+ password: str
+ sudo: bool
+
+ @property
+ def groups(self) -> List[str]:
+ # this property should be transferred into a class attr instead
+ # if it's every going to be used
+ return []
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'username': self.username,
+ '!password': self.password,
+ 'sudo': self.sudo
+ }
+
+ def display(self) -> str:
+ password = '*' * len(self.password)
+ return f'{_("Username")}: {self.username:16} {_("Password")}: {password:16} sudo: {str(self.sudo)}'
+
+ @classmethod
+ def _parse(cls, config_users: List[Dict[str, Any]]) -> List['User']:
+ users = []
+
+ for entry in config_users:
+ username = entry.get('username', None)
+ password = entry.get('!password', '')
+ sudo = entry.get('sudo', False)
+
+ if username is None:
+ continue
+
+ user = User(username, password, sudo)
+ users.append(user)
+
+ return users
+
+ @classmethod
+ def _parse_backwards_compatible(cls, config_users: Dict, sudo: bool) -> List['User']:
+ if len(config_users.keys()) > 0:
+ username = list(config_users.keys())[0]
+ password = config_users[username]['!password']
+
+ if password:
+ return [User(username, password, sudo)]
+
+ return []
+
+ @classmethod
+ def parse_arguments(
+ cls,
+ config_users: Union[List[Dict[str, str]], Dict[str, str]],
+ config_superusers: Union[List[Dict[str, str]], Dict[str, str]]
+ ) -> List['User']:
+ users = []
+
+ # backwards compability
+ if isinstance(config_users, dict):
+ users += cls._parse_backwards_compatible(config_users, False)
+ else:
+ users += cls._parse(config_users)
+
+ # backwards compability
+ if isinstance(config_superusers, dict):
+ users += cls._parse_backwards_compatible(config_superusers, True)
+
+ return users
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index da41d16d..29b73bc4 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -2,11 +2,47 @@ import logging
import os
import sys
from pathlib import Path
-from typing import Dict, Union
+from typing import Dict, Union, List, Any
from .storage import storage
+class FormattedOutput:
+
+ @classmethod
+ def values(cls, o: Any) -> Dict[str, Any]:
+ if hasattr(o, 'json'):
+ return o.json()
+ else:
+ return o.__dict__
+
+ @classmethod
+ def as_table(cls, obj: List[Any]) -> str:
+ column_width: Dict[str, int] = {}
+ for o in obj:
+ for k, v in cls.values(o).items():
+ column_width.setdefault(k, 0)
+ column_width[k] = max([column_width[k], len(str(v)), len(k)])
+
+ output = ''
+ for key, width in column_width.items():
+ key = key.replace('!', '')
+ output += key.ljust(width) + ' | '
+
+ output = output[:-3] + '\n'
+ output += '-' * len(output) + '\n'
+
+ for o in obj:
+ for k, v in cls.values(o).items():
+ if '!' in k:
+ v = '*' * len(str(v))
+ output += str(v).ljust(column_width[k]) + ' | '
+ output = output[:-3]
+ output += '\n'
+
+ return output
+
+
class Journald:
@staticmethod
def log(message :str, level :int = logging.DEBUG) -> None:
@@ -61,9 +97,11 @@ def stylize_output(text: str, *opts :str, **kwargs) -> str:
'magenta' : '5',
'cyan' : '6',
'white' : '7',
- 'orange' : '8;5;208', # Extended 256-bit colors (not always supported)
- 'darkorange' : '8;5;202',# https://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html#256-colors
+ 'teal' : '8;5;109', # Extended 256-bit colors (not always supported)
+ 'orange' : '8;5;208', # https://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html#256-colors
+ 'darkorange' : '8;5;202',
'gray' : '8;5;246',
+ 'grey' : '8;5;246',
'darkgray' : '8;5;240',
'lightgray' : '8;5;256'
}
diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py
index 7f920317..99e3811c 100644
--- a/archinstall/lib/plugins.py
+++ b/archinstall/lib/plugins.py
@@ -18,7 +18,7 @@ plugins = {}
# 1: List archinstall.plugin definitions
# 2: Load the plugin entrypoint
# 3: Initiate the plugin and store it as .name in plugins
-for plugin_definition in metadata.entry_points().get('archinstall.plugin', []):
+for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'):
plugin_entrypoint = plugin_definition.load()
try:
plugins[plugin_definition.name] = plugin_entrypoint()
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 65a30b0b..a4fbe490 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -207,6 +207,14 @@ class Profile(Script):
def __repr__(self, *args :str, **kwargs :str) -> str:
return f'Profile({os.path.basename(self.profile)})'
+ @property
+ def name(self) -> str:
+ return os.path.basename(self.profile)
+
+ @property
+ def is_desktop_profile(self) -> bool:
+ return is_desktop_profile(repr(self))
+
def install(self) -> ModuleType:
# Before installing, revert any temporary changes to the namespace.
# This ensures that the namespace during installation is the original initiation namespace.
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 650b9c0e..dd7ddc88 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -23,7 +23,7 @@ storage: Dict[str, Any] = {
'MOUNT_POINT': '/mnt/archinstall',
'ENC_IDENTIFIER': 'ainst',
'DISK_TIMEOUTS' : 1, # seconds
- 'DISK_RETRY_ATTEMPTS' : 20, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations
+ 'DISK_RETRY_ATTEMPTS' : 5, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations
'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overriden with set_cmd_locale()
'CMD_LOCALE_DEFAULT':{'LC_ALL':'C'}, # should be the same as the former. Not be used except in reset_cmd_locale()
}
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py
index 417870da..3d2f0385 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/systemd.py
@@ -91,20 +91,25 @@ class Boot:
log(f"The error above occured in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
shutdown = None
+ shutdown_exit_code = -1
try:
shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now')
except SysCallError as error:
- if error.exit_code == 256:
- pass
+ shutdown_exit_code = error.exit_code
+ # if error.exit_code == 256:
+ # pass
while self.session.is_alive():
time.sleep(0.25)
- if self.session.exit_code == 0 or (shutdown and shutdown.exit_code == 0):
+ if shutdown:
+ shutdown_exit_code = shutdown.exit_code
+
+ if self.session.exit_code == 0 or shutdown_exit_code == 0:
storage['active_boot'] = None
else:
- raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {shutdown}", exit_code=shutdown.exit_code)
+ raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {self.session.exit_code}/{shutdown_exit_code}", exit_code=next(filter(bool, [self.session.exit_code, shutdown_exit_code])))
def __iter__(self) -> Iterator[str]:
if self.session:
diff --git a/archinstall/lib/translation.py b/archinstall/lib/translation.py
index 74ffd691..1a0e94e4 100644
--- a/archinstall/lib/translation.py
+++ b/archinstall/lib/translation.py
@@ -5,14 +5,14 @@ import os
import gettext
from pathlib import Path
-from typing import List, Dict, Any, TYPE_CHECKING
+from typing import List, Dict, Any, TYPE_CHECKING, Tuple
from .exceptions import TranslationError
if TYPE_CHECKING:
_: Any
-class Languages:
+class LanguageDefinitions:
def __init__(self):
self._mappings = self._get_language_mappings()
@@ -70,11 +70,11 @@ class Translation:
def __init__(self, locales_dir):
self._languages = {}
- for name in self.get_all_names():
+ for names in self._get_translation_lang():
try:
- self._languages[name] = gettext.translation('base', localedir=locales_dir, languages=[name])
+ self._languages[names[0]] = gettext.translation('base', localedir=locales_dir, languages=names)
except FileNotFoundError as error:
- raise TranslationError(f"Could not locate language file for '{name}': {error}")
+ raise TranslationError(f"Could not locate language file for '{names}': {error}")
def activate(self, name):
if language := self._languages.get(name, None):
@@ -94,10 +94,19 @@ class Translation:
return locales_dir
@classmethod
- def get_all_names(cls) -> List[str]:
+ def _defined_languages(cls) -> List[str]:
locales_dir = cls.get_locales_dir()
filenames = os.listdir(locales_dir)
- def_languages = filter(lambda x: len(x) == 2, filenames)
+ return list(filter(lambda x: len(x) == 2, filenames))
- languages = Languages()
+ @classmethod
+ def _get_translation_lang(cls) -> List[Tuple[str, str]]:
+ def_languages = cls._defined_languages()
+ languages = LanguageDefinitions()
+ return [(languages.get_language(lang), lang) for lang in def_languages]
+
+ @classmethod
+ def get_available_lang(cls) -> List[str]:
+ def_languages = cls._defined_languages()
+ languages = LanguageDefinitions()
return [languages.get_language(lang) for lang in def_languages]
diff --git a/archinstall/lib/udev/__init__.py b/archinstall/lib/udev/__init__.py
new file mode 100644
index 00000000..86c8cc29
--- /dev/null
+++ b/archinstall/lib/udev/__init__.py
@@ -0,0 +1 @@
+from .udevadm import udevadm_info \ No newline at end of file
diff --git a/archinstall/lib/udev/udevadm.py b/archinstall/lib/udev/udevadm.py
new file mode 100644
index 00000000..84ec9cfd
--- /dev/null
+++ b/archinstall/lib/udev/udevadm.py
@@ -0,0 +1,17 @@
+import typing
+import pathlib
+from ..general import SysCommand
+
+def udevadm_info(path :pathlib.Path) -> typing.Dict[str, str]:
+ if path.resolve().exists() is False:
+ return {}
+
+ result = SysCommand(f"udevadm info {path.resolve()}")
+ data = {}
+ for line in result:
+ if b': ' in line and b'=' in line:
+ _, obj = line.split(b': ', 1)
+ key, value = obj.split(b'=', 1)
+ data[key.decode('UTF-8').lower()] = value.decode('UTF-8').strip()
+
+ return data \ No newline at end of file
diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py
index b0174d94..8aba4b4d 100644
--- a/archinstall/lib/user_interaction/__init__.py
+++ b/archinstall/lib/user_interaction/__init__.py
@@ -1,5 +1,5 @@
from .save_conf import save_config
-from .manage_users_conf import ask_for_superuser_account, ask_for_additional_users
+from .manage_users_conf import ask_for_additional_users
from .backwards_compatible_conf import generic_select, generic_multi_select
from .locale_conf import select_locale_lang, select_locale_enc
from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap
diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py
index 9238a766..371d052f 100644
--- a/archinstall/lib/user_interaction/disk_conf.py
+++ b/archinstall/lib/user_interaction/disk_conf.py
@@ -1,18 +1,18 @@
from __future__ import annotations
-from typing import Any, Dict, TYPE_CHECKING
+from typing import Any, Dict, TYPE_CHECKING, Optional
from .partitioning_conf import manage_new_and_existing_partitions, get_default_partition_layout
from ..disk import BlockDevice
from ..exceptions import DiskError
from ..menu import Menu
-from ..output import log
+from ..menu.menu import MenuSelectionType
if TYPE_CHECKING:
_: Any
-def ask_for_main_filesystem_format(advanced_options=False):
+def ask_for_main_filesystem_format(advanced_options=False) -> str:
options = {'btrfs': 'btrfs', 'ext4': 'ext4', 'xfs': 'xfs', 'f2fs': 'f2fs'}
advanced = {'ntfs': 'ntfs'}
@@ -22,7 +22,7 @@ def ask_for_main_filesystem_format(advanced_options=False):
prompt = _('Select which filesystem your main partition should use')
choice = Menu(prompt, options, skip=False).run()
- return choice
+ return choice.value
def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
@@ -30,27 +30,36 @@ def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
for device in block_devices:
layout = manage_new_and_existing_partitions(device)
-
result[device.path] = layout
return result
-def select_disk_layout(block_devices: list, advanced_options=False) -> Dict[str, Any]:
+def select_disk_layout(preset: Optional[Dict[str, Any]], block_devices: list, advanced_options=False) -> Optional[Dict[str, Any]]:
wipe_mode = str(_('Wipe all selected drives and use a best-effort default partition layout'))
custome_mode = str(_('Select what to do with each individual drive (followed by partition usage)'))
modes = [wipe_mode, custome_mode]
- print(modes)
- mode = Menu(_('Select what you wish to do with the selected block devices'), modes, skip=False).run()
+ warning = str(_('Are you sure you want to reset this setting?'))
+
+ choice = Menu(
+ _('Select what you wish to do with the selected block devices'),
+ modes,
+ explode_on_interrupt=True,
+ explode_warning=warning
+ ).run()
- if mode == wipe_mode:
- return get_default_partition_layout(block_devices, advanced_options)
- else:
- return select_individual_blockdevice_usage(block_devices)
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Ctrl_c: return None
+ case MenuSelectionType.Selection:
+ if choice.value == wipe_mode:
+ return get_default_partition_layout(block_devices, advanced_options)
+ else:
+ return select_individual_blockdevice_usage(block_devices)
-def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> BlockDevice:
+def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> Optional[BlockDevice]:
"""
Asks the user to select a harddrive from the `dict_o_disks` selection.
Usually this is combined with :ref:`archinstall.list_drives`.
@@ -63,19 +72,15 @@ def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> BlockDevice:
"""
drives = sorted(list(dict_o_disks.keys()))
if len(drives) >= 1:
- for index, drive in enumerate(drives):
- print(
- f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})"
- )
+ title = str(_('You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)')) + '\n'
+ title += str(_('Select one of the disks or skip and use /mnt as default'))
- log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)",
- fg="yellow")
+ choice = Menu(title, drives).run()
- drive = Menu('Select one of the disks or skip and use "/mnt" as default"', drives).run()
- if not drive:
- return drive
+ if choice.type_ == MenuSelectionType.Esc:
+ return None
- drive = dict_o_disks[drive]
+ drive = dict_o_disks[choice.value]
return drive
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py
index c42e9e27..d4dc60db 100644
--- a/archinstall/lib/user_interaction/general_conf.py
+++ b/archinstall/lib/user_interaction/general_conf.py
@@ -3,6 +3,9 @@ from __future__ import annotations
import logging
from typing import List, Any, Optional, Dict, TYPE_CHECKING
+import archinstall
+
+from ..menu.menu import MenuSelectionType
from ..menu.text_input import TextInput
from ..locale_helpers import list_keyboard_languages, list_timezones
@@ -22,11 +25,12 @@ def ask_ntp(preset: bool = True) -> bool:
prompt = str(_('Would you like to use automatic time synchronization (NTP) with the default time servers?\n'))
prompt += str(_('Hardware time and other post-configuration steps might be required in order for NTP to work.\nFor more information, please check the Arch wiki'))
if preset:
- preset_val = 'yes'
+ preset_val = Menu.yes()
else:
- preset_val = 'no'
- choice = Menu(prompt, ['yes', 'no'], skip=False, preset_values=preset_val, default_option='yes').run()
- return False if choice == 'no' else True
+ preset_val = Menu.no()
+ choice = Menu(prompt, Menu.yes_no(), skip=False, preset_values=preset_val, default_option=Menu.yes()).run()
+
+ return False if choice.value == Menu.no() else True
def ask_hostname(preset: str = None) -> str:
@@ -38,23 +42,31 @@ def ask_for_a_timezone(preset: str = None) -> str:
timezones = list_timezones()
default = 'UTC'
- selected_tz = Menu(_('Select a timezone'),
- list(timezones),
- skip=False,
- preset_values=preset,
- default_option=default).run()
+ choice = Menu(
+ _('Select a timezone'),
+ list(timezones),
+ preset_values=preset,
+ default_option=default
+ ).run()
- return selected_tz
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Selection: return choice.value
def ask_for_audio_selection(desktop: bool = True, preset: str = None) -> str:
- audio = 'pipewire' if desktop else 'none'
- choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none']
- selected_audio = Menu(_('Choose an audio server'), choices, preset_values=preset, default_option=audio, skip=False).run()
- return selected_audio
+ no_audio = str(_('No audio server'))
+ choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', no_audio]
+ default = 'pipewire' if desktop else no_audio
+
+ choice = Menu(_('Choose an audio server'), choices, preset_values=preset, default_option=default).run()
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Selection: return choice.value
-def select_language(default_value: str, preset_value: str = None) -> str:
+
+def select_language(preset_value: str = None) -> str:
"""
Asks the user to select a language
Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
@@ -64,16 +76,19 @@ def select_language(default_value: str, preset_value: str = None) -> str:
"""
kb_lang = list_keyboard_languages()
# sort alphabetically and then by length
- # it's fine if the list is big because the Menu
- # allows for searching anyways
sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len)
- selected_lang = Menu(_('Select Keyboard layout'),
- sorted_kb_lang,
- default_option=default_value,
- preset_values=preset_value,
- sort=False).run()
- return selected_lang
+ selected_lang = Menu(
+ _('Select keyboard layout'),
+ sorted_kb_lang,
+ preset_values=preset_value,
+ sort=False
+ ).run()
+
+ if selected_lang.value is None:
+ return preset_value
+
+ return selected_lang.value
def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
@@ -89,24 +104,27 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
else:
preselected = list(preset_values.keys())
mirrors = list_mirrors()
- selected_mirror = Menu(_('Select one of the regions to download packages from'),
- list(mirrors.keys()),
- preset_values=preselected,
- multi=True).run()
+ selected_mirror = Menu(
+ _('Select one of the regions to download packages from'),
+ list(mirrors.keys()),
+ preset_values=preselected,
+ multi=True,
+ explode_on_interrupt=True
+ ).run()
- if selected_mirror is not None:
- return {selected: mirrors[selected] for selected in selected_mirror}
-
- return {}
+ match selected_mirror.type_:
+ case MenuSelectionType.Ctrl_c: return {}
+ case MenuSelectionType.Esc: return preset_values
+ case _: return {selected: mirrors[selected] for selected in selected_mirror.value}
def select_archinstall_language(default='English'):
- languages = Translation.get_all_names()
- language = Menu(_('Select Archinstall language'), languages, default_option=default).run()
+ languages = Translation.get_available_lang()
+ language = Menu(_('Archinstall language'), languages, default_option=default).run()
return language
-def select_profile() -> Optional[Profile]:
+def select_profile(preset) -> Optional[Profile]:
"""
# Asks the user to select a profile from the available profiles.
#
@@ -124,13 +142,27 @@ def select_profile() -> Optional[Profile]:
options[option] = profile
title = _('This is a list of pre-programmed profiles, they might make it easier to install things like desktop environments')
-
- selection = Menu(title=title, p_options=list(options.keys())).run()
-
- if selection is not None:
- return options[selection]
-
- return None
+ warning = str(_('Are you sure you want to reset this setting?'))
+
+ selection = Menu(
+ title=title,
+ p_options=list(options.keys()),
+ explode_on_interrupt=True,
+ explode_warning=warning
+ ).run()
+
+ match selection.type_:
+ case MenuSelectionType.Selection:
+ return options[selection.value] if selection.value is not None else None
+ case MenuSelectionType.Ctrl_c:
+ archinstall.storage['profile_minimal'] = False
+ archinstall.storage['_selected_servers'] = []
+ archinstall.storage['_desktop_profile'] = None
+ archinstall.arguments['desktop-environment'] = None
+ archinstall.arguments['gfx_driver_packages'] = None
+ return None
+ case MenuSelectionType.Esc:
+ return None
def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]:
@@ -171,14 +203,16 @@ def select_additional_repositories(preset: List[str]) -> List[str]:
repositories = ["multilib", "testing"]
- additional_repositories = Menu(_('Choose which optional additional repositories to enable'),
- repositories,
- sort=False,
- multi=True,
- preset_values=preset,
- default_option=[]).run()
-
- if additional_repositories is not None:
- return additional_repositories
-
- return []
+ choice = Menu(
+ _('Choose which optional additional repositories to enable'),
+ repositories,
+ sort=False,
+ multi=True,
+ preset_values=preset,
+ explode_on_interrupt=True
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Ctrl_c: return []
+ case MenuSelectionType.Selection: return choice.value
diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/user_interaction/locale_conf.py
index d48018cf..15720064 100644
--- a/archinstall/lib/user_interaction/locale_conf.py
+++ b/archinstall/lib/user_interaction/locale_conf.py
@@ -4,32 +4,39 @@ from typing import Any, TYPE_CHECKING
from ..locale_helpers import list_locales
from ..menu import Menu
+from ..menu.menu import MenuSelectionType
if TYPE_CHECKING:
_: Any
-def select_locale_lang(default: str, preset: str = None) -> str:
+def select_locale_lang(preset: str = None) -> str:
locales = list_locales()
locale_lang = set([locale.split()[0] for locale in locales])
- selected_locale = Menu(_('Choose which locale language to use'),
- locale_lang,
- sort=True,
- preset_values=preset,
- default_option=default).run()
+ selected_locale = Menu(
+ _('Choose which locale language to use'),
+ list(locale_lang),
+ sort=True,
+ preset_values=preset
+ ).run()
- return selected_locale
+ match selected_locale.type_:
+ case MenuSelectionType.Selection: return selected_locale.value
+ case MenuSelectionType.Esc: return preset
-def select_locale_enc(default: str, preset: str = None) -> str:
+def select_locale_enc(preset: str = None) -> str:
locales = list_locales()
locale_enc = set([locale.split()[1] for locale in locales])
- selected_locale = Menu(_('Choose which locale encoding to use'),
- locale_enc,
- sort=True,
- preset_values=preset,
- default_option=default).run()
+ selected_locale = Menu(
+ _('Choose which locale encoding to use'),
+ list(locale_enc),
+ sort=True,
+ preset_values=preset
+ ).run()
- return selected_locale
+ match selected_locale.type_:
+ case MenuSelectionType.Selection: return selected_locale.value
+ case MenuSelectionType.Esc: return preset
diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py
index a6ff3111..567a2964 100644
--- a/archinstall/lib/user_interaction/manage_users_conf.py
+++ b/archinstall/lib/user_interaction/manage_users_conf.py
@@ -1,14 +1,12 @@
from __future__ import annotations
-import logging
import re
-from typing import Any, Dict, TYPE_CHECKING, List
+from typing import Any, Dict, TYPE_CHECKING, List, Optional
+from .utils import get_password
from ..menu import Menu
from ..menu.list_manager import ListManager
-from ..output import log
-from ..storage import storage
-from .utils import get_password
+from ..models.users import User
if TYPE_CHECKING:
_: Any
@@ -19,7 +17,7 @@ class UserList(ListManager):
subclass of ListManager for the managing of user accounts
"""
- def __init__(self, prompt: str, lusers: dict, sudo: bool = None):
+ def __init__(self, prompt: str, lusers: List[User]):
"""
param: prompt
type: str
@@ -27,140 +25,83 @@ class UserList(ListManager):
type: Dict
param: sudo. boolean to determine if we handle superusers or users. If None handles both types
"""
- self.sudo = sudo
- self.actions = [
+ self._actions = [
str(_('Add a user')),
str(_('Change password')),
str(_('Promote/Demote user')),
str(_('Delete User'))
]
- super().__init__(prompt, lusers, self.actions, self.actions[0])
-
- def reformat(self, data: Any) -> List[Any]:
- def format_element(elem :str):
- # secret gives away the length of the password
- if data[elem].get('!password'):
- pwd = '*' * 16
- else:
- pwd = ''
- if data[elem].get('sudoer'):
- super_user = 'Superuser'
- else:
- super_user = ' '
- return f"{elem:16}: password {pwd:16} {super_user}"
+ super().__init__(prompt, lusers, self._actions, self._actions[0])
- return list(map(lambda x: format_element(x), data))
+ def reformat(self, data: List[User]) -> Dict[str, User]:
+ return {e.display(): e for e in data}
def action_list(self):
- if self.target:
- active_user = list(self.target.keys())[0]
- else:
- active_user = None
- sudoer = self.target[active_user].get('sudoer', False)
- if self.sudo is None:
- return self.actions
- if self.sudo and sudoer:
- return self.actions
- elif self.sudo and not sudoer:
- return [self.actions[2]]
- elif not self.sudo and sudoer:
- return [self.actions[2]]
+ active_user = self.target if self.target else None
+
+ if active_user is None:
+ return [self._actions[0]]
else:
- return self.actions
+ return self._actions[1:]
- def exec_action(self, data: Any):
+ def exec_action(self, data: List[User]) -> List[User]:
if self.target:
- active_user = list(self.target.keys())[0]
+ active_user = self.target
else:
active_user = None
- if self.action == self.actions[0]: # add
- new_user = self.add_user()
- # no unicity check, if exists will be replaced
- data.update(new_user)
- elif self.action == self.actions[1]: # change password
- data[active_user]['!password'] = get_password(
- prompt=str(_('Password for user "{}": ').format(active_user)))
- elif self.action == self.actions[2]: # promote/demote
- data[active_user]['sudoer'] = not data[active_user]['sudoer']
- elif self.action == self.actions[3]: # delete
- del data[active_user]
+ if self.action == self._actions[0]: # add
+ new_user = self._add_user()
+ if new_user is not None:
+ # in case a user with the same username as an existing user
+ # was created we'll replace the existing one
+ data = [d for d in data if d.username != new_user.username]
+ data += [new_user]
+ elif self.action == self._actions[1]: # change password
+ prompt = str(_('Password for user "{}": ').format(active_user.username))
+ new_password = get_password(prompt=prompt)
+ if new_password:
+ user = next(filter(lambda x: x == active_user, data), 1)
+ user.password = new_password
+ elif self.action == self._actions[2]: # promote/demote
+ user = next(filter(lambda x: x == active_user, data), 1)
+ user.sudo = False if user.sudo else True
+ elif self.action == self._actions[3]: # delete
+ data = [d for d in data if d != active_user]
+
+ return data
def _check_for_correct_username(self, username: str) -> bool:
if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
return True
- log("The username you entered is invalid. Try again", level=logging.WARNING, fg='red')
return False
- def add_user(self):
+ def _add_user(self) -> Optional[User]:
print(_('\nDefine a new user\n'))
- prompt = str(_("User Name : "))
+ prompt = str(_('Enter username (leave blank to skip): '))
+
while True:
- userid = input(prompt).strip(' ')
- if not userid:
- return {} # end
- if not self._check_for_correct_username(userid):
- pass
+ username = input(prompt).strip(' ')
+ if not username:
+ return None
+ if not self._check_for_correct_username(username):
+ prompt = str(_("The username you entered is invalid. Try again")) + '\n' + prompt
else:
break
- if self.sudo:
- sudoer = True
- elif self.sudo is not None and not self.sudo:
- sudoer = False
- else:
- sudoer = False
- sudo_choice = Menu(str(_('Should {} be a superuser (sudoer)?')).format(userid), ['yes', 'no'],
- skip=False,
- preset_values='yes' if sudoer else 'no',
- default_option='no').run()
- sudoer = True if sudo_choice == 'yes' else False
-
- password = get_password(prompt=str(_('Password for user "{}": ').format(userid)))
-
- return {userid: {"!password": password, "sudoer": sudoer}}
-
-
-def manage_users(prompt: str, sudo: bool) -> tuple[dict, dict]:
- # TODO Filtering and some kind of simpler code
- lusers = {}
- if storage['arguments'].get('!superusers', {}):
- lusers.update({
- uid: {
- '!password': storage['arguments']['!superusers'][uid].get('!password'),
- 'sudoer': True
- }
- for uid in storage['arguments'].get('!superusers', {})
- })
- if storage['arguments'].get('!users', {}):
- lusers.update({
- uid: {
- '!password': storage['arguments']['!users'][uid].get('!password'),
- 'sudoer': False
- }
- for uid in storage['arguments'].get('!users', {})
- })
- # processing
- lusers = UserList(prompt, lusers, sudo).run()
- # return data
- superusers = {
- uid: {
- '!password': lusers[uid].get('!password')
- }
- for uid in lusers if lusers[uid].get('sudoer', False)
- }
- users = {uid: {'!password': lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer', False)}
- storage['arguments']['!superusers'] = superusers
- storage['arguments']['!users'] = users
- return superusers, users
-
-
-def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]:
- prompt = prompt if prompt else str(_('Define users with sudo privilege, by username: '))
- superusers, dummy = manage_users(prompt, sudo=True)
- return superusers
-
-
-def ask_for_additional_users(prompt: str = '') -> Dict[str, Dict[str, str | None]]:
- prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ')
- dummy, users = manage_users(prompt, sudo=False)
+
+ password = get_password(prompt=str(_('Password for user "{}": ').format(username)))
+
+ choice = Menu(
+ str(_('Should "{}" be a superuser (sudo)?')).format(username), Menu.yes_no(),
+ skip=False,
+ default_option=Menu.no()
+ ).run()
+
+ sudo = True if choice.value == Menu.yes() else False
+ return User(username, password, sudo)
+
+
+def ask_for_additional_users(prompt: str = '', defined_users: List[User] = []) -> List[User]:
+ prompt = prompt if prompt else _('Enter username (leave blank to skip): ')
+ users = UserList(prompt, defined_users).run()
return users
diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py
index 80c9106b..5154d8b1 100644
--- a/archinstall/lib/user_interaction/network_conf.py
+++ b/archinstall/lib/user_interaction/network_conf.py
@@ -4,6 +4,7 @@ import ipaddress
import logging
from typing import Any, Optional, TYPE_CHECKING, List, Union
+from ..menu.menu import MenuSelectionType
from ..menu.text_input import TextInput
from ..models.network_configuration import NetworkConfiguration, NicType
@@ -63,11 +64,17 @@ class ManualNetworkConfig(ListManager):
elif self.action == self._action_delete:
del data[iface_name]
- def _select_iface(self, existing_ifaces: List[str]) -> Optional[str]:
+ return data
+
+ def _select_iface(self, existing_ifaces: List[str]) -> Optional[Any]:
all_ifaces = list_interfaces().values()
available = set(all_ifaces) - set(existing_ifaces)
- iface = Menu(str(_('Select interface to add')), list(available), skip=True).run()
- return iface
+ choice = Menu(str(_('Select interface to add')), list(available), skip=True).run()
+
+ if choice.type_ == MenuSelectionType.Esc:
+ return None
+
+ return choice.value
def _edit_iface(self, edit_iface :NetworkConfiguration):
iface_name = edit_iface.iface
@@ -75,9 +82,9 @@ class ManualNetworkConfig(ListManager):
default_mode = 'DHCP (auto detect)'
prompt = _('Select which mode to configure for "{}" or skip to use default mode "{}"').format(iface_name, default_mode)
- mode = Menu(prompt, modes, default_option=default_mode).run()
+ mode = Menu(prompt, modes, default_option=default_mode, skip=False).run()
- if mode == 'IP (static)':
+ if mode.value == 'IP (static)':
while 1:
prompt = _('Enter the IP and subnet for {} (example: 192.168.0.5/24): ').format(iface_name)
ip = TextInput(prompt, edit_iface.ip).run().strip()
@@ -89,14 +96,14 @@ class ManualNetworkConfig(ListManager):
log("You need to enter a valid IP in IP-config mode.", level=logging.WARNING, fg='red')
# Implemented new check for correct gateway IP address
+ gateway = None
+
while 1:
- gateway = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '),
+ gateway_input = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '),
edit_iface.gateway).run().strip()
try:
- if len(gateway) == 0:
- gateway = None
- else:
- ipaddress.ip_address(gateway)
+ if len(gateway_input) > 0:
+ ipaddress.ip_address(gateway_input)
break
except ValueError:
log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red')
@@ -107,6 +114,7 @@ class ManualNetworkConfig(ListManager):
display_dns = None
dns_input = TextInput(_('Enter your DNS servers (space separated, blank for none): '), display_dns).run().strip()
+ dns = []
if len(dns_input):
dns = dns_input.split(' ')
@@ -135,23 +143,28 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw
elif preset.type == 'network_manager':
cursor_idx = 1
- nic = Menu(_(
- 'Select one network interface to configure'),
+ warning = str(_('Are you sure you want to reset this setting?'))
+
+ choice = Menu(
+ _('Select one network interface to configure'),
list(network_options.values()),
cursor_index=cursor_idx,
- sort=False
+ sort=False,
+ explode_on_interrupt=True,
+ explode_warning=warning
).run()
- if not nic:
- return preset
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Ctrl_c: return None
- if nic == network_options['none']:
+ if choice.value == network_options['none']:
return None
- elif nic == network_options['iso_config']:
+ elif choice.value == network_options['iso_config']:
return NetworkConfiguration(NicType.ISO)
- elif nic == network_options['network_manager']:
+ elif choice.value == network_options['network_manager']:
return NetworkConfiguration(NicType.NM)
- elif nic == network_options['manual']:
+ elif choice.value == network_options['manual']:
manual = ManualNetworkConfig('Configure interfaces', preset)
return manual.run_manual()
diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py
index af1d224f..bfff5705 100644
--- a/archinstall/lib/user_interaction/partitioning_conf.py
+++ b/archinstall/lib/user_interaction/partitioning_conf.py
@@ -1,12 +1,13 @@
from __future__ import annotations
-from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable
+import copy
+from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable, Optional
from ..menu import Menu
+from ..menu.menu import MenuSelectionType
from ..output import log
from ..disk.validators import fs_types
-from ..disk.helpers import has_mountpoint
if TYPE_CHECKING:
from ..disk import BlockDevice
@@ -19,9 +20,9 @@ def partition_overlap(partitions: list, start: str, end: str) -> bool:
return False
-def _current_partition_layout(partitions: List[Partition], with_idx: bool = False) -> str:
+def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool = False, with_title: bool = True) -> str:
- def do_padding(name, max_len):
+ def do_padding(name: str, max_len: int):
spaces = abs(len(str(name)) - max_len) + 2
pad_left = int(spaces / 2)
pad_right = spaces - pad_left
@@ -61,39 +62,54 @@ def _current_partition_layout(partitions: List[Partition], with_idx: bool = Fals
current_layout += f'{row[:-1]}\n'
- title = str(_('Current partition layout'))
- return f'\n\n{title}:\n\n{current_layout}'
+ if with_title:
+ title = str(_('Current partition layout'))
+ return f'\n\n{title}:\n\n{current_layout}'
+ return current_layout
-def select_partition(title :str, partitions :List[Partition], multiple :bool = False, filter :Callable = None) -> Union[int, List[int], None]:
+
+def _get_partitions(partitions :List[Partition], filter_ :Callable = None) -> List[str]:
"""
filter allows to filter out the indexes once they are set. Should return True if element is to be included
"""
partition_indexes = []
for i in range(len(partitions)):
- if filter:
- if filter(partitions[i]):
+ if filter_:
+ if filter_(partitions[i]):
partition_indexes.append(str(i))
else:
partition_indexes.append(str(i))
+
+ return partition_indexes
+
+
+def select_partition(
+ title :str,
+ partitions :List[Partition],
+ multiple :bool = False,
+ filter_ :Callable = None
+) -> Optional[int, List[int]]:
+ partition_indexes = _get_partitions(partitions, filter_)
+
if len(partition_indexes) == 0:
return None
- # old code without filter
- # partition_indexes = list(map(str, range(len(partitions))))
- partition = Menu(title, partition_indexes, multi=multiple).run()
+ choice = Menu(title, partition_indexes, multi=multiple).run()
- if partition is not None:
- if isinstance(partition, list):
- return [int(p) for p in partition]
- else:
- return int(partition)
+ if choice.type_ == MenuSelectionType.Esc:
+ return None
- return None
+ if isinstance(choice.value, list):
+ return [int(p) for p in choice.value]
+ else:
+ return int(choice.value)
-def get_default_partition_layout(block_devices: Union['BlockDevice', List['BlockDevice']],
- advanced_options: bool = False) -> Dict[str, Any]:
+def get_default_partition_layout(
+ block_devices: Union['BlockDevice', List['BlockDevice']],
+ advanced_options: bool = False
+) -> Optional[Dict[str, Any]]:
from ..disk import suggest_single_disk_layout, suggest_multi_disk_layout
if len(block_devices) == 1:
@@ -107,14 +123,15 @@ def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
for device in block_devices:
layout = manage_new_and_existing_partitions(device)
-
result[device.path] = layout
return result
-def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50
+def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50
block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
+ original_layout = copy.deepcopy(block_device_struct)
+
# Test code: [part.__dump__() for part in block_device.partitions.values()]
# TODO: Squeeze in BTRFS subvolumes here
@@ -129,11 +146,13 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)'))
set_filesystem_partition = str(_('Set desired filesystem for a partition'))
set_btrfs_subvolumes = str(_('Set desired subvolumes on a btrfs partition'))
+ save_and_exit = str(_('Save and exit'))
+ cancel = str(_('Cancel'))
while True:
modes = [new_partition, suggest_partition_layout]
- if len(block_device_struct['partitions']):
+ if len(block_device_struct['partitions']) > 0:
modes += [
delete_partition,
delete_all_partitions,
@@ -143,20 +162,31 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
mark_bootable,
mark_compressed,
set_filesystem_partition,
- set_btrfs_subvolumes,
]
+ indexes = _get_partitions(
+ block_device_struct["partitions"],
+ filter_=lambda x: True if x.get('filesystem', {}).get('format') == 'btrfs' else False
+ )
+
+ if len(indexes) > 0:
+ modes += [set_btrfs_subvolumes]
+
title = _('Select what to do with\n{}').format(block_device)
# show current partition layout:
if len(block_device_struct["partitions"]):
- title += _current_partition_layout(block_device_struct['partitions']) + '\n'
+ title += current_partition_layout(block_device_struct['partitions']) + '\n'
- task = Menu(title, modes, sort=False).run()
+ modes += [save_and_exit, cancel]
- if not task:
- break
+ task = Menu(title, modes, sort=False, skip=False).run()
+ task = task.value
+ if task == cancel:
+ return original_layout
+ elif task == save_and_exit:
+ break
if task == new_partition:
from ..disk import valid_parted_position
@@ -165,7 +195,10 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
# # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
# name = input("Enter a desired name for the partition: ").strip()
- fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run()
+ fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run()
+
+ if fs_choice.type_ == MenuSelectionType.Esc:
+ continue
prompt = _('Enter the start sector (percentage or block number, default: {}): ').format(
block_device.first_free_sector)
@@ -197,7 +230,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
"mountpoint": None,
"wipe": True,
"filesystem": {
- "format": fstype
+ "format": fs_choice.value
}
})
else:
@@ -208,18 +241,15 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
from ..disk import suggest_single_disk_layout
if len(block_device_struct["partitions"]):
- prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device)
- choice = Menu(prompt, ['yes', 'no'], default_option='no').run()
+ prompt = _('{}\ncontains queued partitions, this will remove those, are you sure?').format(block_device)
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run()
- if choice == 'no':
+ if choice.value == Menu.no():
continue
block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path])
-
- elif task is None:
- return block_device_struct
else:
- current_layout = _current_partition_layout(block_device_struct['partitions'], with_idx=True)
+ current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True)
if task == delete_partition:
title = _('{}\n\nSelect by index which partitions to delete').format(current_layout)
@@ -243,15 +273,14 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
block_device_struct["partitions"][partition]["filesystem"]["mount_options"].append("compress=zstd")
elif task == delete_all_partitions:
block_device_struct["partitions"] = []
+ block_device_struct["wipe"] = True
elif task == assign_mount_point:
title = _('{}\n\nSelect by index which partition to mount where').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
- print(
- _(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
- mountpoint = input(
- _('Select where to mount partition (leave blank to remove mountpoint): ')).strip()
+ print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
+ mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip()
if len(mountpoint):
block_device_struct["partitions"][partition]['mountpoint'] = mountpoint
@@ -273,14 +302,13 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if not block_device_struct["partitions"][partition].get('filesystem', None):
block_device_struct["partitions"][partition]['filesystem'] = {}
- fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(),
- skip=False).run()
+ fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run()
- block_device_struct["partitions"][partition]['filesystem']['format'] = fstype
+ if fs_choice.type_ == MenuSelectionType.Selection:
+ block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
# Negate the current wipe marking
- block_device_struct["partitions"][partition][
- 'wipe'] = not block_device_struct["partitions"][partition].get('wipe', False)
+ block_device_struct["partitions"][partition]['wipe'] = not block_device_struct["partitions"][partition].get('wipe', False)
elif task == mark_encrypted:
title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout)
@@ -288,16 +316,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if partition is not None:
# Negate the current encryption marking
- block_device_struct["partitions"][partition][
- 'encrypted'] = not block_device_struct["partitions"][partition].get('encrypted', False)
+ block_device_struct["partitions"][partition]['encrypted'] = \
+ not block_device_struct["partitions"][partition].get('encrypted', False)
elif task == mark_bootable:
title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
- block_device_struct["partitions"][partition][
- 'boot'] = not block_device_struct["partitions"][partition].get('boot', False)
+ block_device_struct["partitions"][partition]['boot'] = \
+ not block_device_struct["partitions"][partition].get('boot', False)
elif task == set_filesystem_partition:
title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout)
@@ -308,16 +336,18 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
block_device_struct["partitions"][partition]['filesystem'] = {}
fstype_title = _('Enter a desired filesystem type for the partition: ')
- fstype = Menu(fstype_title, fs_types(), skip=False).run()
+ fs_choice = Menu(fstype_title, fs_types()).run()
- block_device_struct["partitions"][partition]['filesystem']['format'] = fstype
+ if fs_choice.type_ == MenuSelectionType.Selection:
+ block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
elif task == set_btrfs_subvolumes:
from .subvolume_config import SubvolumeList
# TODO get preexisting partitions
title = _('{}\n\nSelect which partition to set subvolumes on').format(current_layout)
- partition = select_partition(title, block_device_struct["partitions"],filter=lambda x:True if x.get('filesystem',{}).get('format') == 'btrfs' else False)
+ partition = select_partition(title, block_device_struct["partitions"],filter_=lambda x:True if x.get('filesystem',{}).get('format') == 'btrfs' else False)
+
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
@@ -333,19 +363,30 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
return block_device_struct
+def select_encrypted_partitions(
+ title :str,
+ partitions :List[Partition],
+ multiple :bool = True,
+ filter_ :Callable = None
+) -> Optional[int, List[int]]:
+ partition_indexes = _get_partitions(partitions, filter_)
-def select_encrypted_partitions(block_devices: dict, password: str) -> dict:
- for device in block_devices:
- for partition in block_devices[device]['partitions']:
- if partition.get('mountpoint', None) != '/boot':
- partition['encrypted'] = True
- partition['!password'] = password
+ if len(partition_indexes) == 0:
+ return None
+
+ title = _('Select which partitions to mark for formatting:')
- if not has_mountpoint(partition,'/'):
- # Tell the upcoming steps to generate a key-file for non root mounts.
- partition['generate-encryption-key-file'] = True
+ # show current partition layout:
+ if len(partitions):
+ title += current_partition_layout(partitions) + '\n'
- return block_devices
+ choice = Menu(title, partition_indexes, multi=multiple).run()
- # TODO: Next version perhaps we can support mixed multiple encrypted partitions
- # Users might want to single out a partition for non-encryption to share between dualboot etc.
+ if choice.type_ == MenuSelectionType.Esc:
+ return None
+
+ if isinstance(choice.value, list):
+ for partition_index in choice.value:
+ yield int(partition_index)
+ else:
+ yield (partition_index)
diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py
index c52b97e2..f542bc9b 100644
--- a/archinstall/lib/user_interaction/save_conf.py
+++ b/archinstall/lib/user_interaction/save_conf.py
@@ -5,6 +5,7 @@ from typing import Any, Dict, TYPE_CHECKING
from ..configuration import ConfigurationOutput
from ..menu import Menu
+from ..menu.menu import MenuSelectionType
from ..output import log
if TYPE_CHECKING:
@@ -45,14 +46,16 @@ def save_config(config: Dict):
'all': str(_('Save all'))
}
- selection = Menu(_('Choose which configuration to save'),
- list(options.values()),
- sort=False,
- skip=True,
- preview_size=0.75,
- preview_command=preview).run()
+ choice = Menu(
+ _('Choose which configuration to save'),
+ list(options.values()),
+ sort=False,
+ skip=True,
+ preview_size=0.75,
+ preview_command=preview
+ ).run()
- if not selection:
+ if choice.type_ == MenuSelectionType.Esc:
return
while True:
@@ -62,13 +65,13 @@ def save_config(config: Dict):
break
log(_('Not a valid directory: {}').format(dest_path), fg='red')
- if options['user_config'] == selection:
+ if options['user_config'] == choice.value:
config_output.save_user_config(dest_path)
- elif options['user_creds'] == selection:
+ elif options['user_creds'] == choice.value:
config_output.save_user_creds(dest_path)
- elif options['disk_layout'] == selection:
+ elif options['disk_layout'] == choice.value:
config_output.save_disk_layout(dest_path)
- elif options['all'] == selection:
+ elif options['all'] == choice.value:
config_output.save_user_config(dest_path)
config_output.save_user_creds(dest_path)
- config_output.save_disk_layout
+ config_output.save_disk_layout(dest_path)
diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py
index 0515876b..af783639 100644
--- a/archinstall/lib/user_interaction/subvolume_config.py
+++ b/archinstall/lib/user_interaction/subvolume_config.py
@@ -1,9 +1,11 @@
-from typing import List, Any, Dict
+from typing import Dict, List
from ..menu.list_manager import ListManager
+from ..menu.menu import MenuSelectionType
from ..menu.selection_menu import Selector, GeneralMenu
from ..menu.text_input import TextInput
from ..menu import Menu
+
"""
UI classes
"""
@@ -14,7 +16,7 @@ class SubvolumeList(ListManager):
self.ObjectDefaultAction = str(_('Add'))
super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction)
- def reformat(self, data: Any) -> List[Any]:
+ def reformat(self, data: Dict) -> Dict:
def presentation(key :str, value :Dict):
text = _(" Subvolume :{:16}").format(key)
if isinstance(value,str):
@@ -24,18 +26,20 @@ class SubvolumeList(ListManager):
text += _(" mounted at {:16}").format(value['mountpoint'])
else:
text += (' ' * 28)
+
if value.get('options',[]):
text += _(" with option {}").format(', '.join(value['options']))
return text
- return sorted(list(map(lambda x:presentation(x,data[x]),data)))
+ formatted = {presentation(k, v): k for k, v in data.items()}
+ return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])}
def action_list(self):
return super().action_list()
- def exec_action(self, data: Any):
+ def exec_action(self, data: Dict):
if self.target:
- origkey,origval = list(self.target.items())[0]
+ origkey, origval = list(self.target.items())[0]
else:
origkey = None
@@ -46,13 +50,15 @@ class SubvolumeList(ListManager):
self.target = {}
print(_('\n Fill the desired values for a new subvolume \n'))
with SubvolumeMenu(self.target,self.action) as add_menu:
- for data in ['name','mountpoint','options']:
- add_menu.exec_option(data)
+ for elem in ['name','mountpoint','options']:
+ add_menu.exec_option(elem)
else:
SubvolumeMenu(self.target,self.action).run()
data.update(self.target)
+ return data
+
class SubvolumeMenu(GeneralMenu):
def __init__(self,parameters,action=None):
@@ -124,7 +130,17 @@ class SubvolumeMenu(GeneralMenu):
def _select_subvolume_mount_point(self,value):
return TextInput(str(_("Select a mount point :")),value).run()
- def _select_subvolume_options(self,value):
+ def _select_subvolume_options(self,value) -> List[str]:
# def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True):
- return Menu(str(_("Select the desired subvolume options ")),['nodatacow','compress'],
- skip=True,preset_values=value,multi=True).run()
+ choice = Menu(
+ str(_("Select the desired subvolume options ")),
+ ['nodatacow','compress'],
+ skip=True,
+ preset_values=value,
+ multi=True
+ ).run()
+
+ if choice.type_ == MenuSelectionType.Selection:
+ return choice.value
+
+ return []
diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py
index 0284dc5f..78daa6a5 100644
--- a/archinstall/lib/user_interaction/system_conf.py
+++ b/archinstall/lib/user_interaction/system_conf.py
@@ -6,10 +6,9 @@ from ..disk import all_blockdevices
from ..exceptions import RequirementError
from ..hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics
from ..menu import Menu
+from ..menu.menu import MenuSelectionType
from ..storage import storage
-from ..translation import DeferredTranslation
-
if TYPE_CHECKING:
_: Any
@@ -25,13 +24,22 @@ def select_kernel(preset: List[str] = None) -> List[str]:
kernels = ["linux", "linux-lts", "linux-zen", "linux-pae"]
default_kernel = "linux"
- selected_kernels = Menu(_('Choose which kernels to use or leave blank for default "{}"').format(default_kernel),
- kernels,
- sort=True,
- multi=True,
- preset_values=preset,
- default_option=default_kernel).run()
- return selected_kernels
+ warning = str(_('Are you sure you want to reset this setting?'))
+
+ choice = Menu(
+ _('Choose which kernels to use or leave blank for default "{}"').format(default_kernel),
+ kernels,
+ sort=True,
+ multi=True,
+ preset_values=preset,
+ explode_on_interrupt=True,
+ explode_warning=warning
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Ctrl_c: return []
+ case MenuSelectionType.Selection: return choice.value
def select_harddrives(preset: List[str] = []) -> List[str]:
@@ -49,18 +57,27 @@ def select_harddrives(preset: List[str] = []) -> List[str]:
else:
preset_disks = {}
- selected_harddrive = Menu(_('Select one or more hard drives to use and configure'),
- list(options.keys()),
- preset_values=list(preset_disks.keys()),
- multi=True).run()
+ title = str(_('Select one or more hard drives to use and configure\n'))
+ title += str(_('Any modifications to the existing setting will reset the disk layout!'))
- if selected_harddrive and len(selected_harddrive) > 0:
- return [options[i] for i in selected_harddrive]
+ warning = str(_('If you reset the harddrive selection this will also reset the current disk layout. Are you sure?'))
- return []
+ selected_harddrive = Menu(
+ title,
+ list(options.keys()),
+ preset_values=list(preset_disks.keys()),
+ multi=True,
+ explode_on_interrupt=True,
+ explode_warning=warning
+ ).run()
+ match selected_harddrive.type_:
+ case MenuSelectionType.Ctrl_c: return []
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Selection: return [options[i] for i in selected_harddrive.value]
-def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask: bool = False) -> str:
+
+def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str:
"""
Some what convoluted function, whose job is simple.
Select a graphics driver from a pre-defined set of popular options.
@@ -73,72 +90,85 @@ def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask: bo
if drivers:
arguments = storage.get('arguments', {})
- title = DeferredTranslation('')
+ title = ''
if has_amd_graphics():
- title += _(
+ title += str(_(
'For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.'
- ) + '\n'
+ )) + '\n'
if has_intel_graphics():
- title += _(
+ title += str(_(
'For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n'
- )
+ ))
if has_nvidia_graphics():
- title += _(
+ title += str(_(
'For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n'
- )
+ ))
- if not arguments.get('gfx_driver', None) or force_ask:
- title += _('\n\nSelect a graphics driver or leave blank to install all open-source drivers')
- arguments['gfx_driver'] = Menu(title, drivers).run()
+ title += str(_('\n\nSelect a graphics driver or leave blank to install all open-source drivers'))
+ choice = Menu(title, drivers).run()
- if arguments.get('gfx_driver', None) is None:
- arguments['gfx_driver'] = _("All open-source (default)")
+ if choice.type_ != MenuSelectionType.Selection:
+ return arguments.get('gfx_driver')
- return options.get(arguments.get('gfx_driver'))
+ arguments['gfx_driver'] = choice.value
+ return options.get(choice.value)
raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
def ask_for_bootloader(advanced_options: bool = False, preset: str = None) -> str:
-
if preset == 'systemd-bootctl':
- preset_val = 'systemd-boot' if advanced_options else 'no'
+ preset_val = 'systemd-boot' if advanced_options else Menu.no()
elif preset == 'grub-install':
- preset_val = 'grub' if advanced_options else 'yes'
+ preset_val = 'grub' if advanced_options else Menu.yes()
else:
preset_val = preset
bootloader = "systemd-bootctl" if has_uefi() else "grub-install"
+
if has_uefi():
if not advanced_options:
- bootloader_choice = Menu(_('Would you like to use GRUB as a bootloader instead of systemd-boot?'),
- ['yes', 'no'],
- preset_values=preset_val,
- default_option='no').run()
-
- if bootloader_choice == "yes":
- bootloader = "grub-install"
+ selection = Menu(
+ _('Would you like to use GRUB as a bootloader instead of systemd-boot?'),
+ Menu.yes_no(),
+ preset_values=preset_val,
+ default_option=Menu.no()
+ ).run()
+
+ match selection.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Selection: bootloader = 'grub-install' if selection.value == Menu.yes() else bootloader
else:
# We use the common names for the bootloader as the selection, and map it back to the expected values.
choices = ['systemd-boot', 'grub', 'efistub']
selection = Menu(_('Choose a bootloader'), choices, preset_values=preset_val).run()
- if selection != "":
- if selection == 'systemd-boot':
+
+ value = ''
+ match selection.type_:
+ case MenuSelectionType.Esc: value = preset_val
+ case MenuSelectionType.Selection: value = selection.value
+
+ if value != "":
+ if value == 'systemd-boot':
bootloader = 'systemd-bootctl'
- elif selection == 'grub':
+ elif value == 'grub':
bootloader = 'grub-install'
else:
- bootloader = selection
+ bootloader = value
return bootloader
def ask_for_swap(preset: bool = True) -> bool:
if preset:
- preset_val = 'yes'
+ preset_val = Menu.yes()
else:
- preset_val = 'no'
+ preset_val = Menu.no()
+
prompt = _('Would you like to use swap on zram?')
- choice = Menu(prompt, ['yes', 'no'], default_option='yes', preset_values=preset_val).run()
- return False if choice == 'no' else True
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), preset_values=preset_val).run()
+
+ match choice.type_:
+ case MenuSelectionType.Esc: return preset
+ case MenuSelectionType.Selection: return False if choice.value == Menu.no() else True
diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py
index 48b55e8c..fa079bc2 100644
--- a/archinstall/lib/user_interaction/utils.py
+++ b/archinstall/lib/user_interaction/utils.py
@@ -28,12 +28,9 @@ def check_password_strong(passwd: str) -> bool:
symbol_count += 40
if symbol_count**len(passwd) < 10e20:
-
- prompt = _("The password you are using seems to be weak,")
- prompt += _("are you sure you want to use it?")
-
- choice = Menu(prompt, ["yes", "no"], default_option="yes").run()
- return choice == "yes"
+ prompt = str(_("The password you are using seems to be weak, are you sure you want to use it?"))
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run()
+ return choice.value == Menu.yes()
return True
@@ -43,7 +40,6 @@ def get_password(prompt: str = '') -> Optional[str]:
prompt = _("Enter a password: ")
while passwd := getpass.getpass(prompt):
-
if len(passwd.strip()) <= 0:
break
@@ -56,6 +52,7 @@ def get_password(prompt: str = '') -> Optional[str]:
continue
return passwd
+
return None
@@ -84,12 +81,13 @@ def do_countdown() -> bool:
if SIG_TRIGGER:
prompt = _('Do you really want to abort?')
- choice = Menu(prompt, ['yes', 'no'], skip=False).run()
- if choice == 'yes':
+ choice = Menu(prompt, Menu.yes_no(), skip=False).run()
+ if choice.value == Menu.yes():
exit(0)
if SIG_TRIGGER is False:
sys.stdin.read()
+
SIG_TRIGGER = False
signal.signal(signal.SIGINT, sig_handler)