index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk.py | 228 |
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index 3241c455..ac59600d 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -1,22 +1,26 @@ -import glob, re, os, json, time, hashlib -import pathlib, traceback, logging +import glob +import pathlib +import re +import time from collections import OrderedDict -from .exceptions import DiskError +from typing import Optional + from .general import * +from .hardware import has_uefi from .output import log -from .storage import storage -from .hardware import hasUEFI ROOT_DIR_PATTERN = re.compile('^.*?/devices') GPT = 0b00000001 MBR = 0b00000010 -#import ctypes -#import ctypes.util -#libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) -#libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) -class BlockDevice(): +# import ctypes +# import ctypes.util +# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) +# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) + + +class BlockDevice: def __init__(self, path, info=None): if not info: # If we don't give any information, we need to auto-fill it. @@ -53,9 +57,9 @@ class BlockDevice(): to give less/partial information for user readability. """ return { - 'path' : self.path, - 'size' : self.info['size'] if 'size' in self.info else '<unknown>', - 'model' : self.info['model'] if 'model' in self.info else '<unknown>' + 'path': self.path, + 'size': self.info['size'] if 'size' in self.info else '<unknown>', + 'model': self.info['model'] if 'model' in self.info else '<unknown>' } def __dump__(self): @@ -87,8 +91,9 @@ class BlockDevice(): raise DiskError(f'Could not locate backplane info for "{self.path}"') if self.info['type'] == 'loop': - for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']: - if not drive['name'] == self.path: continue + for drive in json.loads(b''.join(SysCommand(['losetup', '--json'])).decode('UTF_8'))['loopdevices']: + if not drive['name'] == self.path: + continue return drive['back-file'] elif self.info['type'] == 'disk': @@ -103,21 +108,21 @@ class BlockDevice(): else: log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG) - # if not stat.S_ISBLK(os.stat(full_path).st_mode): - # raise DiskError(f'Selected disk "{full_path}" is not a block device.') + # if not stat.S_ISBLK(os.stat(full_path).st_mode): + # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @property def partitions(self): - o = b''.join(sys_command(['partprobe', self.path])) + o = b''.join(SysCommand(['partprobe', self.path])) - #o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) - o = b''.join(sys_command(['/usr/bin/lsblk', '-J', self.path])) + # o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) + o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path])) if b'not a block device' in o: raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') if not o[:1] == b'{': - raise DiskError(f'Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') + raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') r = json.loads(o.decode('UTF-8')) if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: @@ -125,7 +130,7 @@ class BlockDevice(): for part in r['blockdevices'][0]['children']: part_id = part['name'][len(os.path.basename(self.path)):] if part_id not in self.part_cache: - ## TODO: Force over-write even if in cache? + # TODO: Force over-write even if in cache? if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size']) @@ -142,13 +147,13 @@ class BlockDevice(): @property def uuid(self): - log(f'BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') + log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ Returns the disk UUID as returned by lsblk. This is more reliable than relying on /dev/disk/by-partuuid as it doesn't seam to be able to detect md raid partitions. """ - lsblk = b''.join(sys_command(f'lsblk -J -o+UUID {self.path}')) + lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}')) for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: return partition.get('uuid', None) @@ -174,8 +179,9 @@ class BlockDevice(): def flush_cache(self): self.part_cache = OrderedDict() -class Partition(): - def __init__(self, path :str, block_device :BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + +class Partition: + def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): if not part_id: part_id = os.path.basename(path) @@ -185,24 +191,24 @@ class Partition(): self.mountpoint = mountpoint self.target_mountpoint = mountpoint self.filesystem = filesystem - self.size = size # TODO: Refresh? + self.size = size # TODO: Refresh? self._encrypted = None self.encrypted = encrypted - self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions. + self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions. if mountpoint: self.mount(mountpoint) mount_information = get_mount_info(self.path) - + if self.mountpoint != mount_information.get('target', None) and mountpoint: raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") - if (target := mount_information.get('target', None)): + if target := mount_information.get('target', None): self.mountpoint = target if not self.filesystem and autodetect_filesystem: - if (fstype := mount_information.get('fstype', get_filesystem_type(path))): + if fstype := mount_information.get('fstype', get_filesystem_type(path)): self.filesystem = fstype if self.filesystem == 'crypto_LUKS': @@ -213,7 +219,7 @@ class Partition(): left_comparitor = left_comparitor.path else: left_comparitor = str(left_comparitor) - return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. + return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. def __repr__(self, *args, **kwargs): mount_repr = '' @@ -304,22 +310,23 @@ class Partition(): return device['pttype'] @property - def uuid(self) -> str: + def uuid(self) -> Optional[str]: """ Returns the PARTUUID as returned by lsblk. This is more reliable than relying on /dev/disk/by-partuuid as it doesn't seam to be able to detect md raid partitions. """ - lsblk = b''.join(sys_command(f'lsblk -J -o+PARTUUID {self.path}')) + lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}')) for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: return partition.get('partuuid', None) + return None @property def encrypted(self): return self._encrypted @encrypted.setter - def encrypted(self, value :bool): + def encrypted(self, value: bool): self._encrypted = value @@ -329,10 +336,10 @@ class Partition(): @property def real_device(self): - for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']: - if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))): + for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']: + if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)): return f"/dev/{parent}" - # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') + # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') return self.path def detect_inner_filesystem(self, password): @@ -348,16 +355,18 @@ class Partition(): def has_content(self): if not get_filesystem_type(self.path): return False - - temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest() + + temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() temporary_path = pathlib.Path(temporary_mountpoint) temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') - + files = len(glob.glob(f"{temporary_mountpoint}/*")) - sys_command(f'/usr/bin/umount {temporary_mountpoint}') + iterations = 0 + while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10: + time.sleep(1) temporary_path.rmdir() @@ -420,36 +429,36 @@ class Partition(): log(f'Formatting {path} -> {filesystem}', level=logging.INFO) if filesystem == 'btrfs': - o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}')) + o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}')) if b'UUID' not in o: raise DiskError(f'Could not format {path} with {filesystem} because: {o}') self.filesystem = 'btrfs' elif filesystem == 'vfat': - o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}')) + o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}')) if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: raise DiskError(f'Could not format {path} with {filesystem} because: {o}') self.filesystem = 'vfat' elif filesystem == 'ext4': - if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'ext4' elif filesystem == 'xfs': - if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'xfs' elif filesystem == 'f2fs': - if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'f2fs' elif filesystem == 'crypto_LUKS': - # from .luks import luks2 - # encrypted_partition = luks2(self, None, None) - # encrypted_partition.format(path) + # from .luks import luks2 + # encrypted_partition = luks2(self, None, None) + # encrypted_partition.format(path) self.filesystem = 'crypto_LUKS' else: @@ -467,36 +476,40 @@ class Partition(): return parent elif 'children' in data: for child in data['children']: - if (parent := self.find_parent_of(child, name, parent=data['name'])): + if parent := self.find_parent_of(child, name, parent=data['name']): return parent def mount(self, target, fs=None, options=''): if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: - if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') + if not self.filesystem: + raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') fs = self.filesystem pathlib.Path(target).mkdir(parents=True, exist_ok=True) try: - sys_command(f'/usr/bin/mount {self.path} {target}') + if options: + SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}') + else: + SysCommand(f'/usr/bin/mount {self.path} {target}') except SysCallError as err: raise err - + self.mountpoint = target return True def unmount(self): try: - exit_code = sys_command(f'/usr/bin/umount {self.path}').exit_code + exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code except SysCallError as err: exit_code = err.exit_code # Without to much research, it seams that low error codes are errors. # And above 8k is indicators such as "/dev/x not mounted.". # So anything in between 0 and 8k are errors (?). - if exit_code > 0 and exit_code < 8000: + if 0 < exit_code < 8000: raise err self.mountpoint = None @@ -509,22 +522,23 @@ class Partition(): """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: - 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported - 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type + 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported + 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type """ try: self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) - except SysCallError: - pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code + except (SysCallError, DiskError): + pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code except UnknownFilesystemFormat as err: raise err return True -class Filesystem(): + +class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. - def __init__(self, blockdevice,mode): + def __init__(self, blockdevice, mode): self.blockdevice = blockdevice self.mode = mode @@ -536,15 +550,15 @@ class Filesystem(): self.blockdevice.flush_cache() return self else: - raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') + raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') elif self.mode == MBR: - if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0: + if SysCommand(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0: return self else: - raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos') + raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos') else: raise DiskError(f'Unknown mode selected to format in: {self.mode}') - + # TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed. elif self.mode == self.blockdevice.partition_table_type: log(f'Kept partition format {self.mode} for {self.blockdevice}', level=logging.DEBUG) @@ -560,7 +574,7 @@ class Filesystem(): # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] - b''.join(sys_command(f'sync')) + b''.join(SysCommand('sync')) return True def find_partition(self, mountpoint): @@ -568,11 +582,11 @@ class Filesystem(): if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition - def raw_parted(self, string:str): - x = sys_command(f'/usr/bin/parted -s {string}') + def raw_parted(self, string: str): + x = SysCommand(f'/usr/bin/parted -s {string}') return x - def parted(self, string:str): + def parted(self, string: str): """ Performs a parted execution of the given string @@ -583,8 +597,8 @@ class Filesystem(): def use_entire_disk(self, root_filesystem_type='ext4'): log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG) - if hasUEFI(): - self.add_partition('primary', start='1MiB', end='513MiB', format='fat32') + if has_uefi(): + self.add_partition('primary', start='1MiB', end='513MiB', partition_format='fat32') self.set_name(0, 'EFI') self.set(0, 'boot on') # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"? @@ -602,39 +616,40 @@ class Filesystem(): self.blockdevice.partition[0].allow_formatting = True self.blockdevice.partition[1].allow_formatting = True else: - #we don't need a seprate boot partition it would be a waste of space + # we don't need a seprate boot partition it would be a waste of space self.add_partition('primary', start='1MB', end='100%') - self.blockdevice.partition[0].filesystem=root_filesystem_type + self.blockdevice.partition[0].filesystem = root_filesystem_type log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG) self.blockdevice.partition[0].target_mountpoint = '/' self.blockdevice.partition[0].allow_formatting = True - def add_partition(self, type, start, end, format=None): + def add_partition(self, partition_type, start, end, partition_format=None): log(f'Adding partition to {self.blockdevice}', level=logging.INFO) - + previous_partitions = self.blockdevice.partitions if self.mode == MBR: - if len(self.blockdevice.partitions)>3: + if len(self.blockdevice.partitions) > 3: DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions") - if format: - partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {format} {start} {end}') == 0 + if partition_format: + partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0 else: - partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {start} {end}') == 0 + partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {start} {end}') == 0 if partitioning: start_wait = time.time() while previous_partitions == self.blockdevice.partitions: - time.sleep(0.025) # Let the new partition come up in the kernel + time.sleep(0.025) # Let the new partition come up in the kernel if time.time() - start_wait > 10: raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).") return True - def set_name(self, partition:int, name:str): - return self.parted(f'{self.blockdevice.device} name {partition+1} "{name}"') == 0 + def set_name(self, partition: int, name: str): + return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 + + def set(self, partition: int, string: str): + return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - def set(self, partition:int, string:str): - return self.parted(f'{self.blockdevice.device} set {partition+1} {string}') == 0 def device_state(name, *args, **kwargs): # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 @@ -653,28 +668,32 @@ def device_state(name, *args, **kwargs): return return True + # lsblk --json -l -n -o path def all_disks(*args, **kwargs): kwargs.setdefault("partitions", False) drives = OrderedDict() - #for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']: - for drive in json.loads(b''.join(sys_command(f'lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']: - if not kwargs['partitions'] and drive['type'] == 'part': continue + # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']: + for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model')).decode('UTF_8'))['blockdevices']: + if not kwargs['partitions'] and drive['type'] == 'part': + continue drives[drive['path']] = BlockDevice(drive['path'], drive) return drives + def convert_to_gigabytes(string): unit = string.strip()[-1] size = float(string.strip()[:-1]) if unit == 'M': - size = size/1024 + size = size / 1024 elif unit == 'T': - size = size*1024 + size = size * 1024 return size + def harddrive(size=None, model=None, fuzzy=False): collection = all_disks() for drive in collection: @@ -685,13 +704,18 @@ def harddrive(size=None, model=None, fuzzy=False): return collection[drive] -def get_mount_info(path): + +def get_mount_info(path) -> dict: try: - output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}')) + output = SysCommand(f'/usr/bin/findmnt --json {path}') except SysCallError: return {} output = output.decode('UTF-8') + + if not output: + return {} + output = json.loads(output) if 'filesystems' in output: if len(output['filesystems']) > 1: @@ -699,15 +723,20 @@ def get_mount_info(path): return output['filesystems'][0] -def get_partitions_in_use(mountpoint): + +def get_partitions_in_use(mountpoint) -> list: try: - output = b''.join(sys_command(f'/usr/bin/findmnt --json -R {mountpoint}')) + output = SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}') except SysCallError: - return {} + return [] mounts = [] output = output.decode('UTF-8') + + if not output: + return [] + output = json.loads(output) for target in output.get('filesystems', []): mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])) @@ -717,21 +746,24 @@ def get_partitions_in_use(mountpoint): return mounts + def get_filesystem_type(path): try: - handle = sys_command(f"blkid -o value -s TYPE {path}") + handle = SysCommand(f"blkid -o value -s TYPE {path}") return b''.join(handle).strip().decode('UTF-8') except SysCallError: return None + def disk_layouts(): try: - handle = sys_command(f"lsblk -f -o+TYPE,SIZE -J") + handle = SysCommand("lsblk -f -o+TYPE,SIZE -J") return json.loads(b''.join(handle).decode('UTF-8')) except SysCallError as err: log(f"Could not return disk layouts: {err}") return None + def encrypted_partitions(blockdevices :dict) -> bool: for partition in blockdevices.values(): if partition.get('encrypted', False): |