index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/partition.py | 349 |
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py new file mode 100644 index 00000000..40a83d6b --- /dev/null +++ b/archinstall/lib/disk/partition.py @@ -0,0 +1,349 @@ +import glob +import pathlib +import time +import logging +import json +import os +from typing import Optional +from .blockdevice import BlockDevice +from .helpers import get_mount_info, get_filesystem_type +from ..output import log +from ..general import SysCommand + +class Partition: + def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + if not part_id: + part_id = os.path.basename(path) + + self.block_device = block_device + self.path = path + self.part_id = part_id + self.mountpoint = mountpoint + self.target_mountpoint = mountpoint + self.filesystem = filesystem + self.size = size # TODO: Refresh? + self._encrypted = None + self.encrypted = encrypted + self.allow_formatting = False + + if mountpoint: + self.mount(mountpoint) + + mount_information = get_mount_info(self.path) + + if self.mountpoint != mount_information.get('target', None) and mountpoint: + raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") + + if target := mount_information.get('target', None): + self.mountpoint = target + + if not self.filesystem and autodetect_filesystem: + if fstype := mount_information.get('fstype', get_filesystem_type(path)): + self.filesystem = fstype + + if self.filesystem == 'crypto_LUKS': + self.encrypted = True + + def __lt__(self, left_comparitor): + if type(left_comparitor) == Partition: + left_comparitor = left_comparitor.path + else: + left_comparitor = str(left_comparitor) + return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. + + def __repr__(self, *args, **kwargs): + mount_repr = '' + if self.mountpoint: + mount_repr = f", mounted={self.mountpoint}" + elif self.target_mountpoint: + mount_repr = f", rel_mountpoint={self.target_mountpoint}" + + if self._encrypted: + return f'Partition(path={self.path}, size={self.size}, PARTUUID={self.uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' + else: + return f'Partition(path={self.path}, size={self.size}, PARTUUID={self.uuid}, fs={self.filesystem}{mount_repr})' + + def __dump__(self): + return { + 'type' : 'primary', + 'PARTUUID' : self.uuid, + 'wipe' : self.allow_formatting, + 'boot' : self.boot, + 'ESP' : self.boot, + 'mountpoint' : self.target_mountpoint, + 'encrypted' : self._encrypted, + 'start' : self.start, + 'size' : self.end, + 'filesystem' : { + 'format' : get_filesystem_type(self.path) + } + } + + @property + def sector_size(self): + output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8')) + + for device in output['blockdevices']: + return device.get('log-sec', None) + + @property + def start(self): + output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition['start']# * self.sector_size + + @property + def end(self): + # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. + output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition['size']# * self.sector_size + + @property + def boot(self): + output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + + # Get the bootable flag from the sfdisk output: + # { + # "partitiontable": { + # "label":"dos", + # "id":"0xd202c10a", + # "device":"/dev/loop0", + # "unit":"sectors", + # "sectorsize":512, + # "partitions": [ + # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true} + # ] + # } + # } + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition.get('bootable', False) + + return False + + @property + def partition_type(self): + lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) + + for device in lsblk['blockdevices']: + return device['pttype'] + + @property + def uuid(self) -> Optional[str]: + """ + Returns the PARTUUID as returned by lsblk. + This is more reliable than relying on /dev/disk/by-partuuid as + it doesn't seam to be able to detect md raid partitions. + """ + + lsblk = json.loads(SysCommand(f'lsblk -J -o+PARTUUID {self.path}').decode('UTF-8')) + for partition in lsblk['blockdevices']: + return partition.get('partuuid', None) + return None + + @property + def encrypted(self): + return self._encrypted + + @encrypted.setter + def encrypted(self, value: bool): + + self._encrypted = value + + @property + def parent(self): + return self.real_device + + @property + def real_device(self): + for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: + if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)): + return f"/dev/{parent}" + # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') + return self.path + + def detect_inner_filesystem(self, password): + log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) + from .luks import luks2 + + try: + with luks2(self, storage.get('ENC_IDENTIFIER', 'ai')+'loop', password, auto_unmount=True) as unlocked_device: + return unlocked_device.filesystem + except SysCallError: + return None + + def has_content(self): + fs_type = get_filesystem_type(self.path) + if not fs_type or "swap" in fs_type: + return False + + temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() + temporary_path = pathlib.Path(temporary_mountpoint) + + temporary_path.mkdir(parents=True, exist_ok=True) + if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: + raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') + + files = len(glob.glob(f"{temporary_mountpoint}/*")) + iterations = 0 + while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10: + time.sleep(1) + + temporary_path.rmdir() + + return True if files > 0 else False + + def encrypt(self, *args, **kwargs): + """ + A wrapper function for luks2() instances and the .encrypt() method of that instance. + """ + from .luks import luks2 + + handle = luks2(self, None, None) + return handle.encrypt(self, *args, **kwargs) + + def format(self, filesystem=None, path=None, log_formatting=True, options=[]): + """ + Format can be given an overriding path, for instance /dev/null to test + the formatting functionality and in essence the support for the given filesystem. + """ + if filesystem is None: + filesystem = self.filesystem + + if path is None: + path = self.path + + # To avoid "unable to open /dev/x: No such file or directory" + start_wait = time.time() + while pathlib.Path(path).exists() is False and time.time() - start_wait < 10: + time.sleep(0.025) + + if log_formatting: + log(f'Formatting {path} -> {filesystem}', level=logging.INFO) + + if filesystem == 'btrfs': + options = ['-f'] + options + + if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')): + raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') + self.filesystem = filesystem + + elif filesystem == 'fat32': + options = ['-F32'] + options + + mkfs = SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}").decode('UTF-8') + if ('mkfs.fat' not in mkfs and 'mkfs.vfat' not in mkfs) or 'command not found' in mkfs: + raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}") + self.filesystem = filesystem + + elif filesystem == 'ext4': + options = ['-F'] + options + + if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem + + elif filesystem == 'ext2': + options = ['-F'] + options + + if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') + self.filesystem = 'ext2' + + elif filesystem == 'xfs': + options = ['-f'] + options + + if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem + + elif filesystem == 'f2fs': + options = ['-f'] + options + + if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem + + elif filesystem == 'crypto_LUKS': + # from .luks import luks2 + # encrypted_partition = luks2(self, None, None) + # encrypted_partition.format(path) + self.filesystem = filesystem + + else: + raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") + + if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': + self.encrypted = True + else: + self.encrypted = False + + return True + + def find_parent_of(self, data, name, parent=None): + if data['name'] == name: + return parent + elif 'children' in data: + for child in data['children']: + if parent := self.find_parent_of(child, name, parent=data['name']): + return parent + + def mount(self, target, fs=None, options=''): + if not self.mountpoint: + log(f'Mounting {self} to {target}', level=logging.INFO) + if not fs: + if not self.filesystem: + raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') + fs = self.filesystem + + pathlib.Path(target).mkdir(parents=True, exist_ok=True) + + try: + if options: + SysCommand(f"/usr/bin/mount -o {options} {self.path} {target}") + else: + SysCommand(f"/usr/bin/mount {self.path} {target}") + except SysCallError as err: + raise err + + self.mountpoint = target + return True + + def unmount(self): + try: + SysCommand(f"/usr/bin/umount {self.path}") + except SysCallError as err: + exit_code = err.exit_code + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if 0 < exit_code < 8000: + raise err + + self.mountpoint = None + return True + + def umount(self): + return self.unmount() + + def filesystem_supported(self): + """ + The support for a filesystem (this partition) is tested by calling + partition.format() with a path set to '/dev/null' which returns two exceptions: + 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported + 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type + """ + try: + self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) + except (SysCallError, DiskError): + pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code + except UnknownFilesystemFormat as err: + raise err + return True
\ No newline at end of file |