From e32cf71ae7dacbf9674262705cb2e8e1a5a2d206 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Thu, 6 Jan 2022 22:01:15 +0100 Subject: Added type annotations to all functions (#845) * Added type annotations for 1/5 of the files. There's bound to be some issues with type miss-match, will sort that out later. * Added type hints for 4/5 of the code * Added type hints for 4.7/5 of the code * Added type hints for 5/5 of the code base * Split the linters into individual files This should help with more clearly show which runner is breaking since they don't share a single common name any longer. Also moved mypy settings into pyproject.toml * Fixed some of the last flake8 issues * Missing parameter * Fixed invalid lookahead types * __future__ had to be at the top * Fixed last flake8 issues --- archinstall/lib/disk/blockdevice.py | 64 +++++++++-------- archinstall/lib/disk/btrfs.py | 18 +++-- archinstall/lib/disk/filesystem.py | 44 +++++++----- archinstall/lib/disk/helpers.py | 47 ++++++------ archinstall/lib/disk/partition.py | 94 +++++++++++++----------- archinstall/lib/disk/user_guides.py | 16 ++++- archinstall/lib/disk/validators.py | 6 +- archinstall/lib/exceptions.py | 2 +- archinstall/lib/general.py | 45 +++++++----- archinstall/lib/installer.py | 139 ++++++++++++++++++++++-------------- archinstall/lib/locale_helpers.py | 15 ++-- archinstall/lib/luks.py | 41 ++++++++--- archinstall/lib/mirrors.py | 15 ++-- archinstall/lib/networking.py | 27 ++++--- archinstall/lib/packages.py | 9 +-- archinstall/lib/plugins.py | 7 +- archinstall/lib/profiles.py | 69 ++++++++++-------- archinstall/lib/services.py | 2 +- archinstall/lib/systemd.py | 19 ++--- archinstall/lib/user_interaction.py | 106 +++++++++++---------------- 20 files changed, 459 insertions(+), 326 deletions(-) (limited to 'archinstall') diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 2be31375..5288f92b 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,14 +1,20 @@ +from __future__ import annotations import os import json import logging import time +from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from ..exceptions import DiskError from ..output import log from ..general import SysCommand from ..storage import storage class BlockDevice: - def __init__(self, path, info=None): + def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_disks # If we don't give any information, we need to auto-fill it. @@ -24,32 +30,32 @@ class BlockDevice: # It's actually partition-encryption, but for future-proofing this # I'm placing the encryption password on a BlockDevice level. - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str) -> str: return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})" - def __iter__(self): + def __iter__(self) -> Iterator[Partition]: for partition in self.partitions: yield self.partitions[partition] - def __getitem__(self, key, *args, **kwargs): + def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if key not in self.info: raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] - def __len__(self): + def __len__(self) -> int: return len(self.partitions) - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :'BlockDevice') -> bool: return self.path < left_comparitor.path - def json(self): + def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ return self.path - def __dump__(self): + def __dump__(self) -> Dict[str, Dict[str, Any]]: return { self.path : { 'partuuid' : self.uuid, @@ -59,14 +65,14 @@ class BlockDevice: } @property - def partition_type(self): + def partition_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['pttype'] @property - def device_or_backfile(self): + def device_or_backfile(self) -> str: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -82,7 +88,7 @@ class BlockDevice: return self.device @property - def device(self): + def device(self) -> str: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -108,7 +114,7 @@ class BlockDevice: # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @property - def partitions(self): + def partitions(self) -> Dict[str, Partition]: from .filesystem import Partition self.partprobe() @@ -133,17 +139,19 @@ class BlockDevice: return {k: self.part_cache[k] for k in sorted(self.part_cache)} @property - def partition(self): + def partition(self) -> Partition: all_partitions = self.partitions return [all_partitions[k] for k in all_partitions] @property - def partition_table_type(self): + def partition_table_type(self) -> int: + # TODO: Don't hardcode :) + # Remove if we don't use this function anywhere from .filesystem import GPT return GPT @property - def uuid(self): + def uuid(self) -> str: log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ Returns the disk UUID as returned by lsblk. @@ -153,7 +161,7 @@ class BlockDevice: return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') @property - def size(self): + def size(self) -> float: from .helpers import convert_size_to_gb output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) @@ -162,21 +170,21 @@ class BlockDevice: return convert_size_to_gb(device['size']) @property - def bus_type(self): + def bus_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['tran'] @property - def spinning(self): + def spinning(self) -> bool: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['rota'] is True @property - def free_space(self): + def free_space(self) -> Tuple[str, str, str]: # NOTE: parted -s will default to `cancel` on prompt, skipping any partition # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. @@ -187,7 +195,7 @@ class BlockDevice: yield (start, end, size) @property - def largest_free_space(self): + def largest_free_space(self) -> List[str]: info = [] for space_info in self.free_space: if not info: @@ -199,7 +207,7 @@ class BlockDevice: return info @property - def first_free_sector(self): + def first_free_sector(self) -> str: if info := self.largest_free_space: start = info[0] else: @@ -207,29 +215,29 @@ class BlockDevice: return start @property - def first_end_sector(self): + def first_end_sector(self) -> str: if info := self.largest_free_space: end = info[1] else: end = f"{self.size}GB" return end - def partprobe(self): - SysCommand(['partprobe', self.path]) + def partprobe(self) -> bool: + return SysCommand(['partprobe', self.path]).exit_code == 0 - def has_partitions(self): + def has_partitions(self) -> int: return len(self.partitions) - def has_mount_point(self, mountpoint): + def has_mount_point(self, mountpoint :str) -> bool: for partition in self.partitions: if self.partitions[partition].mountpoint == mountpoint: return True return False - def flush_cache(self): + def flush_cache(self) -> None: self.part_cache = {} - def get_partition(self, uuid): + def get_partition(self, uuid :str) -> Partition: count = 0 while count < 5: for partition_uuid, partition in self.partitions.items(): diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index fb9712f8..084e85d2 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -1,7 +1,12 @@ +from __future__ import annotations import pathlib import glob import logging -from typing import Union +from typing import Union, Dict, TYPE_CHECKING + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from ..installer import Installer from .helpers import get_mount_info from ..exceptions import DiskError from ..general import SysCommand @@ -9,7 +14,7 @@ from ..output import log from .partition import Partition -def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: +def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: """ This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name. @@ -42,7 +47,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0 -def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -75,7 +80,12 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") -def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None): +def manage_btrfs_subvolumes(installation :Installer, + partition :Dict[str, str], + mountpoints :Dict[str, str], + subvolumes :Dict[str, str], + unlocked_device :Dict[str, str] = None +) -> None: """ we do the magic with subvolumes in a centralized place parameters: * the installation object diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 51ef949b..e6e965f1 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,7 +1,13 @@ +from __future__ import annotations import time import logging import json import pathlib +from typing import Optional, Dict, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .partition import Partition from .validators import valid_fs_type from ..exceptions import DiskError @@ -16,24 +22,25 @@ class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. - def __init__(self, blockdevice, mode): + def __init__(self, blockdevice :BlockDevice, mode :int): self.blockdevice = blockdevice self.mode = mode - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': return self - def __repr__(self): + def __repr__(self) -> str: return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] + SysCommand('sync') return True - def partuuid_to_index(self, uuid): + def partuuid_to_index(self, uuid :str) -> Optional[int]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() time.sleep(5) @@ -50,7 +57,7 @@ class Filesystem: raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - def load_layout(self, layout :dict): + def load_layout(self, layout :Dict[str, Any]) -> None: from ..luks import luks2 # If the layout tells us to wipe the drive, we do so @@ -127,21 +134,21 @@ class Filesystem: log(f"Marking partition {partition['device_instance']} as bootable.") self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on') - def find_partition(self, mountpoint): + def find_partition(self, mountpoint :str) -> Partition: for partition in self.blockdevice: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition - def partprobe(self): - SysCommand(f'bash -c "partprobe"') + def partprobe(self) -> bool: + return SysCommand(f'bash -c "partprobe"').exit_code == 0 - def raw_parted(self, string: str): + def raw_parted(self, string: str) -> SysCommand: if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0: log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red") time.sleep(0.5) return cmd_handle - def parted(self, string: str): + def parted(self, string: str) -> bool: """ Performs a parted execution of the given string @@ -149,16 +156,17 @@ class Filesystem: :type string: str """ if (parted_handle := self.raw_parted(string)).exit_code == 0: - self.partprobe() - return True + if self.partprobe(): + return True + return False else: raise DiskError(f"Parted failed to add a partition: {parted_handle}") - def use_entire_disk(self, root_filesystem_type='ext4') -> Partition: + def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type, start, end, partition_format=None): + def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()} @@ -197,14 +205,14 @@ class Filesystem: log("Add partition is exiting due to excessive wait time",level=logging.INFO) raise DiskError(f"New partition never showed up after adding new partition on {self}.") - def set_name(self, partition: int, name: str): + def set_name(self, partition: int, name: str) -> bool: return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 - def set(self, partition: int, string: str): + def set(self, partition: int, string: str) -> bool: log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - def parted_mklabel(self, device: str, disk_label: str): + def parted_mklabel(self, device: str, disk_label: str) -> bool: log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") # Try to unmount devices before attempting to run mklabel try: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index ba29744f..e9f6bc10 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -1,10 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import re import time -from typing import Union +from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from .blockdevice import BlockDevice from ..exceptions import SysCallError, DiskError from ..general import SysCommand @@ -14,10 +19,10 @@ from ..storage import storage ROOT_DIR_PATTERN = re.compile('^.*?/devices') GIGA = 2 ** 30 -def convert_size_to_gb(size): +def convert_size_to_gb(size :Union[int, float]) -> float: return round(size / GIGA,1) -def sort_block_devices_based_on_performance(block_devices): +def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: result = {device: 0 for device in block_devices} for device, weight in result.items(): @@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices): return result -def filter_disks_below_size_in_gb(devices, gigabytes): +def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: for disk in devices: if disk.size >= gigabytes: yield disk -def select_largest_device(devices, gigabytes, filter_out=None): +def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None): return max(copy_devices, key=(lambda device : device.size)) -def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): +def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) -def convert_to_gigabytes(string): +def convert_to_gigabytes(string :str) -> float: unit = string.strip()[-1] size = float(string.strip()[:-1]) @@ -81,7 +86,7 @@ def convert_to_gigabytes(string): return size -def device_state(name, *args, **kwargs): +def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: @@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs): return True # lsblk --json -l -n -o path -def all_disks(*args, **kwargs): +def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: kwargs.setdefault("partitions", False) drives = {} @@ -113,7 +118,7 @@ def all_disks(*args, **kwargs): return drives -def harddrive(size=None, model=None, fuzzy=False): +def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: collection = all_disks() for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: @@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path -def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict: +def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: device_path,bind_path = split_bind_name(path) for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: @@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p return {} -def get_partitions_in_use(mountpoint) -> list: +def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: @@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list: return mounts -def get_filesystem_type(path): +def get_filesystem_type(path :str) -> Optional[str]: device_name, bind_name = split_bind_name(path) try: return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip() @@ -201,10 +206,10 @@ def get_filesystem_type(path): return None -def disk_layouts(): +def disk_layouts() -> Optional[Dict[str, Any]]: try: if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: - return json.loads(handle.decode('UTF-8')) + return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} else: log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") return None @@ -216,20 +221,22 @@ def disk_layouts(): return None -def encrypted_partitions(blockdevices :dict) -> bool: +def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: for partition in blockdevices.values(): if partition.get('encrypted', False): yield partition -def find_partition_by_mountpoint(block_devices, relative_mountpoint :str): +def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: for partition in block_devices[device]['partitions']: if partition.get('mountpoint', None) == relative_mountpoint: return partition -def partprobe(): - SysCommand(f'bash -c "partprobe"') - time.sleep(5) +def partprobe() -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk + return True + return False def convert_device_to_uuid(path :str) -> str: device_name, bind_name = split_bind_name(path) diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index bb6f2d53..b8fa2b79 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,7 +5,8 @@ import logging import json import os import hashlib -from typing import Optional +from typing import Optional, Dict, Any, List, Union + from .blockdevice import BlockDevice from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage @@ -15,7 +16,15 @@ from ..general import SysCommand class Partition: - def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + def __init__(self, + path: str, + block_device: BlockDevice, + part_id :Optional[str] = None, + filesystem :Optional[str] = None, + mountpoint :Optional[str] = None, + encrypted :bool = False, + autodetect_filesystem :bool = True): + if not part_id: part_id = os.path.basename(path) @@ -50,14 +59,16 @@ class Partition: if self.filesystem == 'crypto_LUKS': self.encrypted = True - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path else: left_comparitor = str(left_comparitor) - return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. - def __repr__(self, *args, **kwargs): + # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 + return self.path < left_comparitor + + def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' if self.mountpoint: mount_repr = f", mounted={self.mountpoint}" @@ -69,7 +80,7 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - def __dump__(self): + def __dump__(self) -> Dict[str, Any]: return { 'type': 'primary', 'PARTUUID': self._safe_uuid, @@ -86,14 +97,14 @@ class Partition: } @property - def sector_size(self): + def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) for device in output['blockdevices']: return device.get('log-sec', None) @property - def start(self): + def start(self) -> Optional[str]: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) for partition in output.get('partitiontable', {}).get('partitions', []): @@ -101,7 +112,7 @@ class Partition: return partition['start'] # * self.sector_size @property - def end(self): + def end(self) -> Optional[str]: # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) @@ -110,7 +121,7 @@ class Partition: return partition['size'] # * self.sector_size @property - def size(self): + def size(self) -> Optional[float]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() @@ -123,7 +134,7 @@ class Partition: time.sleep(storage['DISK_TIMEOUTS']) @property - def boot(self): + def boot(self) -> bool: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) # Get the bootable flag from the sfdisk output: @@ -143,7 +154,7 @@ class Partition: return False @property - def partition_type(self): + def partition_type(self) -> Optional[str]: lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) for device in lsblk['blockdevices']: @@ -179,19 +190,19 @@ class Partition: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() @property - def encrypted(self): + def encrypted(self) -> Union[bool, None]: return self._encrypted @encrypted.setter - def encrypted(self, value: bool): + def encrypted(self, value: bool) -> None: self._encrypted = value @property - def parent(self): + def parent(self) -> str: return self.real_device @property - def real_device(self): + def real_device(self) -> str: for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): return f"/dev/{parent}" @@ -199,25 +210,27 @@ class Partition: return self.path @property - def device_path(self): + def device_path(self) -> str: """ for bind mounts returns the phisical path of the partition """ device_path, bind_name = split_bind_name(self.path) return device_path @property - def bind_name(self): + def bind_name(self) -> str: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ device_path, bind_name = split_bind_name(self.path) return bind_name - def partprobe(self): - SysCommand(f'bash -c "partprobe"') - time.sleep(1) + def partprobe(self) -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(1) + return True + return False - def detect_inner_filesystem(self, password): + def detect_inner_filesystem(self, password :str) -> Optional[str]: log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) from ..luks import luks2 @@ -227,7 +240,7 @@ class Partition: except SysCallError: return None - def has_content(self): + def has_content(self) -> bool: fs_type = get_filesystem_type(self.path) if not fs_type or "swap" in fs_type: return False @@ -248,7 +261,7 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args, **kwargs): + def encrypt(self, *args :str, **kwargs :str) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ @@ -257,7 +270,7 @@ class Partition: handle = luks2(self, None, None) return handle.encrypt(self, *args, **kwargs) - def format(self, filesystem=None, path=None, log_formatting=True, options=[]): + def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool: """ Format can be given an overriding path, for instance /dev/null to test the formatting functionality and in essence the support for the given filesystem. @@ -342,7 +355,7 @@ class Partition: return True - def find_parent_of(self, data, name, parent=None): + def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: if data['name'] == name: return parent elif 'children' in data: @@ -350,7 +363,7 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent - def mount(self, target, fs=None, options=''): + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: @@ -386,25 +399,24 @@ class Partition: self.mountpoint = target return True - def unmount(self): - try: - SysCommand(f"/usr/bin/umount {self.path}") - except SysCallError as err: - exit_code = err.exit_code - - # Without to much research, it seams that low error codes are errors. - # And above 8k is indicators such as "/dev/x not mounted.". - # So anything in between 0 and 8k are errors (?). - if 0 < exit_code < 8000: - raise err + return False + + def unmount(self) -> bool: + worker = SysCommand(f"/usr/bin/umount {self.path}") + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if 0 < worker.exit_code < 8000: + raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) self.mountpoint = None return True - def umount(self): + def umount(self) -> bool: return self.unmount() - def filesystem_supported(self): + def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: @@ -420,7 +432,7 @@ class Partition: return True -def get_mount_fs_type(fs): +def get_mount_fs_type(fs :str) -> str: if fs == 'ntfs': return 'ntfs3' # Needed to use the Paragon R/W NTFS driver elif fs == 'fat32': diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 3d48c104..b0a8fe8a 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -1,8 +1,17 @@ +from __future__ import annotations import logging +from typing import Optional, Dict, Any, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to from ..output import log -def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False): +def suggest_single_disk_layout(block_device :BlockDevice, + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) @@ -94,7 +103,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o return layout -def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False): +def suggest_multi_disk_layout(block_devices :List[BlockDevice], + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index 464f0d73..fd1b7f33 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -1,4 +1,6 @@ -def valid_parted_position(pos :str): +from typing import List + +def valid_parted_position(pos :str) -> bool: if not len(pos): return False @@ -17,7 +19,7 @@ def valid_parted_position(pos :str): return False -def fs_types(): +def fs_types() -> List[str]: # https://www.gnu.org/software/parted/manual/html_node/mkpart.html # Above link doesn't agree with `man parted` /mkpart documentation: """ diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py index 6cf00026..783bc9c5 100644 --- a/archinstall/lib/exceptions.py +++ b/archinstall/lib/exceptions.py @@ -17,7 +17,7 @@ class ProfileError(BaseException): class SysCallError(BaseException): - def __init__(self, message :str, exit_code :Optional[int]) -> None: + def __init__(self, message :str, exit_code :Optional[int] = None) -> None: super(SysCallError, self).__init__(message) self.message = message self.exit_code = exit_code diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index cc50e80a..96c9d50c 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -1,3 +1,4 @@ +from __future__ import annotations import hashlib import json import logging @@ -9,7 +10,10 @@ import string import sys import time from datetime import datetime, date -from typing import Callable, Optional, Dict, Any, List, Union, Iterator +from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer if sys.platform == 'linux': from select import epoll, EPOLLIN, EPOLLHUP @@ -46,14 +50,14 @@ from .exceptions import RequirementError, SysCallError from .output import log from .storage import storage -def gen_uid(entropy_length=256): +def gen_uid(entropy_length :int = 256) -> str: return hashlib.sha512(os.urandom(entropy_length)).hexdigest() -def generate_password(length=64): +def generate_password(length :int = 64) -> str: haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace return ''.join(secrets.choice(haystack) for i in range(length)) -def multisplit(s, splitters): +def multisplit(s :str, splitters :List[str]) -> str: s = [s, ] for key in splitters: ns = [] @@ -77,12 +81,12 @@ def locate_binary(name :str) -> str: raise RequirementError(f"Binary {name} does not exist.") -def json_dumps(*args, **kwargs): +def json_dumps(*args :str, **kwargs :str) -> str: return json.dumps(*args, **{**kwargs, 'cls': JSON}) class JsonEncoder: @staticmethod - def _encode(obj): + def _encode(obj :Any) -> Any: """ This JSON encoder function will try it's best to convert any archinstall data structures, instances or variables into @@ -119,7 +123,7 @@ class JsonEncoder: return obj @staticmethod - def _unsafe_encode(obj): + def _unsafe_encode(obj :Any) -> Any: """ Same as _encode() but it keeps dictionary keys starting with ! """ @@ -141,20 +145,20 @@ class JSON(json.JSONEncoder, json.JSONDecoder): """ A safe JSON encoder that will omit private information in dicts (starting with !) """ - def _encode(self, obj): + def _encode(self, obj :Any) -> Any: return JsonEncoder._encode(obj) - def encode(self, obj): + def encode(self, obj :Any) -> Any: return super(JSON, self).encode(self._encode(obj)) class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder): """ UNSAFE_JSON will call/encode and keep private information in dicts (starting with !) """ - def _encode(self, obj): + def _encode(self, obj :Any) -> Any: return JsonEncoder._unsafe_encode(obj) - def encode(self, obj): + def encode(self, obj :Any) -> Any: return super(UNSAFE_JSON, self).encode(self._encode(obj)) class SysCommandWorker: @@ -455,12 +459,16 @@ class SysCommand: return None -def prerequisite_check(): - if not os.path.isdir("/sys/firmware/efi"): - raise RequirementError("Archinstall only supports machines in UEFI mode.") +def prerequisite_check() -> bool: + """ + This function is used as a safety check before + continuing with an installation. - return True + Could be anything from checking that /boot is big enough + to check if nvidia hardware exists when nvidia driver was chosen. + """ + return True def reboot(): SysCommand("/usr/bin/reboot") @@ -473,12 +481,15 @@ def pid_exists(pid: int) -> bool: return False -def run_custom_user_commands(commands, installation): +def run_custom_user_commands(commands :List[str], installation :Installer) -> None: for index, command in enumerate(commands): - log(f'Executing custom command "{command}" ...', fg='yellow') + log(f'Executing custom command "{command}" ...', level=logging.INFO) + with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: temp_script.write(command) + execution_output = SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh") + log(execution_output) os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index c02d5717..4f46e458 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,5 +1,4 @@ import time -from typing import Union import logging import os import shutil @@ -7,6 +6,7 @@ import shlex import pathlib import subprocess import glob +from typing import Union, Dict, Any, List, ModuleType, Optional, Iterator, Mapping from .disk import get_partitions_in_use, Partition from .general import SysCommand, generate_password from .hardware import has_uefi, is_vm, cpu_vendor @@ -30,29 +30,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] class InstallationFile: - def __init__(self, installation, filename, owner, mode="w"): + def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): self.installation = installation self.filename = filename self.owner = owner self.mode = mode self.fh = None - def __enter__(self): + def __enter__(self) -> 'InstallationFile': self.fh = open(self.filename, self.mode) return self - def __exit__(self, *args): + def __exit__(self, *args :str) -> None: self.fh.close() self.installation.chown(self.owner, self.filename) - def write(self, data: Union[str, bytes]): + def write(self, data: Union[str, bytes]) -> int: return self.fh.write(data) - def read(self, *args): + def read(self, *args) -> Union[str, bytes]: return self.fh.read(*args) - def poll(self, *args): - return self.fh.poll(*args) +# def poll(self, *args) -> bool: +# return self.fh.poll(*args) def accessibility_tools_in_use() -> bool: @@ -84,11 +84,12 @@ class Installer: """ - def __init__(self, target, *, base_packages=None, kernels=None): + def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None): if base_packages is None: base_packages = __packages__[:3] if kernels is None: kernels = ['linux'] + self.kernels = kernels self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') @@ -119,18 +120,17 @@ class Installer: self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"] self.KERNEL_PARAMS = [] - def log(self, *args, level=logging.DEBUG, **kwargs): + def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): """ installer.log() wraps output.log() mainly to set a default log-level for this install session. Any manual override can be done per log() call. """ log(*args, level=level, **kwargs) - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Installer': return self - def __exit__(self, *args, **kwargs): - # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. + def __exit__(self, *args :str, **kwargs :str) -> None: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: @@ -163,10 +163,10 @@ class Installer: return False @property - def partitions(self): + def partitions(self) -> List[Partition]: return get_partitions_in_use(self.target) - def sync_log_to_install_medium(self): + def sync_log_to_install_medium(self) -> bool: # Copy over the install log (if there is one) to the install medium if # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. if self.helper_flags.get('base-strapped', False) is True: @@ -180,7 +180,7 @@ class Installer: return True - def mount_ordered_layout(self, layouts: dict): + def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: from .luks import luks2 mountpoints = {} @@ -254,16 +254,16 @@ class Installer: except DiskError: raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") - def mount(self, partition, mountpoint, create_mountpoint=True): + def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None: if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): os.makedirs(f'{self.target}{mountpoint}') partition.mount(f'{self.target}{mountpoint}') - def post_install_check(self, *args, **kwargs): + def post_install_check(self, *args :str, **kwargs :str) -> List[bool]: return [step for step, flag in self.helper_flags.items() if flag is False] - def pacstrap(self, *packages, **kwargs): + def pacstrap(self, *packages :str, **kwargs :str) -> bool: if type(packages[0]) in (list, tuple): packages = packages[0] @@ -284,7 +284,7 @@ class Installer: else: self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO) - def set_mirrors(self, mirrors): + def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): if result := plugin.on_mirrors(mirrors): @@ -292,7 +292,7 @@ class Installer: return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') - def genfstab(self, flags='-pU'): + def genfstab(self, flags :str = '-pU') -> bool: self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: @@ -307,11 +307,11 @@ class Installer: return True - def set_hostname(self, hostname: str, *args, **kwargs): + def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: with open(f'{self.target}/etc/hostname', 'w') as fh: fh.write(hostname + '\n') - def set_locale(self, locale, encoding='UTF-8', *args, **kwargs): + def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool: if not len(locale): return True @@ -322,7 +322,7 @@ class Installer: return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False - def set_timezone(self, zone, *args, **kwargs): + def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool: if not zone: return True if not len(zone): @@ -337,6 +337,7 @@ class Installer: (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True + else: self.log( f"Time zone {zone} does not exist, continuing with system default.", @@ -344,11 +345,13 @@ class Installer: fg='red' ) - def activate_ntp(self): + return False + + def activate_ntp(self) -> None: log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO) self.activate_time_syncronization() - def activate_time_syncronization(self): + def activate_time_syncronization(self) -> None: self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO) self.enable_service('systemd-timesyncd') @@ -361,11 +364,11 @@ class Installer: with Boot(self) as session: session.SysCommand(["timedatectl", "set-ntp", 'true']) - def enable_espeakup(self): + def enable_espeakup(self) -> None: self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO) self.enable_service('espeakup') - def enable_service(self, *services): + def enable_service(self, *services :str) -> None: for service in services: self.log(f'Enabling service {service}', level=logging.INFO) if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0: @@ -375,19 +378,27 @@ class Installer: if hasattr(plugin, 'on_service'): plugin.on_service(service) - def run_command(self, cmd, *args, **kwargs): + def run_command(self, cmd :str, *args :str, **kwargs :str) -> None: return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') - def arch_chroot(self, cmd, run_as=None): + def arch_chroot(self, cmd :str, run_as :Optional[str] = None): if run_as: cmd = f"su - {run_as} -c {shlex.quote(cmd)}" return self.run_command(cmd) - def drop_to_shell(self): + def drop_to_shell(self) -> None: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) - def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs): + def configure_nic(self, + nic :str, + dhcp :bool = True, + ip :Optional[str] = None, + gateway :Optional[str] = None, + dns :Optional[str] = None, + *args :str, + **kwargs :str + ) -> None: from .systemd import Networkd if dhcp: @@ -412,7 +423,7 @@ class Installer: with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf: netconf.write(str(conf)) - def copy_iso_network_config(self, enable_services=False): + def copy_iso_network_config(self, enable_services :bool = False) -> bool: # Copy (if any) iwd password and config files if os.path.isdir('/var/lib/iwd/'): if psk_files := glob.glob('/var/lib/iwd/*.psk'): @@ -427,7 +438,7 @@ class Installer: # This function will be called after minimal_installation() # as a hook for post-installs. This hook is only needed if # base is not installed yet. - def post_install_enable_iwd_service(*args, **kwargs): + def post_install_enable_iwd_service(*args :str, **kwargs :str): self.enable_service('iwd') self.post_base_install.append(post_install_enable_iwd_service) @@ -452,7 +463,7 @@ class Installer: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: - def post_install_enable_networkd_resolved(*args, **kwargs): + def post_install_enable_networkd_resolved(*args :str, **kwargs :str): self.enable_service('systemd-networkd', 'systemd-resolved') self.post_base_install.append(post_install_enable_networkd_resolved) @@ -462,7 +473,7 @@ class Installer: return True - def detect_encryption(self, partition): + def detect_encryption(self, partition :Partition) -> bool: part = Partition(partition.parent, None, autodetect_filesystem=True) if partition.encrypted: return partition @@ -471,7 +482,7 @@ class Installer: return False - def mkinitcpio(self, *flags): + def mkinitcpio(self, *flags :str) -> bool: for plugin in plugins.values(): if hasattr(plugin, 'on_mkinitcpio'): # Allow plugins to override the usage of mkinitcpio altogether. @@ -483,9 +494,10 @@ class Installer: mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") - SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') - def minimal_installation(self): + return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0 + + def minimal_installation(self) -> bool: # Add necessary packages if encrypting the drive # (encrypted partitions default to btrfs for now, so we need btrfs-progs) # TODO: Perhaps this should be living in the function which dictates @@ -562,7 +574,7 @@ class Installer: return True - def setup_swap(self, kind='zram'): + def setup_swap(self, kind :str = 'zram') -> bool: if kind == 'zram': self.log(f"Setting up swap on zram") self.pacstrap('zram-generator') @@ -578,7 +590,18 @@ class Installer: else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") - def add_bootloader(self, bootloader='systemd-bootctl'): + def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool: + """ + Adds a bootloader to the installation instance. + Archinstall supports one of three types: + * systemd-bootctl + * grub + * efistub (beta) + + :param bootloader: Can be one of the three strings + 'systemd-bootctl', 'grub' or 'efistub' (beta) + """ + for plugin in plugins.values(): if hasattr(plugin, 'on_add_bootloader'): # Allow plugins to override the boot-loader handling. @@ -757,10 +780,19 @@ class Installer: return True - def add_additional_packages(self, *packages): + def add_additional_packages(self, *packages :str) -> bool: return self.pacstrap(*packages) - def install_profile(self, profile): + def install_profile(self, profile :str) -> ModuleType: + """ + Installs a archinstall profile script (.py file). + This profile can be either local, remote or part of the library. + + :param profile: Can be a local path or a remote path (URL) + :return: Returns the imported script as a module, this way + you can access any remaining functions exposed by the profile. + :rtype: module + """ storage['installation_session'] = self if type(profile) == str: @@ -769,13 +801,13 @@ class Installer: self.log(f'Installing network profile {profile}', level=logging.INFO) return profile.install() - def enable_sudo(self, entity: str, group=False): + def enable_sudo(self, entity: str, group :bool = False) -> bool: self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) with open(f'{self.target}/etc/sudoers', 'a') as sudoers: sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') return True - def user_create(self, user: str, password=None, groups=None, sudo=False): + def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None: if groups is None: groups = [] @@ -789,7 +821,8 @@ class Installer: if not handled_by_plugin: self.log(f'Creating user {user}', level=logging.INFO) - SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') + if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0: + raise SystemError(f"Could not create user inside installation: {output}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): @@ -806,24 +839,24 @@ class Installer: if sudo and self.enable_sudo(user): self.helper_flags['user'] = True - def user_set_pw(self, user, password): + def user_set_pw(self, user :str, password :str) -> bool: self.log(f'Setting password for {user}', level=logging.INFO) if user == 'root': # This means the root account isn't locked/disabled with * in /etc/passwd self.helper_flags['user'] = True - SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"") + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0 - def user_set_shell(self, user, shell): + def user_set_shell(self, user :str, shell :str) -> bool: self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) - SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0 - def chown(self, owner, path, options=[]): - return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}") + def chown(self, owner :str, path :str, options :List[str] = []) -> bool: + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0 - def create_file(self, filename, owner=None): + def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: return InstallationFile(self, filename, owner) def set_keyboard_language(self, language: str) -> bool: diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py index ad85ea1b..6aa678a6 100644 --- a/archinstall/lib/locale_helpers.py +++ b/archinstall/lib/locale_helpers.py @@ -1,41 +1,42 @@ import logging +from typing import Iterator from .exceptions import ServiceException from .general import SysCommand from .output import log -def list_keyboard_languages(): +def list_keyboard_languages() -> Iterator[str]: for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}): yield line.decode('UTF-8').strip() -def list_x11_keyboard_languages(): +def list_x11_keyboard_languages() -> Iterator[str]: for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}): yield line.decode('UTF-8').strip() -def verify_keyboard_layout(layout): +def verify_keyboard_layout(layout :str) -> bool: for language in list_keyboard_languages(): if layout.lower() == language.lower(): return True return False -def verify_x11_keyboard_layout(layout): +def verify_x11_keyboard_layout(layout :str) -> bool: for language in list_x11_keyboard_languages(): if layout.lower() == language.lower(): return True return False -def search_keyboard_layout(layout): +def search_keyboard_layout(layout :str) -> Iterator[str]: for language in list_keyboard_languages(): if layout.lower() in language.lower(): yield language -def set_keyboard_language(locale): +def set_keyboard_language(locale :str) -> bool: if len(locale.strip()): if not verify_keyboard_layout(locale): log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR) @@ -49,6 +50,6 @@ def set_keyboard_language(locale): return False -def list_timezones(): +def list_timezones() -> Iterator[str]: for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}): yield line.decode('UTF-8').strip() diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 255c75d9..26f2bc1b 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,9 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import shlex import time +from typing import Optional, List,TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer + from .disk import Partition, convert_device_to_uuid from .general import SysCommand, SysCommandWorker from .output import log @@ -11,7 +17,15 @@ from .exceptions import SysCallError, DiskError from .storage import storage class luks2: - def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs): + def __init__(self, + partition :Partition, + mountpoint :str, + password :str, + key_file :Optional[str] = None, + auto_unmount :bool = False, + *args :str, + **kwargs :str): + self.password = password self.partition = partition self.mountpoint = mountpoint @@ -22,7 +36,7 @@ class luks2: self.filesystem = 'crypto_LUKS' self.mapdev = None - def __enter__(self): + def __enter__(self) -> Partition: if not self.key_file: self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? @@ -34,16 +48,23 @@ class luks2: return self.unlock(self.partition, self.mountpoint, self.key_file) - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if self.auto_unmount: self.close() if len(args) >= 2 and args[1]: raise args[1] + return True - def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None): + def encrypt(self, partition :Partition, + password :Optional[str] = None, + key_size :int = 512, + hash_type :str = 'sha512', + iter_time :int = 10000, + key_file :Optional[str] = None) -> str: + log(f'Encrypting {partition} (This might take a while)', level=logging.INFO) if not key_file: @@ -119,7 +140,7 @@ class luks2: return key_file - def unlock(self, partition, mountpoint, key_file): + def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition: """ Mounts a luks2 compatible partition to a certain mountpoint. Keyfile must be specified as there's no way to interact with the pw-prompt atm. @@ -142,18 +163,18 @@ class luks2: unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False) return unlocked_partition - def close(self, mountpoint=None): + def close(self, mountpoint :Optional[str] = None) -> bool: if not mountpoint: mountpoint = self.mapdev SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}') return os.path.islink(self.mapdev) is False - def format(self, path): + def format(self, path :str) -> None: if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') - def add_key(self, path :pathlib.Path, password :str): + def add_key(self, path :pathlib.Path, password :str) -> bool: if not path.exists(): raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path)) @@ -169,7 +190,9 @@ class luks2: if worker.exit_code != 0: raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}') - def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]): + return True + + def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None: log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO) with open(f"{installation.target}/etc/crypttab", "a") as crypttab: crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n") diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index 5fad6cb6..6b6bfed4 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,7 +1,7 @@ import logging import urllib.error import urllib.request -from typing import Union, Mapping, Iterable +from typing import Union, Mapping, Iterable, Dict, Any, List from .general import SysCommand from .output import log @@ -51,7 +51,12 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: return new_raw_data -def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', sort_order=["https", "http"], *args, **kwargs) -> Union[bool, bytes]: +def filter_mirrors_by_region(regions :str, + destination :str = '/etc/pacman.d/mirrorlist', + sort_order :List[str] = ["https", "http"], + *args :str, + **kwargs :str +) -> Union[bool, bytes]: """ This function will change the active mirrors on the live medium by filtering which regions are active based on `regions`. @@ -75,7 +80,7 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', so return new_list.decode('UTF-8') -def add_custom_mirrors(mirrors: list, *args, **kwargs): +def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool: """ This will append custom mirror definitions in pacman.conf @@ -91,7 +96,7 @@ def add_custom_mirrors(mirrors: list, *args, **kwargs): return True -def insert_mirrors(mirrors, *args, **kwargs): +def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool: """ This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`. It will not flush any other mirrors, just insert new ones. @@ -138,7 +143,7 @@ def re_rank_mirrors( return True -def list_mirrors(sort_order=["https", "http"]): +def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" regions = {} diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 0d94572a..6b09deba 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -2,7 +2,7 @@ import logging import os import socket import struct -from collections import OrderedDict +from typing import Union, Dict, Any, List from .exceptions import HardwareIncompatibilityError from .general import SysCommand @@ -10,36 +10,40 @@ from .output import log from .storage import storage -def get_hw_addr(ifname): +def get_hw_addr(ifname :str) -> str: import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) return ':'.join('%02x' % b for b in info[18:24]) -def list_interfaces(skip_loopback=True): - interfaces = OrderedDict() +def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: + interfaces = {} + for index, iface in socket.if_nameindex(): if skip_loopback and iface == "lo": continue mac = get_hw_addr(iface).replace(':', '-').lower() interfaces[mac] = iface + return interfaces -def check_mirror_reachable(): +def check_mirror_reachable() -> bool: log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO) if SysCommand("pacman -Sy").exit_code == 0: return True + elif os.geteuid() != 0: log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") return False -def enrich_iface_types(interfaces: dict): +def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str, str]: result = {} + for iface in interfaces: if os.path.isdir(f"/sys/class/net/{iface}/bridge/"): result[iface] = 'BRIDGE' @@ -53,19 +57,21 @@ def enrich_iface_types(interfaces: dict): result[iface] = 'PHYSICAL' else: result[iface] = 'UNKNOWN' + return result -def get_interface_from_mac(mac): +def get_interface_from_mac(mac :str) -> str: return list_interfaces().get(mac.lower(), None) -def wireless_scan(interface): +def wireless_scan(interface :str) -> None: interfaces = enrich_iface_types(list_interfaces().values()) if interfaces[interface] != 'WIRELESS': raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}") - SysCommand(f"iwctl station {interface} scan") + if not (output := SysCommand(f"iwctl station {interface} scan")).exit_code == 0: + raise SystemError(f"Could not scan for wireless networks: {output}") if '_WIFI' not in storage: storage['_WIFI'] = {} @@ -76,8 +82,9 @@ def wireless_scan(interface): # TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25 -def get_wireless_networks(interface): +def get_wireless_networks(interface :str) -> None: # TODO: Make this oneliner pritter to check if the interface is scanning or not. + # TODO: Rename this to list_wireless_networks() as it doesn't return anything if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: import time diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py index ffc44cbe..1d46ef5e 100644 --- a/archinstall/lib/packages.py +++ b/archinstall/lib/packages.py @@ -3,6 +3,7 @@ import ssl import urllib.error import urllib.parse import urllib.request +from typing import Dict, Any from .exceptions import RequirementError @@ -10,7 +11,7 @@ BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}' BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/' -def find_group(name): +def find_group(name :str) -> bool: ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE @@ -27,7 +28,7 @@ def find_group(name): return True -def find_package(name): +def find_package(name :str) -> Any: """ Finds a specific package via the package database. It makes a simple web-request, which might be a bit slow. @@ -40,7 +41,7 @@ def find_package(name): return json.loads(data) -def find_packages(*names): +def find_packages(*names :str) -> Dict[str, Any]: """ This function returns the search results for many packages. The function itself is rather slow, so consider not sending to @@ -49,7 +50,7 @@ def find_packages(*names): return {package: find_package(package) for package in names} -def validate_package_list(packages: list): +def validate_package_list(packages: list) -> bool: """ Validates a list of given packages. Raises `RequirementError` if one or more packages are not found. diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 027b58d5..e61c114e 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -7,6 +7,7 @@ import pathlib import urllib.parse import urllib.request from importlib import metadata +from typing import ModuleType, Optional, List from .output import log from .storage import storage @@ -38,7 +39,7 @@ def localize_path(profile_path :str) -> str: return profile_path -def import_via_path(path :str, namespace=None): # -> module (not sure how to write that in type definitions) +def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType: if not namespace: namespace = os.path.basename(path) @@ -62,14 +63,14 @@ def import_via_path(path :str, namespace=None): # -> module (not sure how to wri except: pass -def find_nth(haystack, needle, n): +def find_nth(haystack :List[str], needle :str, n :int) -> int: start = haystack.find(needle) while start >= 0 and n > 1: start = haystack.find(needle, start + len(needle)) n -= 1 return start -def load_plugin(path :str): # -> module (not sure how to write that in type definitions) +def load_plugin(path :str) -> ModuleType: parsed_url = urllib.parse.urlparse(path) # The Profile was not a direct match on a remote URL diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py index 7d5373c5..6b0e69bf 100644 --- a/archinstall/lib/profiles.py +++ b/archinstall/lib/profiles.py @@ -1,3 +1,4 @@ +from __future__ import annotations import hashlib import importlib.util import json @@ -8,7 +9,10 @@ import sys import urllib.error import urllib.parse import urllib.request -from typing import Optional +from typing import Optional, ModuleType, Dict, Union, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer from .general import multisplit from .networking import list_interfaces @@ -16,16 +20,16 @@ from .storage import storage from .exceptions import ProfileNotFound -def grab_url_data(path): +def grab_url_data(path :str) -> str: safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))]) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE response = urllib.request.urlopen(safe_path, context=ssl_context) - return response.read() + return response.read() # bytes? -def is_desktop_profile(profile) -> bool: +def is_desktop_profile(profile :str) -> bool: if str(profile) == 'Profile(desktop)': return True @@ -42,8 +46,13 @@ def is_desktop_profile(profile) -> bool: return False -def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False): +def list_profiles( + filter_irrelevant_macs :bool = True, + subpath :str = '', + filter_top_level_profiles :bool = False +) -> Dict[str, Dict[str, Union[str, bool]]]: # TODO: Grab from github page as well, not just local static files + if filter_irrelevant_macs: local_macs = list_interfaces() @@ -101,23 +110,27 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof class Script: - def __init__(self, profile, installer=None): - # profile: https://hvornum.se/something.py - # profile: desktop - # profile: /path/to/profile.py + def __init__(self, profile :str, installer :Optional[Installer] = None): + """ + :param profile: A string representing either a boundled profile, a local python file + or a remote path (URL) to a python script-profile. Three examples: + * profile: https://archlinux.org/some_profile.py + * profile: desktop + * profile: /path/to/profile.py + """ self.profile = profile - self.installer = installer + self.installer = installer # TODO: Appears not to be used anymore? self.converted_path = None self.spec = None self.examples = None self.namespace = os.path.splitext(os.path.basename(self.path))[0] self.original_namespace = self.namespace - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> ModuleType: self.execute() return sys.modules[self.namespace] - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> None: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] @@ -125,7 +138,7 @@ class Script: if self.original_namespace: self.namespace = self.original_namespace - def localize_path(self, profile_path): + def localize_path(self, profile_path :str) -> str: if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'): if not self.converted_path: self.converted_path = f"/tmp/{os.path.basename(self.profile).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py" @@ -138,7 +151,7 @@ class Script: return profile_path @property - def path(self): + def path(self) -> str: parsed_url = urllib.parse.urlparse(self.profile) # The Profile was not a direct match on a remote URL @@ -163,7 +176,7 @@ class Script: else: raise ProfileNotFound(f"Cannot handle scheme {parsed_url.scheme}") - def load_instructions(self, namespace=None): + def load_instructions(self, namespace :Optional[str] = None) -> 'Script': if namespace: self.namespace = namespace @@ -173,7 +186,7 @@ class Script: return self - def execute(self): + def execute(self) -> ModuleType: if self.namespace not in sys.modules or self.spec is None: self.load_instructions() @@ -183,25 +196,23 @@ class Script: class Profile(Script): - def __init__(self, installer, path, args=None): + def __init__(self, installer :Installer, path :str): super(Profile, self).__init__(path, installer) - if args is None: - args = {} - def __dump__(self, *args, **kwargs): + def __dump__(self, *args :str, **kwargs :str) -> Dict[str, str]: return {'path': self.path} - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str) -> str: return f'Profile({os.path.basename(self.profile)})' - def install(self): + def install(self) -> ModuleType: # Before installing, revert any temporary changes to the namespace. # This ensures that the namespace during installation is the original initiation namespace. # (For instance awesome instead of aweosme.py or app-awesome.py) self.namespace = self.original_namespace return self.execute() - def has_prep_function(self): + def has_prep_function(self) -> bool: with open(self.path, 'r') as source: source_data = source.read() @@ -218,7 +229,7 @@ class Profile(Script): return True return False - def has_post_install(self): + def has_post_install(self) -> bool: with open(self.path, 'r') as source: source_data = source.read() @@ -234,7 +245,7 @@ class Profile(Script): if hasattr(imported, '_post_install'): return True - def is_top_level_profile(self): + def is_top_level_profile(self) -> bool: with open(self.path, 'r') as source: source_data = source.read() @@ -247,7 +258,7 @@ class Profile(Script): # since developers like less code - omitting it should assume they want to present it. return True - def get_profile_description(self): + def get_profile_description(self) -> str: with open(self.path, 'r') as source: source_data = source.read() @@ -282,11 +293,11 @@ class Profile(Script): class Application(Profile): - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str): return f'Application({os.path.basename(self.profile)})' @property - def path(self): + def path(self) -> str: parsed_url = urllib.parse.urlparse(self.profile) # The Profile was not a direct match on a remote URL @@ -311,7 +322,7 @@ class Application(Profile): else: raise ProfileNotFound(f"Application cannot handle scheme {parsed_url.scheme}") - def install(self): + def install(self) -> ModuleType: # Before installing, revert any temporary changes to the namespace. # This ensures that the namespace during installation is the original initiation namespace. # (For instance awesome instead of aweosme.py or app-awesome.py) diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py index d295bdbb..b177052b 100644 --- a/archinstall/lib/services.py +++ b/archinstall/lib/services.py @@ -2,7 +2,7 @@ import os from .general import SysCommand -def service_state(service_name: str): +def service_state(service_name: str) -> str: if os.path.splitext(service_name)[1] != '.service': service_name += '.service' # Just to be safe diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py index c3beafc0..74229fae 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/systemd.py @@ -1,5 +1,6 @@ import logging import time +from typing import Interator from .exceptions import SysCallError from .general import SysCommand, SysCommandWorker, locate_binary from .installer import Installer @@ -8,14 +9,14 @@ from .storage import storage class Ini: - def __init__(self, *args, **kwargs): + def __init__(self, *args :str, **kwargs :str): """ Limited INI handler for now. Supports multiple keywords through dictionary list items. """ self.kwargs = kwargs - def __str__(self): + def __str__(self) -> str: result = '' first_row_done = False for top_level in self.kwargs: @@ -54,7 +55,7 @@ class Boot: self.session = None self.ready = False - def __enter__(self): + def __enter__(self) -> 'Boot': if (existing_session := storage.get('active_boot', None)) and existing_session.instance != self.instance: raise KeyError("Archinstall only supports booting up one instance, and a active session is already active and it is not this one.") @@ -81,7 +82,7 @@ class Boot: storage['active_boot'] = self return self - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> None: # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager @@ -98,24 +99,24 @@ class Boot: else: raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {shutdown}", exit_code=shutdown.exit_code) - def __iter__(self): + def __iter__(self) -> Interator[str]: if self.session: for value in self.session: yield value - def __contains__(self, key: bytes): + def __contains__(self, key: bytes) -> bool: if self.session is None: return False return key in self.session - def is_alive(self): + def is_alive(self) -> bool: if self.session is None: return False return self.session.is_alive() - def SysCommand(self, cmd: list, *args, **kwargs): + def SysCommand(self, cmd: list, *args, **kwargs) -> SysCommand: if cmd[0][0] != '/' and cmd[0][:2] != './': # This check is also done in SysCommand & SysCommandWorker. # However, that check is done for `machinectl` and not for our chroot command. @@ -125,7 +126,7 @@ class Boot: return SysCommand(["systemd-run", f"--machine={self.container_name}", "--pty", *cmd], *args, **kwargs) - def SysCommandWorker(self, cmd: list, *args, **kwargs): + def SysCommandWorker(self, cmd: list, *args, **kwargs) -> SysCommandWorker: if cmd[0][0] != '/' and cmd[0][:2] != './': cmd[0] = locate_binary(cmd[0]) diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index 6298db19..df53ce49 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -1,3 +1,4 @@ +from __future__ import annotations import getpass import ipaddress import logging @@ -7,6 +8,11 @@ import shutil import signal import sys import time +from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .disk.partition import Partition from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_disks from .exceptions import RequirementError, UserError, DiskError @@ -23,20 +29,20 @@ from .mirrors import list_mirrors # Some return the keys from the options, some the values? from .. import fs_types - -def get_terminal_height(): +# TODO: These can be removed after the move to simple_menu.py +def get_terminal_height() -> int: return shutil.get_terminal_size().lines -def get_terminal_width(): +def get_terminal_width() -> int: return shutil.get_terminal_size().columns -def get_longest_option(options): +def get_longest_option(options :List[Any]) -> int: return max([len(x) for x in options]) -def check_for_correct_username(username): +def check_for_correct_username(username :str) -> bool: if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: return True log( @@ -47,14 +53,14 @@ def check_for_correct_username(username): return False -def do_countdown(): +def do_countdown() -> bool: SIG_TRIGGER = False - def kill_handler(sig, frame): + def kill_handler(sig :int, frame :Any) -> None: print() exit(0) - def sig_handler(sig, frame): + def sig_handler(sig :int, frame :Any) -> None: global SIG_TRIGGER SIG_TRIGGER = True signal.signal(signal.SIGINT, kill_handler) @@ -79,12 +85,14 @@ def do_countdown(): sys.stdin.read() SIG_TRIGGER = False signal.signal(signal.SIGINT, sig_handler) + print() signal.signal(signal.SIGINT, original_sigint_handler) + return True -def get_password(prompt="Enter a password: "): +def get_password(prompt :str = "Enter a password: ") -> Optional[str]: while passwd := getpass.getpass(prompt): passwd_verification = getpass.getpass(prompt='And one more time for verification: ') if passwd != passwd_verification: @@ -98,7 +106,7 @@ def get_password(prompt="Enter a password: "): return None -def print_large_list(options, padding=5, margin_bottom=0, separator=': '): +def print_large_list(options :List[str], padding :int = 5, margin_bottom :int = 0, separator :str = ': ') -> List[int]: highest_index_number_length = len(str(len(options))) longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding spaces_without_option = longest_line - (len(separator) + highest_index_number_length) @@ -136,6 +144,7 @@ def select_encrypted_partitions(block_devices :dict, password :str) -> dict: # Users might want to single out a partition for non-encryption to share between dualboot etc. +# TODO: This can be removed once we have simple_menu everywhere class MiniCurses: def __init__(self, width, height): self.width = width @@ -255,11 +264,11 @@ class MiniCurses: return response -def ask_for_swap(prompt='Would you like to use swap on zram? (Y/n): ', forced=False): +def ask_for_swap(prompt :str = 'Would you like to use swap on zram? (Y/n): ', forced :bool = False) -> bool: return True if input(prompt).strip(' ').lower() not in ('n', 'no') else False -def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False): +def ask_for_superuser_account(prompt :str = 'Username for required superuser with sudo privileges: ', forced :bool = False) -> Dict[str, Dict[str, str]]: while 1: new_user = input(prompt).strip(' ') @@ -277,7 +286,7 @@ def ask_for_superuser_account(prompt='Username for required superuser with sudo return {new_user: {"!password": password}} -def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '): +def ask_for_additional_users(prompt :str = 'Any additional users to install (leave blank for no users): ') -> List[Dict[str, Dict[str, str]]]: users = {} superusers = {} @@ -297,7 +306,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan return users, superusers -def ask_for_a_timezone(): +def ask_for_a_timezone() -> str: timezones = list_timezones() default = 'UTC' @@ -311,7 +320,7 @@ def ask_for_a_timezone(): return selected_tz -def ask_for_bootloader(advanced_options=False) -> str: +def ask_for_bootloader(advanced_options :bool = False) -> str: bootloader = "systemd-bootctl" if has_uefi() else "grub-install" if has_uefi(): if not advanced_options: @@ -333,14 +342,14 @@ def ask_for_bootloader(advanced_options=False) -> str: return bootloader -def ask_for_audio_selection(desktop=True): +def ask_for_audio_selection(desktop :bool = True) -> str: audio = 'pipewire' if desktop else 'none' choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none'] selected_audio = Menu(f'Choose an audio server or leave blank to use "{audio}"', choices, default_option=audio).run() return selected_audio -def ask_to_configure_network(): +def ask_to_configure_network() -> Dict[str, Any]: # Optionally configure one network interface. # while 1: # {MAC: Ifname} @@ -435,7 +444,7 @@ def ask_for_main_filesystem_format(advanced_options=False): return Menu('Select which filesystem your main partition should use', options, skip=False).run() -def current_partition_layout(partitions, with_idx=False): +def current_partition_layout(partitions :List[Partition], with_idx :bool = False) -> Dict[str, Any]: def do_padding(name, max_len): spaces = abs(len(str(name)) - max_len) + 2 pad_left = int(spaces / 2) @@ -479,7 +488,7 @@ def current_partition_layout(partitions, with_idx=False): return f'\n\nCurrent partition layout:\n\n{current_layout}' -def select_partition(title, partitions, multiple=False): +def select_partition(title :str, partitions :List[Partition], multiple :bool = False) -> Union[int, List[int], None]: partition_indexes = list(map(str, range(len(partitions)))) partition = Menu(title, partition_indexes, multi=multiple).run() @@ -491,47 +500,18 @@ def select_partition(title, partitions, multiple=False): return None -def get_default_partition_layout(block_devices, advanced_options=False): +def get_default_partition_layout( + block_devices :Union[BlockDevice, List[BlockDevice]], + advanced_options :bool = False +) -> Dict[str, Any]: + if len(block_devices) == 1: return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options) else: return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) -def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: - # if has_uefi(): - # partition_type = 'gpt' - # else: - # partition_type = 'msdos' - - # log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO) - # partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True) - # partitions_to_wipe = generic_multi_select(partitions, "Which partitions do you wish to wipe (multiple can be selected): ", sort=True) - - # mountpoints = {} - # struct = { - # "partitions" : [] - # } - # for partition in partitions: - # mountpoint = input(f"Select a mountpoint (or skip) for {partition}: ").strip() - - # part_struct = {} - # if mountpoint: - # part_struct['mountpoint'] = mountpoint - # if mountpoint == '/boot': - # part_struct['boot'] = True - # if has_uefi(): - # part_struct['ESP'] = True - # elif mountpoint == '/' and - # if partition.uuid: - # part_struct['PARTUUID'] = partition.uuid - # if partition in partitions_to_wipe: - # part_struct['wipe'] = True - - # struct['partitions'].append(part_struct) - - # return struct - +def manage_new_and_existing_partitions(block_device :BlockDevice) -> Dict[str, Any]: block_device_struct = { "partitions": [partition.__dump__() for partition in block_device.partitions.values()] } @@ -689,7 +669,7 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict: return block_device_struct -def select_individual_blockdevice_usage(block_devices: list): +def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: result = {} for device in block_devices: @@ -700,7 +680,7 @@ def select_individual_blockdevice_usage(block_devices: list): return result -def select_disk_layout(block_devices :list, advanced_options=False): +def select_disk_layout(block_devices :list, advanced_options=False) -> Dict[str, Any]: modes = [ "Wipe all selected drives and use a best-effort default partition layout", "Select what to do with each individual drive (followed by partition usage)" @@ -714,7 +694,7 @@ def select_disk_layout(block_devices :list, advanced_options=False): return select_individual_blockdevice_usage(block_devices) -def select_disk(dict_o_disks): +def select_disk(dict_o_disks :Dict[str, BlockDevice]) -> BlockDevice: """ Asks the user to select a harddrive from the `dict_o_disks` selection. Usually this is combined with :ref:`archinstall.list_drives`. @@ -742,7 +722,7 @@ def select_disk(dict_o_disks): raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') -def select_profile(): +def select_profile() -> Optional[str]: """ # Asks the user to select a profile from the available profiles. # @@ -770,7 +750,7 @@ def select_profile(): return None -def select_language(): +def select_language() -> str: """ Asks the user to select a language Usually this is combined with :ref:`archinstall.list_keyboard_languages`. @@ -788,7 +768,7 @@ def select_language(): return selected_lang -def select_mirror_regions(): +def select_mirror_regions() -> Dict[str, Any]: """ Asks the user to select a mirror or region Usually this is combined with :ref:`archinstall.list_mirrors`. @@ -810,7 +790,7 @@ def select_mirror_regions(): return {} -def select_harddrives(): +def select_harddrives() -> Optional[str]: """ Asks the user to select one or multiple hard drives @@ -832,7 +812,7 @@ def select_harddrives(): return None -def select_driver(options=AVAILABLE_GFX_DRIVERS): +def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str: """ Some what convoluted function, whose job is simple. Select a graphics driver from a pre-defined set of popular options. @@ -866,7 +846,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS): raise RequirementError("Selecting drivers require a least one profile to be given as an option.") -def select_kernel(): +def select_kernel() -> List[str]: """ Asks the user to select a kernel for system. -- cgit v1.2.3-70-g09d2