index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/partition.py | 94 |
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index bb6f2d53..b8fa2b79 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,7 +5,8 @@ import logging import json import os import hashlib -from typing import Optional +from typing import Optional, Dict, Any, List, Union + from .blockdevice import BlockDevice from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage @@ -15,7 +16,15 @@ from ..general import SysCommand class Partition: - def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + def __init__(self, + path: str, + block_device: BlockDevice, + part_id :Optional[str] = None, + filesystem :Optional[str] = None, + mountpoint :Optional[str] = None, + encrypted :bool = False, + autodetect_filesystem :bool = True): + if not part_id: part_id = os.path.basename(path) @@ -50,14 +59,16 @@ class Partition: if self.filesystem == 'crypto_LUKS': self.encrypted = True - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path else: left_comparitor = str(left_comparitor) - return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. - def __repr__(self, *args, **kwargs): + # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 + return self.path < left_comparitor + + def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' if self.mountpoint: mount_repr = f", mounted={self.mountpoint}" @@ -69,7 +80,7 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - def __dump__(self): + def __dump__(self) -> Dict[str, Any]: return { 'type': 'primary', 'PARTUUID': self._safe_uuid, @@ -86,14 +97,14 @@ class Partition: } @property - def sector_size(self): + def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) for device in output['blockdevices']: return device.get('log-sec', None) @property - def start(self): + def start(self) -> Optional[str]: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) for partition in output.get('partitiontable', {}).get('partitions', []): @@ -101,7 +112,7 @@ class Partition: return partition['start'] # * self.sector_size @property - def end(self): + def end(self) -> Optional[str]: # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) @@ -110,7 +121,7 @@ class Partition: return partition['size'] # * self.sector_size @property - def size(self): + def size(self) -> Optional[float]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() @@ -123,7 +134,7 @@ class Partition: time.sleep(storage['DISK_TIMEOUTS']) @property - def boot(self): + def boot(self) -> bool: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) # Get the bootable flag from the sfdisk output: @@ -143,7 +154,7 @@ class Partition: return False @property - def partition_type(self): + def partition_type(self) -> Optional[str]: lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) for device in lsblk['blockdevices']: @@ -179,19 +190,19 @@ class Partition: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() @property - def encrypted(self): + def encrypted(self) -> Union[bool, None]: return self._encrypted @encrypted.setter - def encrypted(self, value: bool): + def encrypted(self, value: bool) -> None: self._encrypted = value @property - def parent(self): + def parent(self) -> str: return self.real_device @property - def real_device(self): + def real_device(self) -> str: for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): return f"/dev/{parent}" @@ -199,25 +210,27 @@ class Partition: return self.path @property - def device_path(self): + def device_path(self) -> str: """ for bind mounts returns the phisical path of the partition """ device_path, bind_name = split_bind_name(self.path) return device_path @property - def bind_name(self): + def bind_name(self) -> str: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ device_path, bind_name = split_bind_name(self.path) return bind_name - def partprobe(self): - SysCommand(f'bash -c "partprobe"') - time.sleep(1) + def partprobe(self) -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(1) + return True + return False - def detect_inner_filesystem(self, password): + def detect_inner_filesystem(self, password :str) -> Optional[str]: log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) from ..luks import luks2 @@ -227,7 +240,7 @@ class Partition: except SysCallError: return None - def has_content(self): + def has_content(self) -> bool: fs_type = get_filesystem_type(self.path) if not fs_type or "swap" in fs_type: return False @@ -248,7 +261,7 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args, **kwargs): + def encrypt(self, *args :str, **kwargs :str) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ @@ -257,7 +270,7 @@ class Partition: handle = luks2(self, None, None) return handle.encrypt(self, *args, **kwargs) - def format(self, filesystem=None, path=None, log_formatting=True, options=[]): + def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool: """ Format can be given an overriding path, for instance /dev/null to test the formatting functionality and in essence the support for the given filesystem. @@ -342,7 +355,7 @@ class Partition: return True - def find_parent_of(self, data, name, parent=None): + def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: if data['name'] == name: return parent elif 'children' in data: @@ -350,7 +363,7 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent - def mount(self, target, fs=None, options=''): + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: @@ -386,25 +399,24 @@ class Partition: self.mountpoint = target return True - def unmount(self): - try: - SysCommand(f"/usr/bin/umount {self.path}") - except SysCallError as err: - exit_code = err.exit_code - - # Without to much research, it seams that low error codes are errors. - # And above 8k is indicators such as "/dev/x not mounted.". - # So anything in between 0 and 8k are errors (?). - if 0 < exit_code < 8000: - raise err + return False + + def unmount(self) -> bool: + worker = SysCommand(f"/usr/bin/umount {self.path}") + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if 0 < worker.exit_code < 8000: + raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) self.mountpoint = None return True - def umount(self): + def umount(self) -> bool: return self.unmount() - def filesystem_supported(self): + def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: @@ -420,7 +432,7 @@ class Partition: return True -def get_mount_fs_type(fs): +def get_mount_fs_type(fs :str) -> str: if fs == 'ntfs': return 'ntfs3' # Needed to use the Paragon R/W NTFS driver elif fs == 'fat32': |