index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/blockdevice.py | 64 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs.py | 96 | ||||
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 59 | ||||
-rw-r--r-- | archinstall/lib/disk/helpers.py | 47 | ||||
-rw-r--r-- | archinstall/lib/disk/partition.py | 99 | ||||
-rw-r--r-- | archinstall/lib/disk/user_guides.py | 84 | ||||
-rw-r--r-- | archinstall/lib/disk/validators.py | 6 |
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 2be31375..5288f92b 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,14 +1,20 @@ +from __future__ import annotations import os import json import logging import time +from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from ..exceptions import DiskError from ..output import log from ..general import SysCommand from ..storage import storage class BlockDevice: - def __init__(self, path, info=None): + def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_disks # If we don't give any information, we need to auto-fill it. @@ -24,32 +30,32 @@ class BlockDevice: # It's actually partition-encryption, but for future-proofing this # I'm placing the encryption password on a BlockDevice level. - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str) -> str: return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})" - def __iter__(self): + def __iter__(self) -> Iterator[Partition]: for partition in self.partitions: yield self.partitions[partition] - def __getitem__(self, key, *args, **kwargs): + def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if key not in self.info: raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] - def __len__(self): + def __len__(self) -> int: return len(self.partitions) - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :'BlockDevice') -> bool: return self.path < left_comparitor.path - def json(self): + def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ return self.path - def __dump__(self): + def __dump__(self) -> Dict[str, Dict[str, Any]]: return { self.path : { 'partuuid' : self.uuid, @@ -59,14 +65,14 @@ class BlockDevice: } @property - def partition_type(self): + def partition_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['pttype'] @property - def device_or_backfile(self): + def device_or_backfile(self) -> str: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -82,7 +88,7 @@ class BlockDevice: return self.device @property - def device(self): + def device(self) -> str: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -108,7 +114,7 @@ class BlockDevice: # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @property - def partitions(self): + def partitions(self) -> Dict[str, Partition]: from .filesystem import Partition self.partprobe() @@ -133,17 +139,19 @@ class BlockDevice: return {k: self.part_cache[k] for k in sorted(self.part_cache)} @property - def partition(self): + def partition(self) -> Partition: all_partitions = self.partitions return [all_partitions[k] for k in all_partitions] @property - def partition_table_type(self): + def partition_table_type(self) -> int: + # TODO: Don't hardcode :) + # Remove if we don't use this function anywhere from .filesystem import GPT return GPT @property - def uuid(self): + def uuid(self) -> str: log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ Returns the disk UUID as returned by lsblk. @@ -153,7 +161,7 @@ class BlockDevice: return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') @property - def size(self): + def size(self) -> float: from .helpers import convert_size_to_gb output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) @@ -162,21 +170,21 @@ class BlockDevice: return convert_size_to_gb(device['size']) @property - def bus_type(self): + def bus_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['tran'] @property - def spinning(self): + def spinning(self) -> bool: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['rota'] is True @property - def free_space(self): + def free_space(self) -> Tuple[str, str, str]: # NOTE: parted -s will default to `cancel` on prompt, skipping any partition # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. @@ -187,7 +195,7 @@ class BlockDevice: yield (start, end, size) @property - def largest_free_space(self): + def largest_free_space(self) -> List[str]: info = [] for space_info in self.free_space: if not info: @@ -199,7 +207,7 @@ class BlockDevice: return info @property - def first_free_sector(self): + def first_free_sector(self) -> str: if info := self.largest_free_space: start = info[0] else: @@ -207,29 +215,29 @@ class BlockDevice: return start @property - def first_end_sector(self): + def first_end_sector(self) -> str: if info := self.largest_free_space: end = info[1] else: end = f"{self.size}GB" return end - def partprobe(self): - SysCommand(['partprobe', self.path]) + def partprobe(self) -> bool: + return SysCommand(['partprobe', self.path]).exit_code == 0 - def has_partitions(self): + def has_partitions(self) -> int: return len(self.partitions) - def has_mount_point(self, mountpoint): + def has_mount_point(self, mountpoint :str) -> bool: for partition in self.partitions: if self.partitions[partition].mountpoint == mountpoint: return True return False - def flush_cache(self): + def flush_cache(self) -> None: self.part_cache = {} - def get_partition(self, uuid): + def get_partition(self, uuid :str) -> Partition: count = 0 while count < 5: for partition_uuid, partition in self.partitions.items(): diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index fb9712f8..ad8d0a52 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -1,23 +1,30 @@ +from __future__ import annotations import pathlib import glob import logging -from typing import Union +from typing import Union, Dict, TYPE_CHECKING + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from ..installer import Installer from .helpers import get_mount_info from ..exceptions import DiskError from ..general import SysCommand from ..output import log -from .partition import Partition -def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: +def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: """ This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name. @installation: archinstall.Installer instance @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot @force: overrides the check for weither or not the subvolume mountpoint is empty or not - """ + This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure. + Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name] + """ + log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING) installation_mountpoint = installation.target if type(installation_mountpoint) == str: installation_mountpoint = pathlib.Path(installation_mountpoint) @@ -42,7 +49,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0 -def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -75,22 +82,38 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") -def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None): +def _has_option(option :str,options :list) -> bool: + """ auxiliary routine to check if an option is present in a list. + we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...) + """ + if not options: + return False + for item in options: + if option in item: + return True + return False + +def manage_btrfs_subvolumes(installation :Installer, + partition :Dict[str, str],) -> list: + from copy import deepcopy """ we do the magic with subvolumes in a centralized place parameters: * the installation object * the partition dictionary entry which represents the physical partition - * mountpoinst, the dictionary which contains all the partititon to be mounted - * subvolumes is the dictionary with the names of the subvolumes and its location + returns + * mountpoinst, the list which contains all the "new" partititon to be mounted + We expect the partition has been mounted as / , and it to be unmounted after the processing Then we create all the subvolumes inside btrfs as demand We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs - Then we add it them to the mountpoints dictionary to be processed as "normal" partitions + Then we return a list of "new" partitions to be processed as "normal" partitions # TODO For encrypted devices we need some special processing prior to it """ # We process each of the pairs <subvolume name: mount point | None | mount info dict> # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list # of mount options (or similar used by brtfs) + mountpoints = [] + subvolumes = partition['btrfs']['subvolumes'] for name, right_hand in subvolumes.items(): try: # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use @@ -98,7 +121,7 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su name = name[1:] # renormalize the right hand. location = None - mount_options = [] + subvol_options = [] # no contents, so it is not to be mounted if not right_hand: location = None @@ -108,38 +131,37 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿? elif isinstance(right_hand,dict): location = right_hand.get('mountpoint',None) - mount_options = right_hand.get('options',[]) + subvol_options = right_hand.get('options',[]) # we create the subvolume create_subvolume(installation,name) # Make the nodatacow processing now # It will be the main cause of creation of subvolumes which are not to be mounted # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in mount_options: + if 'nodatacow' in subvol_options: if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") # entry is deleted so nodatacow doesn't propagate to the mount options - del mount_options[mount_options.index('nodatacow')] + del subvol_options[subvol_options.index('nodatacow')] # Make the compress processing now # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new # in this way only zstd compression is activaded # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in mount_options: - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del mount_options[mount_options.index('compress')] + if 'compress' in subvol_options: + if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])): + if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so compress doesn't propagate to the mount options + del subvol_options[subvol_options.index('compress')] # END compress processing. # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume if not partition['mountpoint'] and location is not None: # we begin to create a fake partition entry. First we copy the original -the one that corresponds to - # the primary partition - fake_partition = partition.copy() + # the primary partition. We make a deepcopy to avoid altering the original content in any case + fake_partition = deepcopy(partition) # we start to modify entries in the "fake partition" to match the needs of the subvolumes - # # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy - # and reset the encryption parameters del fake_partition['btrfs'] fake_partition['encrypted'] = False fake_partition['generate-encryption-key-file'] = False @@ -147,22 +169,16 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su fake_partition['mountpoint'] = location # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path. fake_partition['subvolume'] = name - # here we add the mount options - fake_partition['options'] = mount_options - # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. - # We instanciate a new object with following attributes coming / adapted from the instance which was in the primary partition entry (the one we are coping - partition['device_instance'] - # * path, which will be expanded with the subvolume name to use the bind mount syntax the system uses for naming mounted subvolumes - # * size. When the OS queries all the subvolumes share the same size as the full partititon - # * uuid. All the subvolumes on a partition share the same uuid - if not unlocked_device: - fake_partition['device_instance'] = Partition(f"{partition['device_instance'].path}[/{name}]",partition['device_instance'].size,partition['device_instance'].uuid) + # here we add the special mount options for the subvolume, if any. + # if the original partition['options'] is not a list might give trouble + if fake_partition.get('filesystem',{}).get('mount_options',[]): + fake_partition['filesystem']['mount_options'].extend(subvol_options) else: - # for subvolumes IN an encrypted partition we make our device instance from unlocked device instead of the raw partition. - # This time we make a copy (we should to the same above TODO) and alter the path by hand - from copy import copy - # KIDS DONT'T DO THIS AT HOME - fake_partition['device_instance'] = copy(unlocked_device) - fake_partition['device_instance'].path = f"{unlocked_device.path}[/{name}]" + fake_partition['filesystem']['mount_options'] = subvol_options + # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. + # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes + # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless + fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'. # If i don't reset it, process will abort as "already mounted' . # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice @@ -170,9 +186,7 @@ def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, su # # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, # as "normal" ones - mountpoints[fake_partition['mountpoint']] = fake_partition + mountpoints.append(fake_partition) except Exception as e: raise e - # if the physical partition has been selected to be mounted, we include it at the list. Remmeber, all the above treatement won't happen except the creation of the subvolume - if partition['mountpoint']: - mountpoints[partition['mountpoint']] = partition + return mountpoints diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 51ef949b..3b09ec6c 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,7 +1,13 @@ +from __future__ import annotations import time import logging import json import pathlib +from typing import Optional, Dict, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .partition import Partition from .validators import valid_fs_type from ..exceptions import DiskError @@ -16,24 +22,25 @@ class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. - def __init__(self, blockdevice, mode): + def __init__(self, blockdevice :BlockDevice, mode :int): self.blockdevice = blockdevice self.mode = mode - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': return self - def __repr__(self): + def __repr__(self) -> str: return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] + SysCommand('sync') return True - def partuuid_to_index(self, uuid): + def partuuid_to_index(self, uuid :str) -> Optional[int]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() time.sleep(5) @@ -50,7 +57,7 @@ class Filesystem: raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - def load_layout(self, layout :dict): + def load_layout(self, layout :Dict[str, Any]) -> None: from ..luks import luks2 # If the layout tells us to wipe the drive, we do so @@ -83,6 +90,8 @@ class Filesystem: raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition_uuid)}).") if partition.get('filesystem', {}).get('format', False): + # needed for backward compatibility with the introduction of the new "format_options" + format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) if partition.get('encrypted', False): if not partition.get('!password'): if not storage['arguments'].get('!encryption-password'): @@ -93,15 +102,12 @@ class Filesystem: storage['arguments']['!encryption-password'] = get_password(f"Enter a encryption password for {partition['device_instance']}") partition['!password'] = storage['arguments']['!encryption-password'] - # to be able to generate an unique name in case the partition will not be mounted + if partition.get('mountpoint',None): - ppath = partition['mountpoint'] + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: - ppath = partition['device_instance'].path - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop" - + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" partition['device_instance'].encrypt(password=partition['!password']) - # Immediately unlock the encrypted device to format the inner volume with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=True) as unlocked_device: if not partition.get('format'): @@ -119,29 +125,29 @@ class Filesystem: continue break - unlocked_device.format(partition['filesystem']['format'], options=partition.get('options', [])) + unlocked_device.format(partition['filesystem']['format'], options=format_options) elif partition.get('format', False): - partition['device_instance'].format(partition['filesystem']['format'], options=partition.get('options', [])) + partition['device_instance'].format(partition['filesystem']['format'], options=format_options) if partition.get('boot', False): log(f"Marking partition {partition['device_instance']} as bootable.") self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on') - def find_partition(self, mountpoint): + def find_partition(self, mountpoint :str) -> Partition: for partition in self.blockdevice: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition - def partprobe(self): - SysCommand(f'bash -c "partprobe"') + def partprobe(self) -> bool: + return SysCommand(f'bash -c "partprobe"').exit_code == 0 - def raw_parted(self, string: str): + def raw_parted(self, string: str) -> SysCommand: if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0: log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red") time.sleep(0.5) return cmd_handle - def parted(self, string: str): + def parted(self, string: str) -> bool: """ Performs a parted execution of the given string @@ -149,16 +155,17 @@ class Filesystem: :type string: str """ if (parted_handle := self.raw_parted(string)).exit_code == 0: - self.partprobe() - return True + if self.partprobe(): + return True + return False else: raise DiskError(f"Parted failed to add a partition: {parted_handle}") - def use_entire_disk(self, root_filesystem_type='ext4') -> Partition: + def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type, start, end, partition_format=None): + def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()} @@ -197,14 +204,14 @@ class Filesystem: log("Add partition is exiting due to excessive wait time",level=logging.INFO) raise DiskError(f"New partition never showed up after adding new partition on {self}.") - def set_name(self, partition: int, name: str): + def set_name(self, partition: int, name: str) -> bool: return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 - def set(self, partition: int, string: str): + def set(self, partition: int, string: str) -> bool: log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - def parted_mklabel(self, device: str, disk_label: str): + def parted_mklabel(self, device: str, disk_label: str) -> bool: log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") # Try to unmount devices before attempting to run mklabel try: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index ba29744f..e9f6bc10 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -1,10 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import re import time -from typing import Union +from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from .blockdevice import BlockDevice from ..exceptions import SysCallError, DiskError from ..general import SysCommand @@ -14,10 +19,10 @@ from ..storage import storage ROOT_DIR_PATTERN = re.compile('^.*?/devices') GIGA = 2 ** 30 -def convert_size_to_gb(size): +def convert_size_to_gb(size :Union[int, float]) -> float: return round(size / GIGA,1) -def sort_block_devices_based_on_performance(block_devices): +def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: result = {device: 0 for device in block_devices} for device, weight in result.items(): @@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices): return result -def filter_disks_below_size_in_gb(devices, gigabytes): +def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: for disk in devices: if disk.size >= gigabytes: yield disk -def select_largest_device(devices, gigabytes, filter_out=None): +def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None): return max(copy_devices, key=(lambda device : device.size)) -def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): +def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) -def convert_to_gigabytes(string): +def convert_to_gigabytes(string :str) -> float: unit = string.strip()[-1] size = float(string.strip()[:-1]) @@ -81,7 +86,7 @@ def convert_to_gigabytes(string): return size -def device_state(name, *args, **kwargs): +def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: @@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs): return True # lsblk --json -l -n -o path -def all_disks(*args, **kwargs): +def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: kwargs.setdefault("partitions", False) drives = {} @@ -113,7 +118,7 @@ def all_disks(*args, **kwargs): return drives -def harddrive(size=None, model=None, fuzzy=False): +def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: collection = all_disks() for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: @@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path -def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict: +def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: device_path,bind_path = split_bind_name(path) for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: @@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p return {} -def get_partitions_in_use(mountpoint) -> list: +def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: @@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list: return mounts -def get_filesystem_type(path): +def get_filesystem_type(path :str) -> Optional[str]: device_name, bind_name = split_bind_name(path) try: return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip() @@ -201,10 +206,10 @@ def get_filesystem_type(path): return None -def disk_layouts(): +def disk_layouts() -> Optional[Dict[str, Any]]: try: if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: - return json.loads(handle.decode('UTF-8')) + return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} else: log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") return None @@ -216,20 +221,22 @@ def disk_layouts(): return None -def encrypted_partitions(blockdevices :dict) -> bool: +def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: for partition in blockdevices.values(): if partition.get('encrypted', False): yield partition -def find_partition_by_mountpoint(block_devices, relative_mountpoint :str): +def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: for partition in block_devices[device]['partitions']: if partition.get('mountpoint', None) == relative_mountpoint: return partition -def partprobe(): - SysCommand(f'bash -c "partprobe"') - time.sleep(5) +def partprobe() -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk + return True + return False def convert_device_to_uuid(path :str) -> str: device_name, bind_name = split_bind_name(path) diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index bb6f2d53..56ead68c 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,7 +5,8 @@ import logging import json import os import hashlib -from typing import Optional +from typing import Optional, Dict, Any, List, Union + from .blockdevice import BlockDevice from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage @@ -15,7 +16,15 @@ from ..general import SysCommand class Partition: - def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + def __init__(self, + path: str, + block_device: BlockDevice, + part_id :Optional[str] = None, + filesystem :Optional[str] = None, + mountpoint :Optional[str] = None, + encrypted :bool = False, + autodetect_filesystem :bool = True): + if not part_id: part_id = os.path.basename(path) @@ -50,14 +59,16 @@ class Partition: if self.filesystem == 'crypto_LUKS': self.encrypted = True - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path else: left_comparitor = str(left_comparitor) - return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. - def __repr__(self, *args, **kwargs): + # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 + return self.path < left_comparitor + + def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' if self.mountpoint: mount_repr = f", mounted={self.mountpoint}" @@ -69,7 +80,7 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - def __dump__(self): + def __dump__(self) -> Dict[str, Any]: return { 'type': 'primary', 'PARTUUID': self._safe_uuid, @@ -86,14 +97,14 @@ class Partition: } @property - def sector_size(self): + def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) for device in output['blockdevices']: return device.get('log-sec', None) @property - def start(self): + def start(self) -> Optional[str]: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) for partition in output.get('partitiontable', {}).get('partitions', []): @@ -101,7 +112,7 @@ class Partition: return partition['start'] # * self.sector_size @property - def end(self): + def end(self) -> Optional[str]: # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) @@ -110,7 +121,7 @@ class Partition: return partition['size'] # * self.sector_size @property - def size(self): + def size(self) -> Optional[float]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() @@ -123,7 +134,7 @@ class Partition: time.sleep(storage['DISK_TIMEOUTS']) @property - def boot(self): + def boot(self) -> bool: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) # Get the bootable flag from the sfdisk output: @@ -143,7 +154,7 @@ class Partition: return False @property - def partition_type(self): + def partition_type(self) -> Optional[str]: lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) for device in lsblk['blockdevices']: @@ -179,19 +190,19 @@ class Partition: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() @property - def encrypted(self): + def encrypted(self) -> Union[bool, None]: return self._encrypted @encrypted.setter - def encrypted(self, value: bool): + def encrypted(self, value: bool) -> None: self._encrypted = value @property - def parent(self): + def parent(self) -> str: return self.real_device @property - def real_device(self): + def real_device(self) -> str: for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): return f"/dev/{parent}" @@ -199,25 +210,27 @@ class Partition: return self.path @property - def device_path(self): + def device_path(self) -> str: """ for bind mounts returns the phisical path of the partition """ device_path, bind_name = split_bind_name(self.path) return device_path @property - def bind_name(self): + def bind_name(self) -> str: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ device_path, bind_name = split_bind_name(self.path) return bind_name - def partprobe(self): - SysCommand(f'bash -c "partprobe"') - time.sleep(1) + def partprobe(self) -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(1) + return True + return False - def detect_inner_filesystem(self, password): + def detect_inner_filesystem(self, password :str) -> Optional[str]: log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) from ..luks import luks2 @@ -227,7 +240,7 @@ class Partition: except SysCallError: return None - def has_content(self): + def has_content(self) -> bool: fs_type = get_filesystem_type(self.path) if not fs_type or "swap" in fs_type: return False @@ -248,7 +261,7 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args, **kwargs): + def encrypt(self, *args :str, **kwargs :str) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ @@ -257,7 +270,7 @@ class Partition: handle = luks2(self, None, None) return handle.encrypt(self, *args, **kwargs) - def format(self, filesystem=None, path=None, log_formatting=True, options=[]): + def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool: """ Format can be given an overriding path, for instance /dev/null to test the formatting functionality and in essence the support for the given filesystem. @@ -286,9 +299,8 @@ class Partition: elif filesystem == 'fat32': options = ['-F32'] + options - mkfs = SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}").decode('UTF-8') - if ('mkfs.fat' not in mkfs and 'mkfs.vfat' not in mkfs) or 'command not found' in mkfs: - raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}") + if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") self.filesystem = filesystem elif filesystem == 'ext4': @@ -342,7 +354,7 @@ class Partition: return True - def find_parent_of(self, data, name, parent=None): + def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: if data['name'] == name: return parent elif 'children' in data: @@ -350,7 +362,7 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent - def mount(self, target, fs=None, options=''): + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: @@ -386,25 +398,24 @@ class Partition: self.mountpoint = target return True - def unmount(self): - try: - SysCommand(f"/usr/bin/umount {self.path}") - except SysCallError as err: - exit_code = err.exit_code - - # Without to much research, it seams that low error codes are errors. - # And above 8k is indicators such as "/dev/x not mounted.". - # So anything in between 0 and 8k are errors (?). - if 0 < exit_code < 8000: - raise err + return False + + def unmount(self) -> bool: + worker = SysCommand(f"/usr/bin/umount {self.path}") + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if 0 < worker.exit_code < 8000: + raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) self.mountpoint = None return True - def umount(self): + def umount(self) -> bool: return self.unmount() - def filesystem_supported(self): + def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: @@ -420,7 +431,7 @@ class Partition: return True -def get_mount_fs_type(fs): +def get_mount_fs_type(fs :str) -> str: if fs == 'ntfs': return 'ntfs3' # Needed to use the Paragon R/W NTFS driver elif fs == 'fat32': diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 8dbd15dd..a90ac506 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -1,14 +1,25 @@ +from __future__ import annotations import logging +from typing import Optional, Dict, Any, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to +from ..hardware import has_uefi from ..output import log -def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False): +def suggest_single_disk_layout(block_device :BlockDevice, + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb + MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB using_subvolumes = False + using_home_partition = False if default_filesystem == 'btrfs': using_subvolumes = input('Would you like to use BTRFS subvolumes with a default structure? (Y/n): ').strip().lower() in ('', 'y', 'yes') @@ -20,11 +31,19 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o } } + # Used for reference: https://wiki.archlinux.org/title/partitioning + + # 2 MiB is unallocated for GRUB on BIOS. Potentially unneeded for + # other bootloaders? + + # TODO: On BIOS, /boot partition is only needed if the drive will + # be encrypted, otherwise it is not recommended. We should probably + # add a check for whether the drive will be encrypted or not. layout[block_device.path]['partitions'].append({ # Boot "type" : "primary", - "start" : "5MB", - "size" : "513MB", + "start" : "3MiB", + "size" : "203MiB", "boot" : True, "encrypted" : False, "format" : True, @@ -33,10 +52,18 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o "format" : "fat32" } }) + + # Increase the UEFI partition if UEFI is detected. + # Also re-align the start to 1MiB since we don't need the first sectors + # like we do in MBR layouts where the boot loader is installed traditionally. + if has_uefi(): + layout[block_device.path]['partitions'][-1]['start'] = '1MiB' + layout[block_device.path]['partitions'][-1]['size'] = '512MiB' + layout[block_device.path]['partitions'].append({ # Root "type" : "primary", - "start" : "518MB", + "start" : "206MiB", "encrypted" : False, "format" : True, "mountpoint" : "/", @@ -45,13 +72,20 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o } }) + if has_uefi(): + layout[block_device.path]['partitions'][-1]['start'] = '513MiB' + + if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: + using_home_partition = input('Would you like to create a separate partition for /home? (Y/n): ').strip().lower() in ('', 'y', 'yes') + # Set a size for / (/root) - if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART: + if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition: # We'll use subvolumes # Or the disk size is too small to allow for a separate /home + # Or the user doesn't want to create a separate partition for /home layout[block_device.path]['partitions'][-1]['size'] = '100%' else: - layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GB" + layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GiB" if default_filesystem == 'btrfs' and using_subvolumes: # if input('Do you want to use a recommended structure? (Y/n): ').strip().lower() in ('', 'y', 'yes'): @@ -69,17 +103,17 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o # else: # pass # ... implement a guided setup - elif block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: + elif using_home_partition: # If we don't want to use subvolumes, # But we want to be able to re-use data between re-installs.. # A second partition for /home would be nice if we have the space for it layout[block_device.path]['partitions'].append({ # Home "type" : "primary", + "start" : f"{min(block_device.size, 20)}GiB", + "size" : "100%", "encrypted" : False, "format" : True, - "start" : f"{min(block_device.size+0.5, 20.5)}GB", - "size" : "100%", "mountpoint" : "/home", "filesystem" : { "format" : default_filesystem @@ -89,7 +123,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o return layout -def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False): +def suggest_multi_disk_layout(block_devices :List[BlockDevice], + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) @@ -98,8 +135,8 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_o # https://www.reddit.com/r/btrfs/comments/m287gp/partition_strategy_for_two_physical_disks/ # https://www.reddit.com/r/btrfs/comments/9us4hr/what_is_your_btrfs_partitionsubvolumes_scheme/ - MIN_SIZE_TO_ALLOW_HOME_PART = 40 # Gb - ARCH_LINUX_INSTALLED_SIZE = 20 # Gb, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size? + MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB + ARCH_LINUX_INSTALLED_SIZE = 20 # GiB, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size? block_devices = sort_block_devices_based_on_performance(block_devices).keys() @@ -119,11 +156,13 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_o }, } + # TODO: Same deal as with the single disk layout, we should + # probably check if the drive will be encrypted. layout[root_device.path]['partitions'].append({ # Boot "type" : "primary", - "start" : "5MB", - "size" : "513MB", + "start" : "3MiB", + "size" : "203MiB", "boot" : True, "encrypted" : False, "format" : True, @@ -132,26 +171,33 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_o "format" : "fat32" } }) + + if has_uefi(): + layout[root_device.path]['partitions'][-1]['start'] = '1MiB' + layout[root_device.path]['partitions'][-1]['size'] = '512MiB' + layout[root_device.path]['partitions'].append({ # Root "type" : "primary", - "start" : "518MB", + "start" : "206MiB", + "size" : "100%", "encrypted" : False, "format" : True, - "size" : "100%", "mountpoint" : "/", "filesystem" : { "format" : default_filesystem } }) + if has_uefi(): + layout[root_device.path]['partitions'][-1]['start'] = '513MiB' layout[home_device.path]['partitions'].append({ # Home "type" : "primary", + "start" : "1MiB", + "size" : "100%", "encrypted" : False, "format" : True, - "start" : "5MB", - "size" : "100%", "mountpoint" : "/home", "filesystem" : { "format" : default_filesystem diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index 464f0d73..fd1b7f33 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -1,4 +1,6 @@ -def valid_parted_position(pos :str): +from typing import List + +def valid_parted_position(pos :str) -> bool: if not len(pos): return False @@ -17,7 +19,7 @@ def valid_parted_position(pos :str): return False -def fs_types(): +def fs_types() -> List[str]: # https://www.gnu.org/software/parted/manual/html_node/mkpart.html # Above link doesn't agree with `man parted` /mkpart documentation: """ |