index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
diff --git a/archinstall/__init__.py b/archinstall/__init__.py index 9d7e238d..c81a630f 100644 --- a/archinstall/__init__.py +++ b/archinstall/__init__.py @@ -21,6 +21,8 @@ from .lib.storage import * from .lib.systemd import * from .lib.user_interaction import * from .lib.menu import Menu +from .lib.menu.selection_menu import GlobalMenu + parser = ArgumentParser() diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 2be31375..5288f92b 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,14 +1,20 @@ +from __future__ import annotations import os import json import logging import time +from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from ..exceptions import DiskError from ..output import log from ..general import SysCommand from ..storage import storage class BlockDevice: - def __init__(self, path, info=None): + def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_disks # If we don't give any information, we need to auto-fill it. @@ -24,32 +30,32 @@ class BlockDevice: # It's actually partition-encryption, but for future-proofing this # I'm placing the encryption password on a BlockDevice level. - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str) -> str: return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})" - def __iter__(self): + def __iter__(self) -> Iterator[Partition]: for partition in self.partitions: yield self.partitions[partition] - def __getitem__(self, key, *args, **kwargs): + def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if key not in self.info: raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] - def __len__(self): + def __len__(self) -> int: return len(self.partitions) - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :'BlockDevice') -> bool: return self.path < left_comparitor.path - def json(self): + def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ return self.path - def __dump__(self): + def __dump__(self) -> Dict[str, Dict[str, Any]]: return { self.path : { 'partuuid' : self.uuid, @@ -59,14 +65,14 @@ class BlockDevice: } @property - def partition_type(self): + def partition_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['pttype'] @property - def device_or_backfile(self): + def device_or_backfile(self) -> str: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -82,7 +88,7 @@ class BlockDevice: return self.device @property - def device(self): + def device(self) -> str: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -108,7 +114,7 @@ class BlockDevice: # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @property - def partitions(self): + def partitions(self) -> Dict[str, Partition]: from .filesystem import Partition self.partprobe() @@ -133,17 +139,19 @@ class BlockDevice: return {k: self.part_cache[k] for k in sorted(self.part_cache)} @property - def partition(self): + def partition(self) -> Partition: all_partitions = self.partitions return [all_partitions[k] for k in all_partitions] @property - def partition_table_type(self): + def partition_table_type(self) -> int: + # TODO: Don't hardcode :) + # Remove if we don't use this function anywhere from .filesystem import GPT return GPT @property - def uuid(self): + def uuid(self) -> str: log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ Returns the disk UUID as returned by lsblk. @@ -153,7 +161,7 @@ class BlockDevice: return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') @property - def size(self): + def size(self) -> float: from .helpers import convert_size_to_gb output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) @@ -162,21 +170,21 @@ class BlockDevice: return convert_size_to_gb(device['size']) @property - def bus_type(self): + def bus_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['tran'] @property - def spinning(self): + def spinning(self) -> bool: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['rota'] is True @property - def free_space(self): + def free_space(self) -> Tuple[str, str, str]: # NOTE: parted -s will default to `cancel` on prompt, skipping any partition # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. @@ -187,7 +195,7 @@ class BlockDevice: yield (start, end, size) @property - def largest_free_space(self): + def largest_free_space(self) -> List[str]: info = [] for space_info in self.free_space: if not info: @@ -199,7 +207,7 @@ class BlockDevice: return info @property - def first_free_sector(self): + def first_free_sector(self) -> str: if info := self.largest_free_space: start = info[0] else: @@ -207,29 +215,29 @@ class BlockDevice: return start @property - def first_end_sector(self): + def first_end_sector(self) -> str: if info := self.largest_free_space: end = info[1] else: end = f"{self.size}GB" return end - def partprobe(self): - SysCommand(['partprobe', self.path]) + def partprobe(self) -> bool: + return SysCommand(['partprobe', self.path]).exit_code == 0 - def has_partitions(self): + def has_partitions(self) -> int: return len(self.partitions) - def has_mount_point(self, mountpoint): + def has_mount_point(self, mountpoint :str) -> bool: for partition in self.partitions: if self.partitions[partition].mountpoint == mountpoint: return True return False - def flush_cache(self): + def flush_cache(self) -> None: self.part_cache = {} - def get_partition(self, uuid): + def get_partition(self, uuid :str) -> Partition: count = 0 while count < 5: for partition_uuid, partition in self.partitions.items(): diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index fb9712f8..ad8d0a52 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -1,23 +1,30 @@ +from __future__ import annotations import pathlib import glob import logging -from typing import Union +from typing import Union, Dict, TYPE_CHECKING + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from ..installer import Installer from .helpers import get_mount_info from ..exceptions import DiskError from ..general import SysCommand from ..output import log -from .partition import Partition -def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: +def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: """ This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name. @installation: archinstall.Installer instance @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot @force: overrides the check for weither or not the subvolume mountpoint is empty or not - """ + This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure. + Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name] + """ + log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING) installation_mountpoint = installation.target if type(installation_mountpoint) == str: installation_mountpoint = pathlib.Path(installation_mountpoint) @@ -42,7 +49,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0 -def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -75,22 +82,38 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") -def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None): +def _has_option(option :str,options :list) -> bool: + """ auxiliary routine to check if an option is present in a list. + we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...) + """ + if not options: + return False + for item in options: + if option in item: + return True + return False + +def manage_btrfs_subvolumes(installation :Installer, + partition :Dict[str, str],) -> list: + from copy import deepcopy """ we do the magic with subvolumes in a centralized place parameters: * the installation object * the partition dictionary entry which represents the physical partition - * mountpoinst, the dictionary which contains all the partititon to be mounted - * subvolumes is the dictionary with the names of the subvolumes and its location + returns + * mountpoinst, the list which contains all the "new" partititon to be mounted + We expect the partition has been mounted as / , and it to be unmounted after the processing Then we create all the subvolumes inside btrfs as demand We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs - Then we add it them to the mountpoints dictionary to be processed as "normal" partitions + Then we return a list of "new" partitions to be processed as "normal" partitions # TODO For encrypted devices we need some special processing prior to it """ # We process each of the pairs <subvolume name: mount point | None | mount info dict> # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list # of mount options (or similar used by brtfs) + mountpoints = [] + subvolumes = partition['btrfs']['subvolumes'] for name, right_hand in subvolumes.items(): try: # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use @@ -98,7 +121,7 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su name = name[1:] # renormalize the right hand. location = None - mount_options = [] + subvol_options = [] # no contents, so it is not to be mounted if not right_hand: location = None @@ -108,38 +131,37 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿? elif isinstance(right_hand,dict): location = right_hand.get('mountpoint',None) - mount_options = right_hand.get('options',[]) + subvol_options = right_hand.get('options',[]) # we create the subvolume create_subvolume(installation,name) # Make the nodatacow processing now # It will be the main cause of creation of subvolumes which are not to be mounted # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in mount_options: + if 'nodatacow' in subvol_options: if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") # entry is deleted so nodatacow doesn't propagate to the mount options - del mount_options[mount_options.index('nodatacow')] + del subvol_options[subvol_options.index('nodatacow')] # Make the compress processing now # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new # in this way only zstd compression is activaded # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in mount_options: - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del mount_options[mount_options.index('compress')] + if 'compress' in subvol_options: + if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])): + if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so compress doesn't propagate to the mount options + del subvol_options[subvol_options.index('compress')] # END compress processing. # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume if not partition['mountpoint'] and location is not None: # we begin to create a fake partition entry. First we copy the original -the one that corresponds to - # the primary partition - fake_partition = partition.copy() + # the primary partition. We make a deepcopy to avoid altering the original content in any case + fake_partition = deepcopy(partition) # we start to modify entries in the "fake partition" to match the needs of the subvolumes - # # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy - # and reset the encryption parameters del fake_partition['btrfs'] fake_partition['encrypted'] = False fake_partition['generate-encryption-key-file'] = False @@ -147,22 +169,16 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su fake_partition['mountpoint'] = location # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path. fake_partition['subvolume'] = name - # here we add the mount options - fake_partition['options'] = mount_options - # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. - # We instanciate a new object with following attributes coming / adapted from the instance which was in the primary partition entry (the one we are coping - partition['device_instance'] - # * path, which will be expanded with the subvolume name to use the bind mount syntax the system uses for naming mounted subvolumes - # * size. When the OS queries all the subvolumes share the same size as the full partititon - # * uuid. All the subvolumes on a partition share the same uuid - if not unlocked_device: - fake_partition['device_instance'] = Partition(f"{partition['device_instance'].path}[/{name}]",partition['device_instance'].size,partition['device_instance'].uuid) + # here we add the special mount options for the subvolume, if any. + # if the original partition['options'] is not a list might give trouble + if fake_partition.get('filesystem',{}).get('mount_options',[]): + fake_partition['filesystem']['mount_options'].extend(subvol_options) else: - # for subvolumes IN an encrypted partition we make our device instance from unlocked device instead of the raw partition. - # This time we make a copy (we should to the same above TODO) and alter the path by hand - from copy import copy - # KIDS DONT'T DO THIS AT HOME - fake_partition['device_instance'] = copy(unlocked_device) - fake_partition['device_instance'].path = f"{unlocked_device.path}[/{name}]" + fake_partition['filesystem']['mount_options'] = subvol_options + # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. + # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes + # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless + fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'. # If i don't reset it, process will abort as "already mounted' . # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice @@ -170,9 +186,7 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su # # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, # as "normal" ones - mountpoints[fake_partition['mountpoint']] = fake_partition + mountpoints.append(fake_partition) except Exception as e: raise e - # if the physical partition has been selected to be mounted, we include it at the list. Remmeber, all the above treatement won't happen except the creation of the subvolume - if partition['mountpoint']: - mountpoints[partition['mountpoint']] = partition + return mountpoints diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 51ef949b..3b09ec6c 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,7 +1,13 @@ +from __future__ import annotations import time import logging import json import pathlib +from typing import Optional, Dict, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .partition import Partition from .validators import valid_fs_type from ..exceptions import DiskError @@ -16,24 +22,25 @@ class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. - def __init__(self, blockdevice, mode): + def __init__(self, blockdevice :BlockDevice, mode :int): self.blockdevice = blockdevice self.mode = mode - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': return self - def __repr__(self): + def __repr__(self) -> str: return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] + SysCommand('sync') return True - def partuuid_to_index(self, uuid): + def partuuid_to_index(self, uuid :str) -> Optional[int]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() time.sleep(5) @@ -50,7 +57,7 @@ class Filesystem: raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - def load_layout(self, layout :dict): + def load_layout(self, layout :Dict[str, Any]) -> None: from ..luks import luks2 # If the layout tells us to wipe the drive, we do so @@ -83,6 +90,8 @@ class Filesystem: raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition_uuid)}).") if partition.get('filesystem', {}).get('format', False): + # needed for backward compatibility with the introduction of the new "format_options" + format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) if partition.get('encrypted', False): if not partition.get('!password'): if not storage['arguments'].get('!encryption-password'): @@ -93,15 +102,12 @@ class Filesystem: storage['arguments']['!encryption-password'] = get_password(f"Enter a encryption password for {partition['device_instance']}") partition['!password'] = storage['arguments']['!encryption-password'] - # to be able to generate an unique name in case the partition will not be mounted + if partition.get('mountpoint',None): - ppath = partition['mountpoint'] + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: - ppath = partition['device_instance'].path - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop" - + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" partition['device_instance'].encrypt(password=partition['!password']) - # Immediately unlock the encrypted device to format the inner volume with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=True) as unlocked_device: if not partition.get('format'): @@ -119,29 +125,29 @@ class Filesystem: continue break - unlocked_device.format(partition['filesystem']['format'], options=partition.get('options', [])) + unlocked_device.format(partition['filesystem']['format'], options=format_options) elif partition.get('format', False): - partition['device_instance'].format(partition['filesystem']['format'], options=partition.get('options', [])) + partition['device_instance'].format(partition['filesystem']['format'], options=format_options) if partition.get('boot', False): log(f"Marking partition {partition['device_instance']} as bootable.") self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on') - def find_partition(self, mountpoint): + def find_partition(self, mountpoint :str) -> Partition: for partition in self.blockdevice: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition - def partprobe(self): - SysCommand(f'bash -c "partprobe"') + def partprobe(self) -> bool: + return SysCommand(f'bash -c "partprobe"').exit_code == 0 - def raw_parted(self, string: str): + def raw_parted(self, string: str) -> SysCommand: if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0: log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red") time.sleep(0.5) return cmd_handle - def parted(self, string: str): + def parted(self, string: str) -> bool: """ Performs a parted execution of the given string @@ -149,16 +155,17 @@ class Filesystem: :type string: str """ if (parted_handle := self.raw_parted(string)).exit_code == 0: - self.partprobe() - return True + if self.partprobe(): + return True + return False else: raise DiskError(f"Parted failed to add a partition: {parted_handle}") - def use_entire_disk(self, root_filesystem_type='ext4') -> Partition: + def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type, start, end, partition_format=None): + def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()} @@ -197,14 +204,14 @@ class Filesystem: log("Add partition is exiting due to excessive wait time",level=logging.INFO) raise DiskError(f"New partition never showed up after adding new partition on {self}.") - def set_name(self, partition: int, name: str): + def set_name(self, partition: int, name: str) -> bool: return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 - def set(self, partition: int, string: str): + def set(self, partition: int, string: str) -> bool: log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - def parted_mklabel(self, device: str, disk_label: str): + def parted_mklabel(self, device: str, disk_label: str) -> bool: log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") # Try to unmount devices before attempting to run mklabel try: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index ba29744f..e9f6bc10 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -1,10 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import re import time -from typing import Union +from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from .blockdevice import BlockDevice from ..exceptions import SysCallError, DiskError from ..general import SysCommand @@ -14,10 +19,10 @@ from ..storage import storage ROOT_DIR_PATTERN = re.compile('^.*?/devices') GIGA = 2 ** 30 -def convert_size_to_gb(size): +def convert_size_to_gb(size :Union[int, float]) -> float: return round(size / GIGA,1) -def sort_block_devices_based_on_performance(block_devices): +def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: result = {device: 0 for device in block_devices} for device, weight in result.items(): @@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices): return result -def filter_disks_below_size_in_gb(devices, gigabytes): +def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: for disk in devices: if disk.size >= gigabytes: yield disk -def select_largest_device(devices, gigabytes, filter_out=None): +def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None): return max(copy_devices, key=(lambda device : device.size)) -def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): +def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) -def convert_to_gigabytes(string): +def convert_to_gigabytes(string :str) -> float: unit = string.strip()[-1] size = float(string.strip()[:-1]) @@ -81,7 +86,7 @@ def convert_to_gigabytes(string): return size -def device_state(name, *args, **kwargs): +def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: @@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs): return True # lsblk --json -l -n -o path -def all_disks(*args, **kwargs): +def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: kwargs.setdefault("partitions", False) drives = {} @@ -113,7 +118,7 @@ def all_disks(*args, **kwargs): return drives -def harddrive(size=None, model=None, fuzzy=False): +def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: collection = all_disks() for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: @@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path -def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict: +def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: device_path,bind_path = split_bind_name(path) for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: @@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p return {} -def get_partitions_in_use(mountpoint) -> list: +def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: @@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list: return mounts -def get_filesystem_type(path): +def get_filesystem_type(path :str) -> Optional[str]: device_name, bind_name = split_bind_name(path) try: return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip() @@ -201,10 +206,10 @@ def get_filesystem_type(path): return None -def disk_layouts(): +def disk_layouts() -> Optional[Dict[str, Any]]: try: if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: - return json.loads(handle.decode('UTF-8')) + return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} else: log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") return None @@ -216,20 +221,22 @@ def disk_layouts(): return None -def encrypted_partitions(blockdevices :dict) -> bool: +def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: for partition in blockdevices.values(): if partition.get('encrypted', False): yield partition -def find_partition_by_mountpoint(block_devices, relative_mountpoint :str): +def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: for partition in block_devices[device]['partitions']: if partition.get('mountpoint', None) == relative_mountpoint: return partition -def partprobe(): - SysCommand(f'bash -c "partprobe"') - time.sleep(5) +def partprobe() -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk + return True + return False def convert_device_to_uuid(path :str) -> str: device_name, bind_name = split_bind_name(path) diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index bb6f2d53..56ead68c 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,7 +5,8 @@ import logging import json import os import hashlib -from typing import Optional +from typing import Optional, Dict, Any, List, Union + from .blockdevice import BlockDevice from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage @@ -15,7 +16,15 @@ from ..general import SysCommand class Partition: - def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + def __init__(self, + path: str, + block_device: BlockDevice, + part_id :Optional[str] = None, + filesystem :Optional[str] = None, + mountpoint :Optional[str] = None, + encrypted :bool = False, + autodetect_filesystem :bool = True): + if not part_id: part_id = os.path.basename(path) @@ -50,14 +59,16 @@ class Partition: if self.filesystem == 'crypto_LUKS': self.encrypted = True - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path else: left_comparitor = str(left_comparitor) - return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. - def __repr__(self, *args, **kwargs): + # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 + return self.path < left_comparitor + + def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' if self.mountpoint: mount_repr = f", mounted={self.mountpoint}" @@ -69,7 +80,7 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - def __dump__(self): + def __dump__(self) -> Dict[str, Any]: return { 'type': 'primary', 'PARTUUID': self._safe_uuid, @@ -86,14 +97,14 @@ class Partition: } @property - def sector_size(self): + def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) for device in output['blockdevices']: return device.get('log-sec', None) @property - def start(self): + def start(self) -> Optional[str]: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) for partition in output.get('partitiontable', {}).get('partitions', []): @@ -101,7 +112,7 @@ class Partition: return partition['start'] # * self.sector_size @property - def end(self): + def end(self) -> Optional[str]: # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) @@ -110,7 +121,7 @@ class Partition: return partition['size'] # * self.sector_size @property - def size(self): + def size(self) -> Optional[float]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() @@ -123,7 +134,7 @@ class Partition: time.sleep(storage['DISK_TIMEOUTS']) @property - def boot(self): + def boot(self) -> bool: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) # Get the bootable flag from the sfdisk output: @@ -143,7 +154,7 @@ class Partition: return False @property - def partition_type(self): + def partition_type(self) -> Optional[str]: lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) for device in lsblk['blockdevices']: @@ -179,19 +190,19 @@ class Partition: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() @property - def encrypted(self): + def encrypted(self) -> Union[bool, None]: return self._encrypted @encrypted.setter - def encrypted(self, value: bool): + def encrypted(self, value: bool) -> None: self._encrypted = value @property - def parent(self): + def parent(self) -> str: return self.real_device @property - def real_device(self): + def real_device(self) -> str: for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): return f"/dev/{parent}" @@ -199,25 +210,27 @@ class Partition: return self.path @property - def device_path(self): + def device_path(self) -> str: """ for bind mounts returns the phisical path of the partition """ device_path, bind_name = split_bind_name(self.path) return device_path @property - def bind_name(self): + def bind_name(self) -> str: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ device_path, bind_name = split_bind_name(self.path) return bind_name - def partprobe(self): - SysCommand(f'bash -c "partprobe"') - time.sleep(1) + def partprobe(self) -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(1) + return True + return False - def detect_inner_filesystem(self, password): + def detect_inner_filesystem(self, password :str) -> Optional[str]: log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) from ..luks import luks2 @@ -227,7 +240,7 @@ class Partition: except SysCallError: return None - def has_content(self): + def has_content(self) -> bool: fs_type = get_filesystem_type(self.path) if not fs_type or "swap" in fs_type: return False @@ -248,7 +261,7 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args, **kwargs): + def encrypt(self, *args :str, **kwargs :str) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ @@ -257,7 +270,7 @@ class Partition: handle = luks2(self, None, None) return handle.encrypt(self, *args, **kwargs) - def format(self, filesystem=None, path=None, log_formatting=True, options=[]): + def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool: """ Format can be given an overriding path, for instance /dev/null to test the formatting functionality and in essence the support for the given filesystem. @@ -286,9 +299,8 @@ class Partition: elif filesystem == 'fat32': options = ['-F32'] + options - mkfs = SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}").decode('UTF-8') - if ('mkfs.fat' not in mkfs and 'mkfs.vfat' not in mkfs) or 'command not found' in mkfs: - raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}") + if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") self.filesystem = filesystem elif filesystem == 'ext4': @@ -342,7 +354,7 @@ class Partition: return True - def find_parent_of(self, data, name, parent=None): + def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: if data['name'] == name: return parent elif 'children' in data: @@ -350,7 +362,7 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent - def mount(self, target, fs=None, options=''): + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: @@ -386,25 +398,24 @@ class Partition: self.mountpoint = target return True - def unmount(self): - try: - SysCommand(f"/usr/bin/umount {self.path}") - except SysCallError as err: - exit_code = err.exit_code - - # Without to much research, it seams that low error codes are errors. - # And above 8k is indicators such as "/dev/x not mounted.". - # So anything in between 0 and 8k are errors (?). - if 0 < exit_code < 8000: - raise err + return False + + def unmount(self) -> bool: + worker = SysCommand(f"/usr/bin/umount {self.path}") + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if 0 < worker.exit_code < 8000: + raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) self.mountpoint = None return True - def umount(self): + def umount(self) -> bool: return self.unmount() - def filesystem_supported(self): + def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: @@ -420,7 +431,7 @@ class Partition: return True -def get_mount_fs_type(fs): +def get_mount_fs_type(fs :str) -> str: if fs == 'ntfs': return 'ntfs3' # Needed to use the Paragon R/W NTFS driver elif fs == 'fat32': diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 8dbd15dd..a90ac506 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -1,14 +1,25 @@ +from __future__ import annotations import logging +from typing import Optional, Dict, Any, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to +from ..hardware import has_uefi from ..output import log -def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False): +def suggest_single_disk_layout(block_device :BlockDevice, + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb + MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB using_subvolumes = False + using_home_partition = False if default_filesystem == 'btrfs': using_subvolumes = input('Would you like to use BTRFS subvolumes with a default structure? (Y/n): ').strip().lower() in ('', 'y', 'yes') @@ -20,11 +31,19 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o } } + # Used for reference: https://wiki.archlinux.org/title/partitioning + + # 2 MiB is unallocated for GRUB on BIOS. Potentially unneeded for + # other bootloaders? + + # TODO: On BIOS, /boot partition is only needed if the drive will + # be encrypted, otherwise it is not recommended. We should probably + # add a check for whether the drive will be encrypted or not. layout[block_device.path]['partitions'].append({ # Boot "type" : "primary", - "start" : "5MB", - "size" : "513MB", + "start" : "3MiB", + "size" : "203MiB", "boot" : True, "encrypted" : False, "format" : True, @@ -33,10 +52,18 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o "format" : "fat32" } }) + + # Increase the UEFI partition if UEFI is detected. + # Also re-align the start to 1MiB since we don't need the first sectors + # like we do in MBR layouts where the boot loader is installed traditionally. + if has_uefi(): + layout[block_device.path]['partitions'][-1]['start'] = '1MiB' + layout[block_device.path]['partitions'][-1]['size'] = '512MiB' + layout[block_device.path]['partitions'].append({ # Root "type" : "primary", - "start" : "518MB", + "start" : "206MiB", "encrypted" : False, "format" : True, "mountpoint" : "/", @@ -45,13 +72,20 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o } }) + if has_uefi(): + layout[block_device.path]['partitions'][-1]['start'] = '513MiB' + + if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: + using_home_partition = input('Would you like to create a separate partition for /home? (Y/n): ').strip().lower() in ('', 'y', 'yes') + # Set a size for / (/root) - if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART: + if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition: # We'll use subvolumes # Or the disk size is too small to allow for a separate /home + # Or the user doesn't want to create a separate partition for /home layout[block_device.path]['partitions'][-1]['size'] = '100%' else: - layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GB" + layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GiB" if default_filesystem == 'btrfs' and using_subvolumes: # if input('Do you want to use a recommended structure? (Y/n): ').strip().lower() in ('', 'y', 'yes'): @@ -69,17 +103,17 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o # else: # pass # ... implement a guided setup - elif block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: + elif using_home_partition: # If we don't want to use subvolumes, # But we want to be able to re-use data between re-installs.. # A second partition for /home would be nice if we have the space for it layout[block_device.path]['partitions'].append({ # Home "type" : "primary", + "start" : f"{min(block_device.size, 20)}GiB", + "size" : "100%", "encrypted" : False, "format" : True, - "start" : f"{min(block_device.size+0.5, 20.5)}GB", - "size" : "100%", "mountpoint" : "/home", "filesystem" : { "format" : default_filesystem @@ -89,7 +123,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o return layout -def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False): +def suggest_multi_disk_layout(block_devices :List[BlockDevice], + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) @@ -98,8 +135,8 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_o # https://www.reddit.com/r/btrfs/comments/m287gp/partition_strategy_for_two_physical_disks/ # https://www.reddit.com/r/btrfs/comments/9us4hr/what_is_your_btrfs_partitionsubvolumes_scheme/ - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb - ARCH_LINUX_INSTALLED_SIZE = 20 # Gb, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size? + MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB + ARCH_LINUX_INSTALLED_SIZE = 20 # GiB, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size? block_devices = sort_block_devices_based_on_performance(block_devices).keys() @@ -119,11 +156,13 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_o }, } + # TODO: Same deal as with the single disk layout, we should + # probably check if the drive will be encrypted. layout[root_device.path]['partitions'].append({ # Boot "type" : "primary", - "start" : "5MB", - "size" : "513MB", + "start" : "3MiB", + "size" : "203MiB", "boot" : True, "encrypted" : False, "format" : True, @@ -132,26 +171,33 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_o "format" : "fat32" } }) + + if has_uefi(): + layout[root_device.path]['partitions'][-1]['start'] = '1MiB' + layout[root_device.path]['partitions'][-1]['size'] = '512MiB' + layout[root_device.path]['partitions'].append({ # Root "type" : "primary", - "start" : "518MB", + "start" : "206MiB", + "size" : "100%", "encrypted" : False, "format" : True, - "size" : "100%", "mountpoint" : "/", "filesystem" : { "format" : default_filesystem } }) + if has_uefi(): + layout[root_device.path]['partitions'][-1]['start'] = '513MiB' layout[home_device.path]['partitions'].append({ # Home "type" : "primary", + "start" : "1MiB", + "size" : "100%", "encrypted" : False, "format" : True, - "start" : "5MB", - "size" : "100%", "mountpoint" : "/home", "filesystem" : { "format" : default_filesystem diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index 464f0d73..fd1b7f33 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -1,4 +1,6 @@ -def valid_parted_position(pos :str): +from typing import List + +def valid_parted_position(pos :str) -> bool: if not len(pos): return False @@ -17,7 +19,7 @@ def valid_parted_position(pos :str): return False -def fs_types(): +def fs_types() -> List[str]: # https://www.gnu.org/software/parted/manual/html_node/mkpart.html # Above link doesn't agree with `man parted` /mkpart documentation: """ diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py index aa86124b..783bc9c5 100644 --- a/archinstall/lib/exceptions.py +++ b/archinstall/lib/exceptions.py @@ -17,12 +17,16 @@ class ProfileError(BaseException): class SysCallError(BaseException): - def __init__(self, message :str, exit_code :Optional[int]) -> None: + def __init__(self, message :str, exit_code :Optional[int] = None) -> None: super(SysCallError, self).__init__(message) self.message = message self.exit_code = exit_code +class PermissionError(BaseException): + pass + + class ProfileNotFound(BaseException): pass diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index cc50e80a..680e41cd 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -1,3 +1,4 @@ +from __future__ import annotations import hashlib import json import logging @@ -9,7 +10,10 @@ import string import sys import time from datetime import datetime, date -from typing import Callable, Optional, Dict, Any, List, Union, Iterator +from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer if sys.platform == 'linux': from select import epoll, EPOLLIN, EPOLLHUP @@ -46,14 +50,14 @@ from .exceptions import RequirementError, SysCallError from .output import log from .storage import storage -def gen_uid(entropy_length=256): +def gen_uid(entropy_length :int = 256) -> str: return hashlib.sha512(os.urandom(entropy_length)).hexdigest() -def generate_password(length=64): +def generate_password(length :int = 64) -> str: haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace return ''.join(secrets.choice(haystack) for i in range(length)) -def multisplit(s, splitters): +def multisplit(s :str, splitters :List[str]) -> str: s = [s, ] for key in splitters: ns = [] @@ -77,12 +81,12 @@ def locate_binary(name :str) -> str: raise RequirementError(f"Binary {name} does not exist.") -def json_dumps(*args, **kwargs): +def json_dumps(*args :str, **kwargs :str) -> str: return json.dumps(*args, **{**kwargs, 'cls': JSON}) class JsonEncoder: @staticmethod - def _encode(obj): + def _encode(obj :Any) -> Any: """ This JSON encoder function will try it's best to convert any archinstall data structures, instances or variables into @@ -119,7 +123,7 @@ class JsonEncoder: return obj @staticmethod - def _unsafe_encode(obj): + def _unsafe_encode(obj :Any) -> Any: """ Same as _encode() but it keeps dictionary keys starting with ! """ @@ -141,20 +145,20 @@ class JSON(json.JSONEncoder, json.JSONDecoder): """ A safe JSON encoder that will omit private information in dicts (starting with !) """ - def _encode(self, obj): + def _encode(self, obj :Any) -> Any: return JsonEncoder._encode(obj) - def encode(self, obj): + def encode(self, obj :Any) -> Any: return super(JSON, self).encode(self._encode(obj)) class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder): """ UNSAFE_JSON will call/encode and keep private information in dicts (starting with !) """ - def _encode(self, obj): + def _encode(self, obj :Any) -> Any: return JsonEncoder._unsafe_encode(obj) - def encode(self, obj): + def encode(self, obj :Any) -> Any: return super(UNSAFE_JSON, self).encode(self._encode(obj)) class SysCommandWorker: @@ -184,7 +188,8 @@ class SysCommandWorker: self.cmd = cmd self.callbacks = callbacks self.peak_output = peak_output - self.environment_vars = environment_vars + # define the standard locale for command outputs. For now the C ascii one. Can be overriden + self.environment_vars = {'LC_ALL':'C' , **environment_vars} self.logfile = logfile self.working_directory = working_directory @@ -364,7 +369,7 @@ class SysCommand: peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, working_directory :Optional[str] = './'): - + _callbacks = {} if callbacks: for hook, func in callbacks.items(): @@ -455,12 +460,16 @@ class SysCommand: return None -def prerequisite_check(): - if not os.path.isdir("/sys/firmware/efi"): - raise RequirementError("Archinstall only supports machines in UEFI mode.") +def prerequisite_check() -> bool: + """ + This function is used as a safety check before + continuing with an installation. - return True + Could be anything from checking that /boot is big enough + to check if nvidia hardware exists when nvidia driver was chosen. + """ + return True def reboot(): SysCommand("/usr/bin/reboot") @@ -473,12 +482,15 @@ def pid_exists(pid: int) -> bool: return False -def run_custom_user_commands(commands, installation): +def run_custom_user_commands(commands :List[str], installation :Installer) -> None: for index, command in enumerate(commands): - log(f'Executing custom command "{command}" ...', fg='yellow') + log(f'Executing custom command "{command}" ...', level=logging.INFO) + with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: temp_script.write(command) + execution_output = SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh") + log(execution_output) os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index c02d5717..6e4a6193 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,5 +1,4 @@ import time -from typing import Union import logging import os import shutil @@ -7,6 +6,8 @@ import shlex import pathlib import subprocess import glob +from types import ModuleType +from typing import Union, Dict, Any, List, Optional, Iterator, Mapping from .disk import get_partitions_in_use, Partition from .general import SysCommand, generate_password from .hardware import has_uefi, is_vm, cpu_vendor @@ -30,29 +31,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] class InstallationFile: - def __init__(self, installation, filename, owner, mode="w"): + def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): self.installation = installation self.filename = filename self.owner = owner self.mode = mode self.fh = None - def __enter__(self): + def __enter__(self) -> 'InstallationFile': self.fh = open(self.filename, self.mode) return self - def __exit__(self, *args): + def __exit__(self, *args :str) -> None: self.fh.close() self.installation.chown(self.owner, self.filename) - def write(self, data: Union[str, bytes]): + def write(self, data: Union[str, bytes]) -> int: return self.fh.write(data) - def read(self, *args): + def read(self, *args) -> Union[str, bytes]: return self.fh.read(*args) - def poll(self, *args): - return self.fh.poll(*args) +# def poll(self, *args) -> bool: +# return self.fh.poll(*args) def accessibility_tools_in_use() -> bool: @@ -84,11 +85,12 @@ class Installer: """ - def __init__(self, target, *, base_packages=None, kernels=None): + def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None): if base_packages is None: base_packages = __packages__[:3] if kernels is None: kernels = ['linux'] + self.kernels = kernels self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') @@ -119,18 +121,17 @@ class Installer: self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"] self.KERNEL_PARAMS = [] - def log(self, *args, level=logging.DEBUG, **kwargs): + def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): """ installer.log() wraps output.log() mainly to set a default log-level for this install session. Any manual override can be done per log() call. """ log(*args, level=level, **kwargs) - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Installer': return self - def __exit__(self, *args, **kwargs): - # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. + def __exit__(self, *args :str, **kwargs :str) -> None: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: @@ -163,10 +164,10 @@ class Installer: return False @property - def partitions(self): + def partitions(self) -> List[Partition]: return get_partitions_in_use(self.target) - def sync_log_to_install_medium(self): + def sync_log_to_install_medium(self) -> bool: # Copy over the install log (if there is one) to the install medium if # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. if self.helper_flags.get('base-strapped', False) is True: @@ -180,90 +181,111 @@ class Installer: return True - def mount_ordered_layout(self, layouts: dict): - from .luks import luks2 - - mountpoints = {} - for blockdevice in layouts: - for partition in layouts[blockdevice]['partitions']: - if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})): - if partition.get('encrypted',False): - if partition.get('mountpoint',None): - ppath = partition['mountpoint'] - else: - ppath = partition['device_instance'].path - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop" - # Immediately unlock the encrypted device to format the inner volume - with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=False) as unlocked_device: - unlocked_device.mount(f"{self.target}/") - try: - manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes,unlocked_device) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state - unlocked_device.unmount() - raise e - unlocked_device.unmount() - # TODO generate key - else: - self.mount(partition['device_instance'],"/") - try: - manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state - partition['device_instance'].unmount() - raise e - partition['device_instance'].unmount() - else: - mountpoints[partition['mountpoint']] = partition - for mountpoint in sorted([mnt_dest for mnt_dest in mountpoints.keys() if mnt_dest is not None]): - partition = mountpoints[mountpoint] - if partition.get('encrypted', False) and not partition.get('subvolume',None): - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - if not (password := partition.get('!password', None)): - raise RequirementError(f"Missing mountpoint {mountpoint} encryption password in layout: {partition}") - - with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: - if partition.get('generate-encryption-key-file'): - if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): - cryptkey_dir.mkdir(parents=True) - - # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key - # if we name the device to "xyzloop". - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" - with open(f"{self.target}{encryption_key_path}", "w") as keyfile: - keyfile.write(generate_password(length=512)) + def _create_keyfile(self,luks_handle , partition :dict, password :str): + """ roiutine to create keyfiles, so it can be moved elsewere + """ + if partition.get('generate-encryption-key-file'): + if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): + cryptkey_dir.mkdir(parents=True) + # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key + # if we name the device to "xyzloop". + if partition.get('mountpoint',None): + encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" + else: + encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key" + with open(f"{self.target}{encryption_key_path}", "w") as keyfile: + keyfile.write(generate_password(length=512)) - os.chmod(f"{self.target}{encryption_key_path}", 0o400) + os.chmod(f"{self.target}{encryption_key_path}", 0o400) - luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) - luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) + luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) + luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {unlocked_device}", level=logging.INFO) - unlocked_device.mount(f"{self.target}{mountpoint}") + def _has_root(self, partition :dict) -> bool: + """ + Determine if an encrypted partition contains root in it + """ + if partition.get("mountpoint") is None: + if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})): + for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list]: + if mountpoint == '/': + return True + return False + else: + return False + elif partition.get("mountpoint") == '/': + return True + else: + return False + def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: + from .luks import luks2 + # set the partitions as a list not part of a tree (which we don't need anymore (i think) + list_part = [] + list_luks_handles = [] + for blockdevice in layouts: + list_part.extend(layouts[blockdevice]['partitions']) + + # we manage the encrypted partititons + for partition in [entry for entry in list_part if entry.get('encrypted',False)]: + # open the luks device and all associate stuff + if not (password := partition.get('!password', None)): + raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}") + # i change a bit the naming conventions for the loop device + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) - if partition.get('options',[]): - mount_options = ','.join(partition['options']) - partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) - else: - partition['device_instance'].mount(f"{self.target}{mountpoint}") + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used + with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: + if partition.get('generate-encryption-key-file',False) and not self._has_root(partition): + list_luks_handles.append([luks_handle,partition,password]) + # this way all the requesrs will be to the dm_crypt device and not to the physical partition + partition['device_instance'] = unlocked_device + + # we manage the btrfs partitions + for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]: + self.mount(partition['device_instance'],"/") + try: + new_mountpoints = manage_btrfs_subvolumes(self,partition) + except Exception as e: + # every exception unmounts the physical volume. Otherwise we let the system in an unstable state + partition['device_instance'].unmount() + raise e + partition['device_instance'].unmount() + if new_mountpoints: + list_part.extend(new_mountpoints) + + # we mount. We need to sort by mountpoint to get a good working order + for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']): + mountpoint = partition['mountpoint'] + log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) + if partition.get('filesystem',{}).get('mount_options',[]): + mount_options = ','.join(partition['filesystem']['mount_options']) + partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) + else: + partition['device_instance'].mount(f"{self.target}{mountpoint}") time.sleep(1) try: get_mount_info(f"{self.target}{mountpoint}", traverse=False) except DiskError: raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") - def mount(self, partition, mountpoint, create_mountpoint=True): + # once everything is mounted, we generate the key files in the correct place + for handle in list_luks_handles: + ppath = handle[1]['device_instance'].path + log(f"creating key-file for {ppath}",level=logging.INFO) + self._create_keyfile(handle[0],handle[1],handle[2]) + + def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None: if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): os.makedirs(f'{self.target}{mountpoint}') partition.mount(f'{self.target}{mountpoint}') - def post_install_check(self, *args, **kwargs): + def post_install_check(self, *args :str, **kwargs :str) -> List[bool]: return [step for step, flag in self.helper_flags.items() if flag is False] - def pacstrap(self, *packages, **kwargs): + def pacstrap(self, *packages :str, **kwargs :str) -> bool: if type(packages[0]) in (list, tuple): packages = packages[0] @@ -284,7 +306,7 @@ class Installer: else: self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO) - def set_mirrors(self, mirrors): + def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): if result := plugin.on_mirrors(mirrors): @@ -292,7 +314,7 @@ class Installer: return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') - def genfstab(self, flags='-pU'): + def genfstab(self, flags :str = '-pU') -> bool: self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: @@ -307,11 +329,11 @@ class Installer: return True - def set_hostname(self, hostname: str, *args, **kwargs): + def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: with open(f'{self.target}/etc/hostname', 'w') as fh: fh.write(hostname + '\n') - def set_locale(self, locale, encoding='UTF-8', *args, **kwargs): + def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool: if not len(locale): return True @@ -322,7 +344,7 @@ class Installer: return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False - def set_timezone(self, zone, *args, **kwargs): + def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool: if not zone: return True if not len(zone): @@ -337,6 +359,7 @@ class Installer: (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True + else: self.log( f"Time zone {zone} does not exist, continuing with system default.", @@ -344,11 +367,13 @@ class Installer: fg='red' ) - def activate_ntp(self): + return False + + def activate_ntp(self) -> None: log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO) self.activate_time_syncronization() - def activate_time_syncronization(self): + def activate_time_syncronization(self) -> None: self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO) self.enable_service('systemd-timesyncd') @@ -361,11 +386,11 @@ class Installer: with Boot(self) as session: session.SysCommand(["timedatectl", "set-ntp", 'true']) - def enable_espeakup(self): + def enable_espeakup(self) -> None: self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO) self.enable_service('espeakup') - def enable_service(self, *services): + def enable_service(self, *services :str) -> None: for service in services: self.log(f'Enabling service {service}', level=logging.INFO) if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0: @@ -375,19 +400,27 @@ class Installer: if hasattr(plugin, 'on_service'): plugin.on_service(service) - def run_command(self, cmd, *args, **kwargs): + def run_command(self, cmd :str, *args :str, **kwargs :str) -> None: return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') - def arch_chroot(self, cmd, run_as=None): + def arch_chroot(self, cmd :str, run_as :Optional[str] = None): if run_as: cmd = f"su - {run_as} -c {shlex.quote(cmd)}" return self.run_command(cmd) - def drop_to_shell(self): + def drop_to_shell(self) -> None: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) - def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs): + def configure_nic(self, + nic :str, + dhcp :bool = True, + ip :Optional[str] = None, + gateway :Optional[str] = None, + dns :Optional[str] = None, + *args :str, + **kwargs :str + ) -> None: from .systemd import Networkd if dhcp: @@ -412,7 +445,7 @@ class Installer: with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf: netconf.write(str(conf)) - def copy_iso_network_config(self, enable_services=False): + def copy_iso_network_config(self, enable_services :bool = False) -> bool: # Copy (if any) iwd password and config files if os.path.isdir('/var/lib/iwd/'): if psk_files := glob.glob('/var/lib/iwd/*.psk'): @@ -427,7 +460,7 @@ class Installer: # This function will be called after minimal_installation() # as a hook for post-installs. This hook is only needed if # base is not installed yet. - def post_install_enable_iwd_service(*args, **kwargs): + def post_install_enable_iwd_service(*args :str, **kwargs :str): self.enable_service('iwd') self.post_base_install.append(post_install_enable_iwd_service) @@ -452,7 +485,7 @@ class Installer: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: - def post_install_enable_networkd_resolved(*args, **kwargs): + def post_install_enable_networkd_resolved(*args :str, **kwargs :str): self.enable_service('systemd-networkd', 'systemd-resolved') self.post_base_install.append(post_install_enable_networkd_resolved) @@ -462,7 +495,7 @@ class Installer: return True - def detect_encryption(self, partition): + def detect_encryption(self, partition :Partition) -> bool: part = Partition(partition.parent, None, autodetect_filesystem=True) if partition.encrypted: return partition @@ -471,7 +504,7 @@ class Installer: return False - def mkinitcpio(self, *flags): + def mkinitcpio(self, *flags :str) -> bool: for plugin in plugins.values(): if hasattr(plugin, 'on_mkinitcpio'): # Allow plugins to override the usage of mkinitcpio altogether. @@ -483,9 +516,10 @@ class Installer: mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") - SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') - def minimal_installation(self): + return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0 + + def minimal_installation(self) -> bool: # Add necessary packages if encrypting the drive # (encrypted partitions default to btrfs for now, so we need btrfs-progs) # TODO: Perhaps this should be living in the function which dictates @@ -562,7 +596,7 @@ class Installer: return True - def setup_swap(self, kind='zram'): + def setup_swap(self, kind :str = 'zram') -> bool: if kind == 'zram': self.log(f"Setting up swap on zram") self.pacstrap('zram-generator') @@ -578,7 +612,18 @@ class Installer: else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") - def add_bootloader(self, bootloader='systemd-bootctl'): + def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool: + """ + Adds a bootloader to the installation instance. + Archinstall supports one of three types: + * systemd-bootctl + * grub + * efistub (beta) + + :param bootloader: Can be one of the three strings + 'systemd-bootctl', 'grub' or 'efistub' (beta) + """ + for plugin in plugins.values(): if hasattr(plugin, 'on_add_bootloader'): # Allow plugins to override the boot-loader handling. @@ -669,6 +714,7 @@ class Installer: base_path,bind_path = split_bind_name(str(root_partition.path)) if bind_path is not None: # and root_fs_type == 'btrfs': options_entry = f"rootflags=subvol={bind_path} " + options_entry + if real_device := self.detect_encryption(root_partition): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) @@ -757,10 +803,19 @@ class Installer: return True - def add_additional_packages(self, *packages): + def add_additional_packages(self, *packages :str) -> bool: return self.pacstrap(*packages) - def install_profile(self, profile): + def install_profile(self, profile :str) -> ModuleType: + """ + Installs a archinstall profile script (.py file). + This profile can be either local, remote or part of the library. + + :param profile: Can be a local path or a remote path (URL) + :return: Returns the imported script as a module, this way + you can access any remaining functions exposed by the profile. + :rtype: module + """ storage['installation_session'] = self if type(profile) == str: @@ -769,13 +824,13 @@ class Installer: self.log(f'Installing network profile {profile}', level=logging.INFO) return profile.install() - def enable_sudo(self, entity: str, group=False): + def enable_sudo(self, entity: str, group :bool = False) -> bool: self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) with open(f'{self.target}/etc/sudoers', 'a') as sudoers: sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') return True - def user_create(self, user: str, password=None, groups=None, sudo=False): + def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None: if groups is None: groups = [] @@ -789,7 +844,8 @@ class Installer: if not handled_by_plugin: self.log(f'Creating user {user}', level=logging.INFO) - SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') + if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0: + raise SystemError(f"Could not create user inside installation: {output}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): @@ -806,24 +862,24 @@ class Installer: if sudo and self.enable_sudo(user): self.helper_flags['user'] = True - def user_set_pw(self, user, password): + def user_set_pw(self, user :str, password :str) -> bool: self.log(f'Setting password for {user}', level=logging.INFO) if user == 'root': # This means the root account isn't locked/disabled with * in /etc/passwd self.helper_flags['user'] = True - SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"") + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0 - def user_set_shell(self, user, shell): + def user_set_shell(self, user :str, shell :str) -> bool: self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) - SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0 - def chown(self, owner, path, options=[]): - return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}") + def chown(self, owner :str, path :str, options :List[str] = []) -> bool: + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0 - def create_file(self, filename, owner=None): + def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: return InstallationFile(self, filename, owner) def set_keyboard_language(self, language: str) -> bool: diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py index ad85ea1b..cbba8d52 100644 --- a/archinstall/lib/locale_helpers.py +++ b/archinstall/lib/locale_helpers.py @@ -1,41 +1,60 @@ import logging +from typing import Iterator, List from .exceptions import ServiceException from .general import SysCommand from .output import log -def list_keyboard_languages(): +def list_keyboard_languages() -> Iterator[str]: for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}): yield line.decode('UTF-8').strip() -def list_x11_keyboard_languages(): +def list_locales() -> List[str]: + with open('/etc/locale.gen', 'r') as fp: + locales = [] + # before the list of locales begins there's an empty line with a '#' in front + # so we'll collect the localels from bottom up and halt when we're donw + entries = fp.readlines() + entries.reverse() + + for entry in entries: + text = entry[1:].strip() + if text == '': + break + locales.append(text) + + locales.reverse() + return locales + + +def list_x11_keyboard_languages() -> Iterator[str]: for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}): yield line.decode('UTF-8').strip() -def verify_keyboard_layout(layout): +def verify_keyboard_layout(layout :str) -> bool: for language in list_keyboard_languages(): if layout.lower() == language.lower(): return True return False -def verify_x11_keyboard_layout(layout): +def verify_x11_keyboard_layout(layout :str) -> bool: for language in list_x11_keyboard_languages(): if layout.lower() == language.lower(): return True return False -def search_keyboard_layout(layout): +def search_keyboard_layout(layout :str) -> Iterator[str]: for language in list_keyboard_languages(): if layout.lower() in language.lower(): yield language -def set_keyboard_language(locale): +def set_keyboard_language(locale :str) -> bool: if len(locale.strip()): if not verify_keyboard_layout(locale): log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR) @@ -49,6 +68,6 @@ def set_keyboard_language(locale): return False -def list_timezones(): +def list_timezones() -> Iterator[str]: for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}): yield line.decode('UTF-8').strip() diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 255c75d9..d39bce0f 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,9 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import shlex import time +from typing import Optional, List,TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer + from .disk import Partition, convert_device_to_uuid from .general import SysCommand, SysCommandWorker from .output import log @@ -11,7 +17,15 @@ from .exceptions import SysCallError, DiskError from .storage import storage class luks2: - def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs): + def __init__(self, + partition :Partition, + mountpoint :str, + password :str, + key_file :Optional[str] = None, + auto_unmount :bool = False, + *args :str, + **kwargs :str): + self.password = password self.partition = partition self.mountpoint = mountpoint @@ -22,7 +36,7 @@ class luks2: self.filesystem = 'crypto_LUKS' self.mapdev = None - def __enter__(self): + def __enter__(self) -> Partition: if not self.key_file: self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? @@ -34,16 +48,23 @@ class luks2: return self.unlock(self.partition, self.mountpoint, self.key_file) - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if self.auto_unmount: self.close() if len(args) >= 2 and args[1]: raise args[1] + return True - def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None): + def encrypt(self, partition :Partition, + password :Optional[str] = None, + key_size :int = 512, + hash_type :str = 'sha512', + iter_time :int = 10000, + key_file :Optional[str] = None) -> str: + log(f'Encrypting {partition} (This might take a while)', level=logging.INFO) if not key_file: @@ -119,7 +140,7 @@ class luks2: return key_file - def unlock(self, partition, mountpoint, key_file): + def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition: """ Mounts a luks2 compatible partition to a certain mountpoint. Keyfile must be specified as there's no way to interact with the pw-prompt atm. @@ -142,24 +163,24 @@ class luks2: unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False) return unlocked_partition - def close(self, mountpoint=None): + def close(self, mountpoint :Optional[str] = None) -> bool: if not mountpoint: mountpoint = self.mapdev SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}') return os.path.islink(self.mapdev) is False - def format(self, path): + def format(self, path :str) -> None: if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') - def add_key(self, path :pathlib.Path, password :str): + def add_key(self, path :pathlib.Path, password :str) -> bool: if not path.exists(): raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path)) log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO) - - worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}") + worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}", + environment_vars={'LC_ALL':'C'}) pw_injected = False while worker.is_alive(): if b'Enter any existing passphrase' in worker and pw_injected is False: @@ -169,7 +190,9 @@ class luks2: if worker.exit_code != 0: raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}') - def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]): + return True + + def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None: log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO) with open(f"{installation.target}/etc/crypttab", "a") as crypttab: crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n") diff --git a/archinstall/lib/menu/__init__.py b/archinstall/lib/menu/__init__.py new file mode 100644 index 00000000..6e28c8a2 --- /dev/null +++ b/archinstall/lib/menu/__init__.py @@ -0,0 +1 @@ +from .menu import Menu diff --git a/archinstall/lib/menu.py b/archinstall/lib/menu/menu.py index 6f1c2237..dfd47a7a 100644 --- a/archinstall/lib/menu.py +++ b/archinstall/lib/menu/menu.py @@ -1,15 +1,20 @@ -from .simple_menu import TerminalMenu +from archinstall.lib.menu.simple_menu import TerminalMenu +from ..exceptions import RequirementError +from ..output import log +from collections.abc import Iterable +import sys +import logging class Menu(TerminalMenu): - def __init__(self, title, options, skip=True, multi=False, default_option=None, sort=True): + def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True): """ Creates a new menu :param title: Text that will be displayed above the menu :type title: str - :param options: Options to be displayed in the menu to chose from; + :param p_options: Options to be displayed in the menu to chose from; if dict is specified then the keys of such will be used as options :type options: list, dict @@ -25,9 +30,29 @@ class Menu(TerminalMenu): :param sort: Indicate if the options should be sorted alphabetically before displaying :type sort: bool """ + # we guarantee the inmutability of the options outside the class. + # an unknown number of iterables (.keys(),.values(),generator,...) can't be directly copied, in this case + # we recourse to make them lists before, but thru an exceptions + # this is the old code, which is not maintenable with more types + # options = copy(list(p_options) if isinstance(p_options,(type({}.keys()),type({}.values()))) else p_options) + # We check that the options are iterable. If not we abort. Else we copy them to lists + # it options is a dictionary we use the values as entries of the list + # if options is a string object, each character becomes an entry + # if options is a list, we implictily build a copy to mantain immutability + if not isinstance(p_options,Iterable): + log(f"Objects of type {type(p_options)} is not iterable, and are not supported at Menu",fg="red") + log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) + raise RequirementError("Menu() requires an iterable as option.") + + if isinstance(p_options,dict): + options = list(p_options.keys()) + else: + options = list(p_options) - if isinstance(options, dict): - options = list(options) + if not options: + log(" * Menu didn't find any options to choose from * ", fg='red') + log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) + raise RequirementError('Menu.__init__() requires at least one option to proceed.') if sort: options = sorted(options) diff --git a/archinstall/lib/menu/selection_menu.py b/archinstall/lib/menu/selection_menu.py new file mode 100644 index 00000000..9eb022a6 --- /dev/null +++ b/archinstall/lib/menu/selection_menu.py @@ -0,0 +1,392 @@ +import sys + +import archinstall +from archinstall import Menu + + +class Selector: + def __init__( + self, + description, + func=None, + display_func=None, + default=None, + enabled=False, + dependencies=[], + dependencies_not=[] + ): + """ + Create a new menu selection entry + + :param description: Text that will be displayed as the menu entry + :type description: str + + :param func: Function that is called when the menu entry is selected + :type func: Callable + + :param display_func: After specifying a setting for a menu item it is displayed + on the right side of the item as is; with this function one can modify the entry + to be displayed; e.g. when specifying a password one can display **** instead + :type display_func: Callable + + :param default: Default value for this menu entry + :type default: Any + + :param enabled: Specify if this menu entry should be displayed + :type enabled: bool + + :param dependencies: Specify dependencies for this menu entry; if the dependencies + are not set yet, then this item is not displayed; e.g. disk_layout depends on selectiong + harddrive(s) first + :type dependencies: list + + :param dependencies_not: These are the exclusive options; the menu item will only be + displayed if non of the entries in the list have been specified + :type dependencies_not: list + """ + + self._description = description + self.func = func + self._display_func = display_func + self._current_selection = default + self.enabled = enabled + self.text = self.menu_text() + self._dependencies = dependencies + self._dependencies_not = dependencies_not + + @property + def dependencies(self): + return self._dependencies + + @property + def dependencies_not(self): + return self._dependencies_not + + def set_enabled(self): + self.enabled = True + + def update_description(self, description): + self._description = description + self.text = self.menu_text() + + def menu_text(self): + current = '' + + if self._display_func: + current = self._display_func(self._current_selection) + else: + if self._current_selection is not None: + current = str(self._current_selection) + + if current: + padding = 35 - len(self._description) + current = ' ' * padding + f'SET: {current}' + + return f'{self._description} {current}' + + def set_current_selection(self, current): + self._current_selection = current + self.text = self.menu_text() + + def has_selection(self): + if self._current_selection is None: + return False + return True + + def is_empty(self): + if self._current_selection is None: + return True + elif isinstance(self._current_selection, (str, list, dict)) and len(self._current_selection) == 0: + return True + + return False + + +class GlobalMenu: + def __init__(self): + self._menu_options = {} + self._setup_selection_menu_options() + + def _setup_selection_menu_options(self): + self._menu_options['keyboard-layout'] = \ + Selector('Select keyboard layout', lambda: archinstall.select_language('us'), default='us') + self._menu_options['mirror-region'] = \ + Selector( + 'Select mirror region', + lambda: archinstall.select_mirror_regions(), + display_func=lambda x: list(x.keys()) if x else '[]', + default={}) + self._menu_options['sys-language'] = \ + Selector('Select locale language', lambda: archinstall.select_locale_lang('en_US'), default='en_US') + self._menu_options['sys-encoding'] = \ + Selector('Select locale encoding', lambda: archinstall.select_locale_enc('utf-8'), default='utf-8') + self._menu_options['harddrives'] = \ + Selector( + 'Select harddrives', + lambda: self._select_harddrives()) + self._menu_options['disk_layouts'] = \ + Selector( + 'Select disk layout', + lambda: archinstall.select_disk_layout( + archinstall.arguments['harddrives'], + archinstall.arguments.get('advanced', False) + ), + dependencies=['harddrives']) + self._menu_options['!encryption-password'] = \ + Selector( + 'Set encryption password', + lambda: archinstall.get_password(prompt='Enter disk encryption password (leave blank for no encryption): '), + display_func=lambda x: self._secret(x) if x else 'None', + dependencies=['harddrives']) + self._menu_options['swap'] = \ + Selector( + 'Use swap', + lambda: archinstall.ask_for_swap(), + default=True) + self._menu_options['bootloader'] = \ + Selector( + 'Select bootloader', + lambda: archinstall.ask_for_bootloader(archinstall.arguments.get('advanced', False)),) + self._menu_options['hostname'] = \ + Selector('Specify hostname', lambda: archinstall.ask_hostname()) + self._menu_options['!root-password'] = \ + Selector( + 'Set root password', + lambda: self._set_root_password(), + display_func=lambda x: self._secret(x) if x else 'None') + self._menu_options['!superusers'] = \ + Selector( + 'Specify superuser account', + lambda: self._create_superuser_account(), + dependencies_not=['!root-password'], + display_func=lambda x: list(x.keys()) if x else '') + self._menu_options['!users'] = \ + Selector( + 'Specify user account', + lambda: self._create_user_account(), + default={}, + display_func=lambda x: list(x.keys()) if x else '[]') + self._menu_options['profile'] = \ + Selector( + 'Specify profile', + lambda: self._select_profile(), + display_func=lambda x: x if x else 'None') + self._menu_options['audio'] = \ + Selector( + 'Select audio', + lambda: archinstall.ask_for_audio_selection(archinstall.is_desktop_profile(archinstall.arguments.get('profile', None)))) + self._menu_options['kernels'] = \ + Selector( + 'Select kernels', + lambda: archinstall.select_kernel(), + default='linux') + self._menu_options['packages'] = \ + Selector( + 'Additional packages to install', + lambda: archinstall.ask_additional_packages_to_install(archinstall.arguments.get('packages', None)), + default=[]) + self._menu_options['nic'] = \ + Selector( + 'Configure network', + lambda: archinstall.ask_to_configure_network(), + display_func=lambda x: x if x else 'Not configured, unavailable unless setup manually', + default={}) + self._menu_options['timezone'] = \ + Selector('Select timezone', lambda: archinstall.ask_for_a_timezone()) + self._menu_options['ntp'] = \ + Selector( + 'Set automatic time sync (NTP)', + lambda: archinstall.ask_ntp(), + default=True) + self._menu_options['install'] = \ + Selector( + self._install_text(), + enabled=True) + self._menu_options['abort'] = Selector('Abort', enabled=True) + + def enable(self, selector_name, omit_if_set=False): + arg = archinstall.arguments.get(selector_name, None) + + # don't display the menu option if it was defined already + if arg is not None and omit_if_set: + return + + if self._menu_options.get(selector_name, None): + self._menu_options[selector_name].set_enabled() + if arg is not None: + self._menu_options[selector_name].set_current_selection(arg) + else: + print(f'No selector found: {selector_name}') + sys.exit(1) + + def run(self): + while True: + # # Before continuing, set the preferred keyboard layout/language in the current terminal. + # # This will just help the user with the next following questions. + self._set_kb_language() + + enabled_menus = self._menus_to_enable() + menu_text = [m.text for m in enabled_menus.values()] + selection = Menu('Set/Modify the below options', menu_text, sort=False).run() + if selection: + selection = selection.strip() + if 'Abort' in selection: + exit(0) + elif 'Install' in selection: + if self._missing_configs() == 0: + self._post_processing() + break + else: + self._process_selection(selection) + + def _process_selection(self, selection): + # find the selected option in our option list + option = [[k, v] for k, v in self._menu_options.items() if v.text.strip() == selection] + + if len(option) != 1: + raise ValueError(f'Selection not found: {selection}') + + selector_name = option[0][0] + selector = option[0][1] + result = selector.func() + self._menu_options[selector_name].set_current_selection(result) + archinstall.arguments[selector_name] = result + + self._update_install() + + def _update_install(self): + text = self._install_text() + self._menu_options.get('install').update_description(text) + + def _post_processing(self): + if archinstall.arguments.get('harddrives', None) and archinstall.arguments.get('!encryption-password', None): + # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format.. + # Then we need to identify which partitions to encrypt. This will default to / (root). + if len(list(archinstall.encrypted_partitions(archinstall.storage['disk_layouts']))) == 0: + archinstall.storage['disk_layouts'] = archinstall.select_encrypted_partitions( + archinstall.storage['disk_layouts'], archinstall.arguments['!encryption-password']) + + def _install_text(self): + missing = self._missing_configs() + if missing > 0: + return f'Install ({missing} config(s) missing)' + return 'Install' + + def _missing_configs(self): + def check(s): + return self._menu_options.get(s).has_selection() + + missing = 0 + if not check('bootloader'): + missing += 1 + if not check('hostname'): + missing += 1 + if not check('audio'): + missing += 1 + if not check('timezone'): + missing += 1 + if not check('!root-password') and not check('!superusers'): + missing += 1 + if not check('harddrives'): + missing += 1 + if check('harddrives'): + if not self._menu_options.get('harddrives').is_empty() and not check('disk_layouts'): + missing += 1 + + return missing + + def _set_root_password(self): + prompt = 'Enter root password (leave blank to disable root & create superuser): ' + password = archinstall.get_password(prompt=prompt) + + if password is not None: + self._menu_options.get('!superusers').set_current_selection(None) + archinstall.arguments['!users'] = {} + archinstall.arguments['!superusers'] = {} + + return password + + def _select_harddrives(self): + old_haddrives = archinstall.arguments.get('harddrives') + harddrives = archinstall.select_harddrives() + + # in case the harddrives got changed we have to reset the disk layout as well + if old_haddrives != harddrives: + self._menu_options.get('disk_layouts').set_current_selection(None) + archinstall.arguments['disk_layouts'] = {} + + if not harddrives: + prompt = 'You decided to skip harddrive selection\n' + prompt += f"and will use whatever drive-setup is mounted at {archinstall.storage['MOUNT_POINT']} (experimental)\n" + prompt += "WARNING: Archinstall won't check the suitability of this setup\n" + + prompt += 'Do you wish to continue?' + choice = Menu(prompt, ['yes', 'no'], default_option='yes').run() + + if choice == 'no': + return self._select_harddrives() + + return harddrives + + def _secret(self, x): + return '*' * len(x) + + def _select_profile(self): + profile = archinstall.select_profile() + + # Check the potentially selected profiles preparations to get early checks if some additional questions are needed. + if profile and profile.has_prep_function(): + namespace = f'{profile.namespace}.py' + with profile.load_instructions(namespace=namespace) as imported: + if not imported._prep_function(): + archinstall.log(' * Profile\'s preparation requirements was not fulfilled.', fg='red') + exit(1) + + return profile + + def _create_superuser_account(self): + superuser = archinstall.ask_for_superuser_account('Create a required super-user with sudo privileges: ', forced=True) + return superuser + + def _create_user_account(self): + users, superusers = archinstall.ask_for_additional_users('Enter a username to create an additional user: ') + if not archinstall.arguments.get('!superusers', None): + archinstall.arguments['!superusers'] = superusers + else: + archinstall.arguments['!superusers'] = {**archinstall.arguments['!superusers'], **superusers} + + return users + + def _set_kb_language(self): + # Before continuing, set the preferred keyboard layout/language in the current terminal. + # This will just help the user with the next following questions. + if archinstall.arguments.get('keyboard-layout', None) and len(archinstall.arguments['keyboard-layout']): + archinstall.set_keyboard_language(archinstall.arguments['keyboard-layout']) + + def _verify_selection_enabled(self, selection_name): + if selection := self._menu_options.get(selection_name, None): + if not selection.enabled: + return False + + if len(selection.dependencies) > 0: + for d in selection.dependencies: + if not self._verify_selection_enabled(d) or self._menu_options.get(d).is_empty(): + return False + + if len(selection.dependencies_not) > 0: + for d in selection.dependencies_not: + if not self._menu_options.get(d).is_empty(): + return False + + return True + + raise ValueError(f'No selection found: {selection_name}') + + def _menus_to_enable(self): + enabled_menus = {} + + for name, selection in self._menu_options.items(): + if self._verify_selection_enabled(name): + enabled_menus[name] = selection + + return enabled_menus diff --git a/archinstall/lib/simple_menu.py b/archinstall/lib/menu/simple_menu.py index 6e4853ea..a9d6d7ec 100644 --- a/archinstall/lib/simple_menu.py +++ b/archinstall/lib/menu/simple_menu.py @@ -61,7 +61,6 @@ try: except ImportError as e: raise NotImplementedError('"{}" is currently not supported.'.format(platform.system())) from e - __author__ = "Ingo Meyer" __email__ = "i.meyer@fz-juelich.de" __copyright__ = "Copyright © 2021 Forschungszentrum Jülich GmbH. All rights reserved." diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index 5fad6cb6..6b6bfed4 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,7 +1,7 @@ import logging import urllib.error import urllib.request -from typing import Union, Mapping, Iterable +from typing import Union, Mapping, Iterable, Dict, Any, List from .general import SysCommand from .output import log @@ -51,7 +51,12 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: return new_raw_data -def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', sort_order=["https", "http"], *args, **kwargs) -> Union[bool, bytes]: +def filter_mirrors_by_region(regions :str, + destination :str = '/etc/pacman.d/mirrorlist', + sort_order :List[str] = ["https", "http"], + *args :str, + **kwargs :str +) -> Union[bool, bytes]: """ This function will change the active mirrors on the live medium by filtering which regions are active based on `regions`. @@ -75,7 +80,7 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', so return new_list.decode('UTF-8') -def add_custom_mirrors(mirrors: list, *args, **kwargs): +def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool: """ This will append custom mirror definitions in pacman.conf @@ -91,7 +96,7 @@ def add_custom_mirrors(mirrors: list, *args, **kwargs): return True -def insert_mirrors(mirrors, *args, **kwargs): +def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool: """ This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`. It will not flush any other mirrors, just insert new ones. @@ -138,7 +143,7 @@ def re_rank_mirrors( return True -def list_mirrors(sort_order=["https", "http"]): +def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" regions = {} diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 0d94572a..6b09deba 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -2,7 +2,7 @@ import logging import os import socket import struct -from collections import OrderedDict +from typing import Union, Dict, Any, List from .exceptions import HardwareIncompatibilityError from .general import SysCommand @@ -10,36 +10,40 @@ from .output import log from .storage import storage -def get_hw_addr(ifname): +def get_hw_addr(ifname :str) -> str: import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) return ':'.join('%02x' % b for b in info[18:24]) -def list_interfaces(skip_loopback=True): - interfaces = OrderedDict() +def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: + interfaces = {} + for index, iface in socket.if_nameindex(): if skip_loopback and iface == "lo": continue mac = get_hw_addr(iface).replace(':', '-').lower() interfaces[mac] = iface + return interfaces -def check_mirror_reachable(): +def check_mirror_reachable() -> bool: log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO) if SysCommand("pacman -Sy").exit_code == 0: return True + elif os.geteuid() != 0: log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") return False -def enrich_iface_types(interfaces: dict): +def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str, str]: result = {} + for iface in interfaces: if os.path.isdir(f"/sys/class/net/{iface}/bridge/"): result[iface] = 'BRIDGE' @@ -53,19 +57,21 @@ def enrich_iface_types(interfaces: dict): result[iface] = 'PHYSICAL' else: result[iface] = 'UNKNOWN' + return result -def get_interface_from_mac(mac): +def get_interface_from_mac(mac :str) -> str: return list_interfaces().get(mac.lower(), None) -def wireless_scan(interface): +def wireless_scan(interface :str) -> None: interfaces = enrich_iface_types(list_interfaces().values()) if interfaces[interface] != 'WIRELESS': raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}") - SysCommand(f"iwctl station {interface} scan") + if not (output := SysCommand(f"iwctl station {interface} scan")).exit_code == 0: + raise SystemError(f"Could not scan for wireless networks: {output}") if '_WIFI' not in storage: storage['_WIFI'] = {} @@ -76,8 +82,9 @@ def wireless_scan(interface): # TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25 -def get_wireless_networks(interface): +def get_wireless_networks(interface :str) -> None: # TODO: Make this oneliner pritter to check if the interface is scanning or not. + # TODO: Rename this to list_wireless_networks() as it doesn't return anything if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: import time diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py index ffc44cbe..1d46ef5e 100644 --- a/archinstall/lib/packages.py +++ b/archinstall/lib/packages.py @@ -3,6 +3,7 @@ import ssl import urllib.error import urllib.parse import urllib.request +from typing import Dict, Any from .exceptions import RequirementError @@ -10,7 +11,7 @@ BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}' BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/' -def find_group(name): +def find_group(name :str) -> bool: ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE @@ -27,7 +28,7 @@ def find_group(name): return True -def find_package(name): +def find_package(name :str) -> Any: """ Finds a specific package via the package database. It makes a simple web-request, which might be a bit slow. @@ -40,7 +41,7 @@ def find_package(name): return json.loads(data) -def find_packages(*names): +def find_packages(*names :str) -> Dict[str, Any]: """ This function returns the search results for many packages. The function itself is rather slow, so consider not sending to @@ -49,7 +50,7 @@ def find_packages(*names): return {package: find_package(package) for package in names} -def validate_package_list(packages: list): +def validate_package_list(packages: list) -> bool: """ Validates a list of given packages. Raises `RequirementError` if one or more packages are not found. diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 027b58d5..7f920317 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -7,6 +7,8 @@ import pathlib import urllib.parse import urllib.request from importlib import metadata +from typing import Optional, List +from types import ModuleType from .output import log from .storage import storage @@ -38,7 +40,7 @@ def localize_path(profile_path :str) -> str: return profile_path -def import_via_path(path :str, namespace=None): # -> module (not sure how to write that in type definitions) +def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType: if not namespace: namespace = os.path.basename(path) @@ -62,14 +64,14 @@ def import_via_path(path :str, namespace=None): # -> module (not sure how to wri except: pass -def find_nth(haystack, needle, n): +def find_nth(haystack :List[str], needle :str, n :int) -> int: start = haystack.find(needle) while start >= 0 and n > 1: start = haystack.find(needle, start + len(needle)) n -= 1 return start -def load_plugin(path :str): # -> module (not sure how to write that in type definitions) +def load_plugin(path :str) -> ModuleType: parsed_url = urllib.parse.urlparse(path) # The Profile was not a direct match on a remote URL diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py index 7d5373c5..9befd3d5 100644 --- a/archinstall/lib/profiles.py +++ b/archinstall/lib/profiles.py @@ -1,3 +1,4 @@ +from __future__ import annotations import hashlib import importlib.util import json @@ -8,7 +9,11 @@ import sys import urllib.error import urllib.parse import urllib.request -from typing import Optional +from typing import Optional, Dict, Union, TYPE_CHECKING +from types import ModuleType +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer from .general import multisplit from .networking import list_interfaces @@ -16,16 +21,16 @@ from .storage import storage from .exceptions import ProfileNotFound -def grab_url_data(path): +def grab_url_data(path :str) -> str: safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))]) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE response = urllib.request.urlopen(safe_path, context=ssl_context) - return response.read() + return response.read() # bytes? -def is_desktop_profile(profile) -> bool: +def is_desktop_profile(profile :str) -> bool: if str(profile) == 'Profile(desktop)': return True @@ -42,8 +47,13 @@ def is_desktop_profile(profile) -> bool: return False -def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False): +def list_profiles( + filter_irrelevant_macs :bool = True, + subpath :str = '', + filter_top_level_profiles :bool = False +) -> Dict[str, Dict[str, Union[str, bool]]]: # TODO: Grab from github page as well, not just local static files + if filter_irrelevant_macs: local_macs = list_interfaces() @@ -101,23 +111,27 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof class Script: - def __init__(self, profile, installer=None): - # profile: https://hvornum.se/something.py - # profile: desktop - # profile: /path/to/profile.py + def __init__(self, profile :str, installer :Optional[Installer] = None): + """ + :param profile: A string representing either a boundled profile, a local python file + or a remote path (URL) to a python script-profile. Three examples: + * profile: https://archlinux.org/some_profile.py + * profile: desktop + * profile: /path/to/profile.py + """ self.profile = profile - self.installer = installer + self.installer = installer # TODO: Appears not to be used anymore? self.converted_path = None self.spec = None self.examples = None self.namespace = os.path.splitext(os.path.basename(self.path))[0] self.original_namespace = self.namespace - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> ModuleType: self.execute() return sys.modules[self.namespace] - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> None: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] @@ -125,7 +139,7 @@ class Script: if self.original_namespace: self.namespace = self.original_namespace - def localize_path(self, profile_path): + def localize_path(self, profile_path :str) -> str: if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'): if not self.converted_path: self.converted_path = f"/tmp/{os.path.basename(self.profile).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py" @@ -138,7 +152,7 @@ class Script: return profile_path @property - def path(self): + def path(self) -> str: parsed_url = urllib.parse.urlparse(self.profile) # The Profile was not a direct match on a remote URL @@ -163,7 +177,7 @@ class Script: else: raise ProfileNotFound(f"Cannot handle scheme {parsed_url.scheme}") - def load_instructions(self, namespace=None): + def load_instructions(self, namespace :Optional[str] = None) -> 'Script': if namespace: self.namespace = namespace @@ -173,7 +187,7 @@ class Script: return self - def execute(self): + def execute(self) -> ModuleType: if self.namespace not in sys.modules or self.spec is None: self.load_instructions() @@ -183,25 +197,23 @@ class Script: class Profile(Script): - def __init__(self, installer, path, args=None): + def __init__(self, installer :Installer, path :str): super(Profile, self).__init__(path, installer) - if args is None: - args = {} - def __dump__(self, *args, **kwargs): + def __dump__(self, *args :str, **kwargs :str) -> Dict[str, str]: return {'path': self.path} - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str) -> str: return f'Profile({os.path.basename(self.profile)})' - def install(self): + def install(self) -> ModuleType: # Before installing, revert any temporary changes to the namespace. # This ensures that the namespace during installation is the original initiation namespace. # (For instance awesome instead of aweosme.py or app-awesome.py) self.namespace = self.original_namespace return self.execute() - def has_prep_function(self): + def has_prep_function(self) -> bool: with open(self.path, 'r') as source: source_data = source.read() @@ -218,7 +230,7 @@ class Profile(Script): return True return False - def has_post_install(self): + def has_post_install(self) -> bool: with open(self.path, 'r') as source: source_data = source.read() @@ -234,7 +246,7 @@ class Profile(Script): if hasattr(imported, '_post_install'): return True - def is_top_level_profile(self): + def is_top_level_profile(self) -> bool: with open(self.path, 'r') as source: source_data = source.read() @@ -247,7 +259,7 @@ class Profile(Script): # since developers like less code - omitting it should assume they want to present it. return True - def get_profile_description(self): + def get_profile_description(self) -> str: with open(self.path, 'r') as source: source_data = source.read() @@ -282,11 +294,11 @@ class Profile(Script): class Application(Profile): - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str): return f'Application({os.path.basename(self.profile)})' @property - def path(self): + def path(self) -> str: parsed_url = urllib.parse.urlparse(self.profile) # The Profile was not a direct match on a remote URL @@ -311,7 +323,7 @@ class Application(Profile): else: raise ProfileNotFound(f"Application cannot handle scheme {parsed_url.scheme}") - def install(self): + def install(self) -> ModuleType: # Before installing, revert any temporary changes to the namespace. # This ensures that the namespace during installation is the original initiation namespace. # (For instance awesome instead of aweosme.py or app-awesome.py) diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py index d295bdbb..b177052b 100644 --- a/archinstall/lib/services.py +++ b/archinstall/lib/services.py @@ -2,7 +2,7 @@ import os from .general import SysCommand -def service_state(service_name: str): +def service_state(service_name: str) -> str: if os.path.splitext(service_name)[1] != '.service': service_name += '.service' # Just to be safe diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py index c3beafc0..44e634fe 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/systemd.py @@ -1,5 +1,6 @@ import logging import time +from typing import Iterator from .exceptions import SysCallError from .general import SysCommand, SysCommandWorker, locate_binary from .installer import Installer @@ -8,14 +9,14 @@ from .storage import storage class Ini: - def __init__(self, *args, **kwargs): + def __init__(self, *args :str, **kwargs :str): """ Limited INI handler for now. Supports multiple keywords through dictionary list items. """ self.kwargs = kwargs - def __str__(self): + def __str__(self) -> str: result = '' first_row_done = False for top_level in self.kwargs: @@ -54,7 +55,7 @@ class Boot: self.session = None self.ready = False - def __enter__(self): + def __enter__(self) -> 'Boot': if (existing_session := storage.get('active_boot', None)) and existing_session.instance != self.instance: raise KeyError("Archinstall only supports booting up one instance, and a active session is already active and it is not this one.") @@ -81,7 +82,7 @@ class Boot: storage['active_boot'] = self return self - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> None: # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager @@ -98,24 +99,24 @@ class Boot: else: raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {shutdown}", exit_code=shutdown.exit_code) - def __iter__(self): + def __iter__(self) -> Iterator[str]: if self.session: for value in self.session: yield value - def __contains__(self, key: bytes): + def __contains__(self, key: bytes) -> bool: if self.session is None: return False return key in self.session - def is_alive(self): + def is_alive(self) -> bool: if self.session is None: return False return self.session.is_alive() - def SysCommand(self, cmd: list, *args, **kwargs): + def SysCommand(self, cmd: list, *args, **kwargs) -> SysCommand: if cmd[0][0] != '/' and cmd[0][:2] != './': # This check is also done in SysCommand & SysCommandWorker. # However, that check is done for `machinectl` and not for our chroot command. @@ -125,7 +126,7 @@ class Boot: return SysCommand(["systemd-run", f"--machine={self.container_name}", "--pty", *cmd], *args, **kwargs) - def SysCommandWorker(self, cmd: list, *args, **kwargs): + def SysCommandWorker(self, cmd: list, *args, **kwargs) -> SysCommandWorker: if cmd[0][0] != '/' and cmd[0][:2] != './': cmd[0] = locate_binary(cmd[0]) diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index 6298db19..c213a941 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -1,3 +1,4 @@ +from __future__ import annotations import getpass import ipaddress import logging @@ -7,11 +8,17 @@ import shutil import signal import sys import time +from collections.abc import Iterable +from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .disk.partition import Partition from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_disks from .exceptions import RequirementError, UserError, DiskError from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics -from .locale_helpers import list_keyboard_languages, list_timezones +from .locale_helpers import list_keyboard_languages, list_timezones, list_locales from .networking import list_interfaces from .menu import Menu from .output import log @@ -21,22 +28,22 @@ from .mirrors import list_mirrors # TODO: Some inconsistencies between the selection processes. # Some return the keys from the options, some the values? -from .. import fs_types - +from .. import fs_types, validate_package_list -def get_terminal_height(): +# TODO: These can be removed after the move to simple_menu.py +def get_terminal_height() -> int: return shutil.get_terminal_size().lines -def get_terminal_width(): +def get_terminal_width() -> int: return shutil.get_terminal_size().columns -def get_longest_option(options): +def get_longest_option(options :List[Any]) -> int: return max([len(x) for x in options]) -def check_for_correct_username(username): +def check_for_correct_username(username :str) -> bool: if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: return True log( @@ -47,14 +54,14 @@ def check_for_correct_username(username): return False -def do_countdown(): +def do_countdown() -> bool: SIG_TRIGGER = False - def kill_handler(sig, frame): + def kill_handler(sig :int, frame :Any) -> None: print() exit(0) - def sig_handler(sig, frame): + def sig_handler(sig :int, frame :Any) -> None: global SIG_TRIGGER SIG_TRIGGER = True signal.signal(signal.SIGINT, kill_handler) @@ -79,12 +86,14 @@ def do_countdown(): sys.stdin.read() SIG_TRIGGER = False signal.signal(signal.SIGINT, sig_handler) + print() signal.signal(signal.SIGINT, original_sigint_handler) + return True -def get_password(prompt="Enter a password: "): +def get_password(prompt :str = "Enter a password: ") -> Optional[str]: while passwd := getpass.getpass(prompt): passwd_verification = getpass.getpass(prompt='And one more time for verification: ') if passwd != passwd_verification: @@ -98,7 +107,7 @@ def get_password(prompt="Enter a password: "): return None -def print_large_list(options, padding=5, margin_bottom=0, separator=': '): +def print_large_list(options :List[str], padding :int = 5, margin_bottom :int = 0, separator :str = ': ') -> List[int]: highest_index_number_length = len(str(len(options))) longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding spaces_without_option = longest_line - (len(separator) + highest_index_number_length) @@ -136,6 +145,7 @@ def select_encrypted_partitions(block_devices :dict, password :str) -> dict: # Users might want to single out a partition for non-encryption to share between dualboot etc. +# TODO: This can be removed once we have simple_menu everywhere class MiniCurses: def __init__(self, width, height): self.width = width @@ -255,11 +265,24 @@ class MiniCurses: return response -def ask_for_swap(prompt='Would you like to use swap on zram? (Y/n): ', forced=False): - return True if input(prompt).strip(' ').lower() not in ('n', 'no') else False +def ask_for_swap(prompt='Would you like to use swap on zram?', forced=False): + choice = Menu(prompt, ['yes', 'no'], default_option='yes').run() + return False if choice == 'no' else True + +def ask_ntp(): + prompt = 'Would you like to use automatic time synchronization (NTP) with the default time servers?' + prompt += 'Hardware time and other post-configuration steps might be required in order for NTP to work. For more information, please check the Arch wiki' + choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() + return False if choice == 'no' else True -def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False): + +def ask_hostname(): + hostname = input('Desired hostname for the installation: ').strip(' ') + return hostname + + +def ask_for_superuser_account(prompt :str = 'Username for required superuser with sudo privileges: ', forced :bool = False) -> Dict[str, Dict[str, str]]: while 1: new_user = input(prompt).strip(' ') @@ -277,7 +300,7 @@ def ask_for_superuser_account(prompt='Username for required superuser with sudo return {new_user: {"!password": password}} -def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '): +def ask_for_additional_users(prompt :str = 'Any additional users to install (leave blank for no users): ') -> List[Dict[str, Dict[str, str]]]: users = {} superusers = {} @@ -297,13 +320,13 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan return users, superusers -def ask_for_a_timezone(): +def ask_for_a_timezone() -> str: timezones = list_timezones() default = 'UTC' selected_tz = Menu( f'Select a timezone or leave blank to use default "{default}"', - timezones, + list(timezones), skip=False, default_option=default ).run() @@ -311,12 +334,12 @@ def ask_for_a_timezone(): return selected_tz -def ask_for_bootloader(advanced_options=False) -> str: +def ask_for_bootloader(advanced_options :bool = False) -> str: bootloader = "systemd-bootctl" if has_uefi() else "grub-install" if has_uefi(): if not advanced_options: - bootloader_choice = input("Would you like to use GRUB as a bootloader instead of systemd-boot? [y/N] ").lower() - if bootloader_choice == "y": + bootloader_choice = Menu('Would you like to use GRUB as a bootloader instead of systemd-boot?', ['yes', 'no'], default_option='no').run() + if bootloader_choice == "yes": bootloader = "grub-install" else: # We use the common names for the bootloader as the selection, and map it back to the expected values. @@ -333,14 +356,46 @@ def ask_for_bootloader(advanced_options=False) -> str: return bootloader -def ask_for_audio_selection(desktop=True): +def ask_for_audio_selection(desktop :bool = True) -> str: audio = 'pipewire' if desktop else 'none' choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none'] - selected_audio = Menu(f'Choose an audio server or leave blank to use "{audio}"', choices, default_option=audio).run() + selected_audio = Menu( + f'Choose an audio server', + choices, + default_option=audio, + skip=False + ).run() return selected_audio -def ask_to_configure_network(): +# TODO: Remove? Moved? +def ask_additional_packages_to_install(packages :List[str] = None) -> List[str]: + # Additional packages (with some light weight error handling for invalid package names) + print( + "Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.") + print("If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.") + while True: + if packages is None: + packages = [p for p in input( + 'Write additional packages to install (space separated, leave blank to skip): ' + ).split(' ') if len(p)] + + if len(packages): + # Verify packages that were given + try: + log("Verifying that additional packages exist (this might take a few seconds)") + validate_package_list(packages) + break + except RequirementError as e: + log(e, fg='red') + else: + # no additional packages were selected, which we'll allow + break + + return packages + + +def ask_to_configure_network() -> Dict[str, Any]: # Optionally configure one network interface. # while 1: # {MAC: Ifname} @@ -350,7 +405,7 @@ def ask_to_configure_network(): **list_interfaces() } - nic = Menu('Select one network interface to configure', interfaces.values()).run() + nic = Menu('Select one network interface to configure', list(interfaces.values())).run() if nic and nic != 'Copy ISO network configuration to installation': if nic == 'Use NetworkManager (necessary to configure internet graphically in GNOME and KDE)': @@ -435,7 +490,7 @@ def ask_for_main_filesystem_format(advanced_options=False): return Menu('Select which filesystem your main partition should use', options, skip=False).run() -def current_partition_layout(partitions, with_idx=False): +def current_partition_layout(partitions :List[Partition], with_idx :bool = False) -> Dict[str, Any]: def do_padding(name, max_len): spaces = abs(len(str(name)) - max_len) + 2 pad_left = int(spaces / 2) @@ -479,7 +534,7 @@ def current_partition_layout(partitions, with_idx=False): return f'\n\nCurrent partition layout:\n\n{current_layout}' -def select_partition(title, partitions, multiple=False): +def select_partition(title :str, partitions :List[Partition], multiple :bool = False) -> Union[int, List[int], None]: partition_indexes = list(map(str, range(len(partitions)))) partition = Menu(title, partition_indexes, multi=multiple).run() @@ -491,47 +546,18 @@ def select_partition(title, partitions, multiple=False): return None -def get_default_partition_layout(block_devices, advanced_options=False): +def get_default_partition_layout( + block_devices :Union[BlockDevice, List[BlockDevice]], + advanced_options :bool = False +) -> Dict[str, Any]: + if len(block_devices) == 1: return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options) else: return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) -def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: - # if has_uefi(): - # partition_type = 'gpt' - # else: - # partition_type = 'msdos' - - # log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO) - # partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True) - # partitions_to_wipe = generic_multi_select(partitions, "Which partitions do you wish to wipe (multiple can be selected): ", sort=True) - - # mountpoints = {} - # struct = { - # "partitions" : [] - # } - # for partition in partitions: - # mountpoint = input(f"Select a mountpoint (or skip) for {partition}: ").strip() - - # part_struct = {} - # if mountpoint: - # part_struct['mountpoint'] = mountpoint - # if mountpoint == '/boot': - # part_struct['boot'] = True - # if has_uefi(): - # part_struct['ESP'] = True - # elif mountpoint == '/' and - # if partition.uuid: - # part_struct['PARTUUID'] = partition.uuid - # if partition in partitions_to_wipe: - # part_struct['wipe'] = True - - # struct['partitions'].append(part_struct) - - # return struct - +def manage_new_and_existing_partitions(block_device :BlockDevice) -> Dict[str, Any]: block_device_struct = { "partitions": [partition.__dump__() for partition in block_device.partitions.values()] } @@ -689,7 +715,7 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: return block_device_struct -def select_individual_blockdevice_usage(block_devices: list): +def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: result = {} for device in block_devices: @@ -700,7 +726,7 @@ def select_individual_blockdevice_usage(block_devices: list): return result -def select_disk_layout(block_devices :list, advanced_options=False): +def select_disk_layout(block_devices :list, advanced_options=False) -> Dict[str, Any]: modes = [ "Wipe all selected drives and use a best-effort default partition layout", "Select what to do with each individual drive (followed by partition usage)" @@ -714,7 +740,7 @@ def select_disk_layout(block_devices :list, advanced_options=False): return select_individual_blockdevice_usage(block_devices) -def select_disk(dict_o_disks): +def select_disk(dict_o_disks :Dict[str, BlockDevice]) -> BlockDevice: """ Asks the user to select a harddrive from the `dict_o_disks` selection. Usually this is combined with :ref:`archinstall.list_drives`. @@ -742,7 +768,7 @@ def select_disk(dict_o_disks): raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') -def select_profile(): +def select_profile() -> Optional[str]: """ # Asks the user to select a profile from the available profiles. # @@ -762,7 +788,7 @@ def select_profile(): title = 'This is a list of pre-programmed profiles, ' \ 'they might make it easier to install things like desktop environments' - selection = Menu(title=title, options=options.keys()).run() + selection = Menu(title=title, p_options=list(options.keys())).run() if selection is not None: return options[selection] @@ -770,7 +796,7 @@ def select_profile(): return None -def select_language(): +def select_language(default_value :str) -> str: """ Asks the user to select a language Usually this is combined with :ref:`archinstall.list_keyboard_languages`. @@ -784,11 +810,11 @@ def select_language(): # allows for searching anyways sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) - selected_lang = Menu('Select Keyboard layout', sorted_kb_lang, default_option='us', sort=False).run() + selected_lang = Menu('Select Keyboard layout', sorted_kb_lang, default_option=default_value, sort=False).run() return selected_lang -def select_mirror_regions(): +def select_mirror_regions() -> Dict[str, Any]: """ Asks the user to select a mirror or region Usually this is combined with :ref:`archinstall.list_mirrors`. @@ -800,7 +826,7 @@ def select_mirror_regions(): mirrors = list_mirrors() selected_mirror = Menu( 'Select one of the regions to download packages from', - mirrors.keys(), + list(mirrors.keys()), multi=True ).run() @@ -810,7 +836,7 @@ def select_mirror_regions(): return {} -def select_harddrives(): +def select_harddrives() -> Optional[str]: """ Asks the user to select one or multiple hard drives @@ -822,17 +848,17 @@ def select_harddrives(): selected_harddrive = Menu( 'Select one or more hard drives to use and configure', - options.keys(), + list(options.keys()), multi=True ).run() if selected_harddrive and len(selected_harddrive) > 0: return [options[i] for i in selected_harddrive] - return None + return [] -def select_driver(options=AVAILABLE_GFX_DRIVERS): +def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str: """ Some what convoluted function, whose job is simple. Select a graphics driver from a pre-defined set of popular options. @@ -866,7 +892,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS): raise RequirementError("Selecting drivers require a least one profile to be given as an option.") -def select_kernel(): +def select_kernel() -> List[str]: """ Asks the user to select a kernel for system. @@ -886,3 +912,109 @@ def select_kernel(): ).run() return selected_kernels + + +def select_locale_lang(default): + locales = list_locales() + locale_lang = set([locale.split()[0] for locale in locales]) + + selected_locale = Menu( + f'Choose which locale language to use', + locale_lang, + sort=True, + default_option=default + ).run() + + return selected_locale + + +def select_locale_enc(default): + locales = list_locales() + locale_enc = set([locale.split()[1] for locale in locales]) + + selected_locale = Menu( + f'Choose which locale encoding to use', + locale_enc, + sort=True, + default_option=default + ).run() + + return selected_locale + +def generic_select(p_options :Union[list,dict], + input_text :str = "Select one of the values shown below: ", + allow_empty_input :bool = True, + options_output :bool = True, # function not available + sort :bool = False, + multi :bool = False, + default :Any = None) -> Any: + """ + A generic select function that does not output anything + other than the options and their indexes. As an example: + + generic_select(["first", "second", "third option"]) + > first + second + third option + When the user has entered the option correctly, + this function returns an item from list, a string, or None + + Options can be any iterable. + Duplicate entries are not checked, but the results with them are unreliable. Which element to choose from the duplicates depends on the return of the index() + Default value if not on the list of options will be added as the first element + sort will be handled by Menu() + """ + # We check that the options are iterable. If not we abort. Else we copy them to lists + # it options is a dictionary we use the values as entries of the list + # if options is a string object, each character becomes an entry + # if options is a list, we implictily build a copy to mantain immutability + if not isinstance(p_options,Iterable): + log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select",fg="red") + log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) + raise RequirementError("generic_select() requires an iterable as option.") + + if isinstance(p_options,dict): + options = list(p_options.values()) + else: + options = list(p_options) + # check that the default value is in the list. If not it will become the first entry + if default and default not in options: + options.insert(0,default) + + # one of the drawbacks of the new interface is that in only allows string like options, so we do a conversion + # also for the default value if it exists + soptions = list(map(str,options)) + default_value = options[options.index(default)] if default else None + + selected_option = Menu( + input_text, + soptions, + skip=allow_empty_input, + multi=multi, + default_option=default_value, + sort=sort + ).run() + # we return the original objects, not the strings. + # options is the list with the original objects and soptions the list with the string values + # thru the map, we get from the value selected in soptions it index, and thu it the original object + if not selected_option: + return selected_option + elif isinstance(selected_option,list): # for multi True + selected_option = list(map(lambda x: options[soptions.index(x)],selected_option)) + else: # for multi False + selected_option = options[soptions.index(selected_option)] + return selected_option + + +def generic_multi_select(p_options :Union[list,dict], + text :str = "Select one or more of the options below: ", + sort :bool = False, + default :Any = None, + allow_empty :bool = False) -> Any: + + return generic_select(p_options, + input_text=text, + allow_empty_input=allow_empty, + sort=sort, + multi=True, + default=default) |