index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/blockdevice.py | 104 |
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 206c3b7e..c7b69205 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -3,6 +3,7 @@ import os import json import logging import time +from functools import cached_property from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -32,7 +33,29 @@ class BlockDevice: # I'm placing the encryption password on a BlockDevice level. def __repr__(self, *args :str, **kwargs :str) -> str: - return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})" + return self._str_repr + + @cached_property + def _str_repr(self) -> str: + return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})" + + @cached_property + def display_info(self) -> str: + columns = { + str(_('Device')): self.device_or_backfile, + str(_('Size')): f'{self._safe_size}GB', + str(_('Free space')): f'{self._safe_free_space}', + str(_('Bus-type')): f'{self.bus_type}' + } + + padding = max([len(k) for k in columns.keys()]) + + pretty = '' + for k, v in columns.items(): + k = k.ljust(padding, ' ') + pretty += f'{k} = {v}\n' + + return pretty.rstrip() def __iter__(self) -> Iterator[Partition]: for partition in self.partitions: @@ -74,7 +97,7 @@ class BlockDevice: for device in output['blockdevices']: return device['pttype'] - @property + @cached_property def device_or_backfile(self) -> str: """ Returns the actual device-endpoint of the BlockDevice. @@ -157,7 +180,7 @@ class BlockDevice: from .filesystem import GPT return GPT - @property + @cached_property def uuid(self) -> str: log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ @@ -167,7 +190,19 @@ class BlockDevice: """ return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') - @property + @cached_property + def _safe_size(self) -> float: + from .helpers import convert_size_to_gb + + try: + output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) + except SysCallError: + return -1.0 + + for device in output['blockdevices']: + return convert_size_to_gb(device['size']) + + @cached_property def size(self) -> float: from .helpers import convert_size_to_gb @@ -176,22 +211,29 @@ class BlockDevice: for device in output['blockdevices']: return convert_size_to_gb(device['size']) - @property + @cached_property def bus_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['tran'] - @property + @cached_property def spinning(self) -> bool: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['rota'] is True - @property - def free_space(self) -> Tuple[str, str, str]: + @cached_property + def _safe_free_space(self) -> Tuple[str, ...]: + try: + return '+'.join(part[2] for part in self.free_space) + except SysCallError: + return '?' + + @cached_property + def free_space(self) -> Tuple[str, ...]: # NOTE: parted -s will default to `cancel` on prompt, skipping any partition # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. @@ -204,7 +246,7 @@ class BlockDevice: except SysCallError as error: log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) - @property + @cached_property def largest_free_space(self) -> List[str]: info = [] for space_info in self.free_space: @@ -216,7 +258,7 @@ class BlockDevice: info = space_info return info - @property + @cached_property def first_free_sector(self) -> str: if info := self.largest_free_space: start = info[0] @@ -224,7 +266,7 @@ class BlockDevice: start = '512MB' return start - @property + @cached_property def first_end_sector(self) -> str: if info := self.largest_free_space: end = info[1] @@ -247,19 +289,27 @@ class BlockDevice: def flush_cache(self) -> None: self.part_cache = {} - def get_partition(self, uuid :str) -> Partition: - count = 0 - while count < 5: - for partition_uuid, partition in self.partitions.items(): - if partition.uuid.lower() == uuid.lower(): - return partition - else: - log(f"uuid {uuid} not found. Waiting for {count +1} time",level=logging.DEBUG) - time.sleep(float(storage['arguments'].get('disk-sleep', 0.2))) - count += 1 - else: - log(f"Could not find {uuid} in disk after 5 retries",level=logging.INFO) - print(f"Cache: {self.part_cache}") - print(f"Partitions: {self.partitions.items()}") - print(f"UUID: {[uuid]}") - raise DiskError(f"New partition {uuid} never showed up after adding new partition on {self}") + def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: + if not uuid and not partuuid: + raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.") + + for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): + for partition_index, partition in self.partitions.items(): + try: + if uuid and partition.uuid.lower() == uuid.lower(): + return partition + elif partuuid and partition.part_uuid.lower() == partuuid.lower(): + return partition + except DiskError as error: + # Most likely a blockdevice that doesn't support or use UUID's + # (like Microsoft recovery partition) + log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray") + pass + + log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) + time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) + + log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) + log(f"Cache: {self.part_cache}") + log(f"Partitions: {self.partitions.items()}") + raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") |