index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/helpers.py | 241 |
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index b04e2740..afaf9e5e 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -5,12 +5,15 @@ import os import pathlib import re import time +import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: from .partition import Partition from .blockdevice import BlockDevice +from .dmcryptdev import DMCryptDev +from .mapperdev import MapperDev from ..exceptions import SysCallError, DiskError from ..general import SysCommand from ..output import log @@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: return return True -# lsblk --json -l -n -o path -def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: - kwargs.setdefault("partitions", False) - drives = {} - lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8')) - for drive in lsblk['blockdevices']: - if not kwargs['partitions'] and drive['type'] == 'part': +def cleanup_bash_escapes(data :str) -> str: + return data.replace(r'\ ', ' ') + +def blkid(cmd :str) -> Dict[str, Any]: + if '-o' in cmd and '-o export' not in cmd: + raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") + elif '-o' not in cmd: + cmd += ' -o export' + + try: + raw_data = SysCommand(cmd).decode() + except SysCallError as error: + log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG) + raise error + + result = {} + # Process the raw result + devname = None + for line in raw_data.split('\r\n'): + if not len(line): + devname = None + continue + + key, val = line.split('=', 1) + if key.lower() == 'devname': + devname = val + # Lowercase for backwards compatability with all_disks() previous use cases + result[devname] = { + "path": devname, + "PATH": devname + } + continue + + result[devname][key] = cleanup_bash_escapes(val) + + return result + +def get_loop_info(path :str) -> Dict[str, Any]: + for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: + if not drive['name'] == path: continue - drives[drive['path']] = BlockDevice(drive['path'], drive) + return { + path: { + **drive, + 'type' : 'loop', + 'TYPE' : 'loop', + 'DEVTYPE' : 'loop', + 'PATH' : drive['name'], + 'path' : drive['name'] + } + } + + return {} + +def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]: + result = {} + for device_path, device_information in information.items(): + dev_name = pathlib.Path(device_information['PATH']).name + if not device_information.get('TYPE') or not device_information.get('DEVTYPE'): + with open(f"/sys/class/block/{dev_name}/uevent") as fh: + device_information.update(uevent(fh.read())) + + if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists(): + with dmcrypt_name.open('r') as fh: + device_information['DMCRYPT_NAME'] = fh.read().strip() + + result[device_path] = device_information + + return result + +def uevent(data :str) -> Dict[str, Any]: + information = {} + + for line in data.replace('\r\n', '\n').split('\n'): + if len((line := line.strip())): + key, val = line.split('=', 1) + information[key] = val + + return information + +def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]: + device_information = {} + with open(f"/sys/class/block/{dev_name}/uevent") as fh: + device_information.update(uevent(fh.read())) + + return { + f"/dev/{dev_name}" : { + **device_information, + 'path' : f'/dev/{dev_name}', + 'PATH' : f'/dev/{dev_name}', + 'PTTYPE' : None + } + } + +def all_disks() -> List[BlockDevice]: + log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") + return all_blockdevices(partitions=False, mappers=False) + +def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]: + """ + Returns BlockDevice() and Partition() objects for all available devices. + """ + from .partition import Partition - return drives + instances = {} + # Due to lsblk being highly unreliable for this use case, + # we'll iterate the /sys/class definitions and find the information + # from there. + for block_device in glob.glob("/sys/class/block/*"): + device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" + try: + information = blkid(f'blkid -p -o export {device_path}') + + # TODO: No idea why F841 is raised here: + except SysCallError as error: # noqa: F841 + if error.exit_code in (512, 2): + # Assume that it's a loop device, and try to get info on it + try: + information = get_loop_info(device_path) + if not information: + raise SysCallError("Could not get loop information", exit_code=1) + + except SysCallError: + information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) + else: + raise error + + information = enrich_blockdevice_information(information) + + for path, path_info in information.items(): + if path_info.get('DMCRYPT_NAME'): + instances[path] = DMCryptDev(dev_path=path) + elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'): + if partitions: + instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path)))) + elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': + instances[path] = BlockDevice(path, path_info) + elif path_info.get('TYPE') == 'squashfs': + # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) + continue + else: + log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow") + + if mappers: + for block_device in glob.glob("/dev/mapper/*"): + if (pathobj := pathlib.Path(block_device)).is_symlink(): + instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name) + + return instances + + +def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path: + partition_name = path.name + pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve() + return f"/dev/{pci_device.parent.name}" def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: - collection = all_disks() + collection = all_blockdevices(partitions=False) for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: continue @@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy : return collection[drive] def split_bind_name(path :Union[pathlib.Path, str]) -> list: + # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow") # we check for the bind notation. if exist we'll only use the "true" device path if '[' in str(path) : # is a bind path (btrfs subvolume path) device_path, bind_path = str(path).split('[') @@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path +def find_mountpoint(device_path :str) -> Dict[str, Any]: + try: + for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']: + yield filesystem + except SysCallError: + return {} + def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: - device_path,bind_path = split_bind_name(path) + device_path, bind_path = split_bind_name(path) output = {} for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: - log(f"Getting mount information for device path {traversal}", level=logging.INFO) + log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): break - except SysCallError: + + except SysCallError as error: + print('ERROR:', error) pass if not traverse: break if not output: - raise DiskError(f"Could not get mount information for device path {path}") + raise DiskError(f"Could not get mount information for device path {device_path}") output = json.loads(output) + # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter # i.e. the subvolume filesystem we're searching for if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None: output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)] + if 'filesystems' in output: if len(output['filesystems']) > 1: - raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}") + raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}") if return_real_path: return output['filesystems'][0], traversal @@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur return {} +def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]: + for info in data: + if info.get('target') not in filters: + filters[info.get('target')] = None + + filters.update(get_all_targets(info.get('children', []))) + + return filters + def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') except SysCallError: - return [] - - mounts = [] + return {} if not output: - return [] + return {} output = json.loads(output) - for target in output.get('filesystems', []): - # We need to create a BlockDevice() instead of 'None' here when creaiting Partition() - # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None. + # print(output) - # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device - # Note: doing print(partition) here will break because the above mentioned issue. - partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']) - partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target']) + mounts = {} + + block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True) + + block_devices_mountpoints = {} + for blockdev in block_devices_available.values(): + if not type(blockdev) in (Partition, MapperDev): + continue - # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using - # (lsblk pkname lists both the partition and blockdevice, BD being the last entry) - result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1] - block_device = BlockDevice(f"/dev/{result}") + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev - # Once we figured the block device out, we can properly create the partition object - partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target']) + log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) - mounts.append(partition) + for mountpoint in list(get_all_targets(output['filesystems']).keys()): + if mountpoint in block_devices_mountpoints: + if mountpoint not in mounts: + mounts[mountpoint] = block_devices_mountpoints[mountpoint] + # If the already defined mountpoint is a DMCryptDev, and the newly found + # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition. + elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev: + mounts[mountpoint] = block_devices_mountpoints[mountpoint] - for child in target.get('children', []): - mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target'])) + log(f"Available partitions: {mounts}", level=logging.DEBUG) return mounts |