index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk.py | 203 |
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index de39bafd..e0d9b423 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -2,7 +2,6 @@ import glob import pathlib import re import time -from collections import OrderedDict from typing import Optional from .general import * @@ -30,11 +29,11 @@ class BlockDevice: self.path = path self.info = info self.keep_partitions = True - self.part_cache = OrderedDict() + self.part_cache = {} + # TODO: Currently disk encryption is a BIT misleading. # It's actually partition-encryption, but for future-proofing this # I'm placing the encryption password on a BlockDevice level. - self.encryption_password = None def __repr__(self, *args, **kwargs): return f"BlockDevice({self.device})" @@ -48,6 +47,12 @@ class BlockDevice: raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] + def __len__(self): + return len(self.partitions) + + def __lt__(self, left_comparitor): + return self.path < left_comparitor.path + def json(self): """ json() has precedence over __dump__, so this is a way @@ -61,12 +66,21 @@ class BlockDevice: def __dump__(self): return { - 'path': self.path, - 'info': self.info, - 'partition_cache': self.part_cache + self.path : { + 'partuuid' : self.uuid, + 'wipe' : self.info.get('wipe', None), + 'partitions' : [part.__dump__() for part in self.partitions.values()] + } } @property + def partition_type(self): + output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) + + for device in output['blockdevices']: + return device['pttype'] + + @property def device(self): """ Returns the actual device-endpoint of the BlockDevice. @@ -78,7 +92,7 @@ class BlockDevice: raise DiskError(f'Could not locate backplane info for "{self.path}"') if self.info['type'] == 'loop': - for drive in json.loads(b''.join(SysCommand(['losetup', '--json'])).decode('UTF_8'))['loopdevices']: + for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: if not drive['name'] == self.path: continue @@ -100,18 +114,17 @@ class BlockDevice: @property def partitions(self): - o = b''.join(SysCommand(['partprobe', self.path])) + SysCommand(['partprobe', self.path]) - # o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) - o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path])) + result = SysCommand(['/usr/bin/lsblk', '-J', self.path]) - if b'not a block device' in o: + if b'not a block device' in result: raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') - if not o[:1] == b'{': + if not result[:1] == b'{': raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') - r = json.loads(o.decode('UTF-8')) + r = json.loads(result.decode('UTF-8')) if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: root_path = f"/dev/{r['blockdevices'][0]['name']}" for part in r['blockdevices'][0]['children']: @@ -140,10 +153,18 @@ class BlockDevice: This is more reliable than relying on /dev/disk/by-partuuid as it doesn't seam to be able to detect md raid partitions. """ - lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}')) - for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: + for partition in json.loads(SysCommand(f'lsblk -J -o+UUID {self.path}').decode('UTF-8'))['blockdevices']: return partition.get('uuid', None) + @property + def size(self): + output = json.loads(SysCommand(f"lsblk --json -o+SIZE {self.path}").decode('UTF-8')) + + for device in output['blockdevices']: + assert device['size'][-1] == 'G' # Make sure we're counting in Gigabytes, otherwise the next logic fails. + + return float(device['size'][:-1]) + def has_partitions(self): return len(self.partitions) @@ -154,7 +175,7 @@ class BlockDevice: return False def flush_cache(self): - self.part_cache = OrderedDict() + self.part_cache = {} class Partition: @@ -210,6 +231,77 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})' + def __dump__(self): + return { + 'type' : 'primary', + 'PARTUUID' : self.uuid, + 'wipe' : self.allow_formatting, + 'boot' : self.boot, + 'ESP' : self.boot, + 'mountpoint' : self.target_mountpoint, + 'encrypted' : self._encrypted, + 'start' : self.start, + 'size' : self.end, + 'filesystem' : { + 'format' : get_filesystem_type(self.path) + } + } + + @property + def sector_size(self): + output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8')) + + for device in output['blockdevices']: + return device.get('log-sec', None) + + @property + def start(self): + output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition['start']# * self.sector_size + + @property + def end(self): + # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. + output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition['size']# * self.sector_size + + @property + def boot(self): + output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + + # Get the bootable flag from the sfdisk output: + # { + # "partitiontable": { + # "label":"dos", + # "id":"0xd202c10a", + # "device":"/dev/loop0", + # "unit":"sectors", + # "sectorsize":512, + # "partitions": [ + # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true} + # ] + # } + # } + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition.get('bootable', False) + + return False + + @property + def partition_type(self): + lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) + + for device in lsblk['blockdevices']: + return device['pttype'] + @property def uuid(self) -> Optional[str]: """ @@ -217,8 +309,8 @@ class Partition: This is more reliable than relying on /dev/disk/by-partuuid as it doesn't seam to be able to detect md raid partitions. """ - lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}')) - for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: + lsblk = json.loads(SysCommand(f'lsblk -J -o+PARTUUID {self.path}').decode('UTF-8')) + for partition in lsblk['blockdevices']: return partition.get('partuuid', None) return None @@ -237,7 +329,7 @@ class Partition: @property def real_device(self): - for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']: + for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)): return f"/dev/{parent}" # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') @@ -331,30 +423,29 @@ class Partition: log(f'Formatting {path} -> {filesystem}', level=logging.INFO) if filesystem == 'btrfs': - o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}')) - if b'UUID' not in o: - raise DiskError(f'Could not format {path} with {filesystem} because: {o}') + if b'UUID' not in (mkfs := SysCommand(f'/usr/bin/mkfs.btrfs -f {path}')): + raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') self.filesystem = 'btrfs' elif filesystem == 'vfat': - o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}')) - if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: - raise DiskError(f'Could not format {path} with {filesystem} because: {o}') + mkfs = SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}') + if (b'mkfs.fat' not in mkfs and b'mkfs.vfat' not in mkfs) or b'command not found' in mkfs: + raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}") self.filesystem = 'vfat' elif filesystem == 'ext4': if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") self.filesystem = 'ext4' elif filesystem == 'xfs': if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") self.filesystem = 'xfs' elif filesystem == 'f2fs': if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") self.filesystem = 'f2fs' elif filesystem == 'crypto_LUKS': @@ -393,9 +484,9 @@ class Partition: try: if options: - SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}') + SysCommand(f"/usr/bin/mount -o {options} {self.path} {target}") else: - SysCommand(f'/usr/bin/mount {self.path} {target}') + SysCommand(f"/usr/bin/mount {self.path} {target}") except SysCallError as err: raise err @@ -404,7 +495,7 @@ class Partition: def unmount(self): try: - exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code + SysCommand(f"/usr/bin/umount {self.path}") except SysCallError as err: exit_code = err.exit_code @@ -476,17 +567,29 @@ class Filesystem: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] - b''.join(SysCommand('sync')) + SysCommand('sync') return True + def load_layout(self, layout :dict): + for partition in layout: + # We don't want to re-add an existing partition (those containing a UUID already) + if 'UUID' not in partition: + self.add_partition(partition.get('type', 'primary'), + start=partition.get('start', '1MiB'), # TODO: Revisit sane block starts (4MB for memorycards for instance) + end=partition.get('size', '100%'), + partition_format=partition.get('filesystem', {}).get('format', 'btrfs')) + + + + exit(0) + def find_partition(self, mountpoint): for partition in self.blockdevice: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition def raw_parted(self, string: str): - x = SysCommand(f'/usr/bin/parted -s {string}') - return x + return SysCommand(f'/usr/bin/parted -s {string}') def parted(self, string: str): """ @@ -582,9 +685,10 @@ def device_state(name, *args, **kwargs): # lsblk --json -l -n -o path def all_disks(*args, **kwargs): kwargs.setdefault("partitions", False) - drives = OrderedDict() - # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']: - for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model')).decode('UTF_8'))['blockdevices']: + drives = {} + + lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8')) + for drive in lsblk['blockdevices']: if not kwargs['partitions'] and drive['type'] == 'part': continue @@ -617,12 +721,10 @@ def harddrive(size=None, model=None, fuzzy=False): def get_mount_info(path) -> dict: try: - output = SysCommand(f'/usr/bin/findmnt --json {path}') + output = SysCommand(f'/usr/bin/findmnt --json {path}').decode('UTF-8') except SysCallError: return {} - output = output.decode('UTF-8') - if not output: return {} @@ -636,14 +738,12 @@ def get_mount_info(path) -> dict: def get_partitions_in_use(mountpoint) -> list: try: - output = SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}') + output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') except SysCallError: return [] mounts = [] - output = output.decode('UTF-8') - if not output: return [] @@ -659,16 +759,25 @@ def get_partitions_in_use(mountpoint) -> list: def get_filesystem_type(path): try: - handle = SysCommand(f"blkid -o value -s TYPE {path}") - return b''.join(handle).strip().decode('UTF-8') + return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip() except SysCallError: return None def disk_layouts(): try: - handle = SysCommand("lsblk -f -o+TYPE,SIZE -J") - return json.loads(b''.join(handle).decode('UTF-8')) + return json.loads(SysCommand("lsblk -f -o+TYPE,SIZE -J").decode('UTF-8')) except SysCallError as err: log(f"Could not return disk layouts: {err}") return None + + +def encrypted_partitions(blockdevices :dict) -> bool: + for partition in blockdevices.values(): + if partition.get('encrypted', False): + yield partition + +def find_partition_by_mountpoint(partitions, relative_mountpoint :str): + for partition in partitions: + if partition.get('mountpoint', None) == relative_mountpoint: + return partition
\ No newline at end of file |