index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk.py | 280 |
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index caf5c4e1..c05ba757 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -1,4 +1,5 @@ -import glob, re, os, json, time # Time is only used to gracefully wait for new paritions to come online +import glob, re, os, json, time, hashlib +import pathlib from collections import OrderedDict from .exceptions import DiskError from .general import * @@ -7,6 +8,7 @@ from .storage import storage ROOT_DIR_PATTERN = re.compile('^.*?/devices') GPT = 0b00000001 +MBR = 0b00000010 #import ctypes #import ctypes.util @@ -14,14 +16,27 @@ GPT = 0b00000001 #libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) class BlockDevice(): - def __init__(self, path, info): + def __init__(self, path, info=None): + if not info: + # If we don't give any information, we need to auto-fill it. + # Otherwise any subsequent usage will break. + info = all_disks()[path].info + self.path = path self.info = info self.part_cache = OrderedDict() + # TODO: Currently disk encryption is a BIT missleading. + # It's actually partition-encryption, but for future-proofing this + # I'm placing the encryption password on a BlockDevice level. + self.encryption_passwoed = None def __repr__(self, *args, **kwargs): return f"BlockDevice({self.device})" + def __iter__(self): + for partition in self.partitions: + yield self.partitions[partition] + def __getitem__(self, key, *args, **kwargs): if key not in self.info: raise KeyError(f'{self} does not contain information: "{key}"') @@ -91,7 +106,8 @@ class BlockDevice(): part_id = part['name'][len(os.path.basename(self.path)):] if part_id not in self.part_cache: ## TODO: Force over-write even if in cache? - self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size']) + if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: + self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size']) return {k: self.part_cache[k] for k in sorted(self.part_cache)} @@ -100,50 +116,177 @@ class BlockDevice(): all_partitions = self.partitions return [all_partitions[k] for k in all_partitions] + @property + def partition_table_type(self): + return GPT + + def has_partitions(self): + return len(self.partitions) + + def has_mount_point(self, mountpoint): + for partition in self.partitions: + if self.partitions[partition].mountpoint == mountpoint: + return True + return False class Partition(): - def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False): + def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): if not part_id: part_id = os.path.basename(path) self.path = path self.part_id = part_id self.mountpoint = mountpoint - self.filesystem = filesystem # TODO: Autodetect if we're reusing a partition + self.target_mountpoint = mountpoint + self.filesystem = filesystem self.size = size # TODO: Refresh? self.encrypted = encrypted + self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions. + + if mountpoint: + self.mount(mountpoint) + + mount_information = get_mount_info(self.path) + + if self.mountpoint != mount_information.get('target', None) and mountpoint: + raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") + + if (target := mount_information.get('target', None)): + self.mountpoint = target + + if not self.filesystem and autodetect_filesystem: + if (fstype := mount_information.get('fstype', get_filesystem_type(self.real_device))): + self.filesystem = fstype + + if self.filesystem == 'crypto_LUKS': + self.encrypted = True + + def __lt__(self, left_comparitor): + if type(left_comparitor) == Partition: + left_comparitor = left_comparitor.path + else: + left_comparitor = str(left_comparitor) + return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. def __repr__(self, *args, **kwargs): + mount_repr = '' + if self.mountpoint: + mount_repr = f", mounted={self.mountpoint}" + elif self.target_mountpoint: + mount_repr = f", rel_mountpoint={self.target_mountpoint}" + if self.encrypted: - return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}, mounted={self.mountpoint})' + return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})' + else: + return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})' + + @property + def real_device(self): + if not self.encrypted: + return self.path else: - return f'Partition(path={self.path}, fs={self.filesystem}, mounted={self.mountpoint})' + for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']: + if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))): + return f"/dev/{parent}" + raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') + + def detect_inner_filesystem(self, password): + log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info) + from .luks import luks2 + with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device: + return unlocked_device.filesystem + + def has_content(self): + temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest() + temporary_path = pathlib.Path(temporary_mountpoint) + + temporary_path.mkdir(parents=True, exist_ok=True) + if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: + raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') + + files = len(glob.glob(f"{temporary_mountpoint}/*")) + sys_command(f'/usr/bin/umount {temporary_mountpoint}') + + temporary_path.rmdir() + + return True if files > 0 else False + + def safe_to_format(self): + if self.allow_formatting is False: + return False + elif self.target_mountpoint == '/boot' and self.has_content(): + return False + + return True + + def encrypt(self, *args, **kwargs): + """ + A wrapper function for luks2() instances and the .encrypt() method of that instance. + """ + from .luks import luks2 + + if not self.encrypted: + raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}") + + if not self.safe_to_format(): + return False + + handle = luks2(self, None, None) + return handle.encrypt(self, *args, **kwargs) + + def format(self, filesystem=None, path=None, allow_formatting=None, log_formating=True): + """ + Format can be given an overriding path, for instance /dev/null to test + the formating functionality and in essence the support for the given filesystem. + """ + if filesystem is None: + filesystem = self.filesystem + + if path is None: + path = self.path + if allow_formatting is None: + allow_formatting = self.allow_formatting + + if not allow_formatting: + raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})") + + if log_formating: + log(f'Formatting {path} -> {filesystem}', level=LOG_LEVELS.Info) - def format(self, filesystem): - log(f'Formatting {self} -> {filesystem}', level=LOG_LEVELS.Info) if filesystem == 'btrfs': - o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}')) + o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}')) if b'UUID' not in o: - raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}') + raise DiskError(f'Could not format {path} with {filesystem} because: {o}') self.filesystem = 'btrfs' - elif filesystem == 'fat32': - o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {self.path}')) + + elif filesystem == 'vfat': + o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}')) if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: - raise DiskError(f'Could not format {self.path} with {filesystem} because: {o}') - self.filesystem = 'fat32' + raise DiskError(f'Could not format {path} with {filesystem} because: {o}') + self.filesystem = 'vfat' + elif filesystem == 'ext4': - if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {self.path}')).exit_code != 0: - raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}') + if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: + raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'ext4' + elif filesystem == 'xfs': - if (handle:= sys_command(f'/usr/bin/mkfs.xfs -f {self.path}')).exit_code != 0: - raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}') + if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: + raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'xfs' + elif filesystem == 'f2fs': - if (handle:= sys_command(f'/usr/bin/mkfs.f2fs -f {self.path}')).exit_code != 0: - raise DiskError(f'Could not format {self.path} with {filesystem} because: {b"".join(handle)}') + if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: + raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'f2fs' + + elif filesystem == 'crypto_LUKS': + # from .luks import luks2 + # encrypted_partition = luks2(self, None, None) + # encrypted_partition.format(path) + self.filesystem = 'crypto_LUKS' + else: - raise DiskError(f'Fileformat {filesystem} is not yet implemented.') + raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") return True def find_parent_of(self, data, name, parent=None): @@ -154,16 +297,6 @@ class Partition(): if (parent := self.find_parent_of(child, name, parent=data['name'])): return parent - @property - def real_device(self): - if not self.encrypted: - return self.path - else: - for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']: - if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))): - return f"/dev/{parent}" - raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') - def mount(self, target, fs=None, options=''): if not self.mountpoint: log(f'Mounting {self} to {target}', level=LOG_LEVELS.Info) @@ -179,6 +312,21 @@ class Partition(): self.mountpoint = target return True + def filesystem_supported(self): + """ + The support for a filesystem (this partition) is tested by calling + partition.format() with a path set to '/dev/null' which returns two exceptions: + 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported + 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type + """ + try: + self.format(self.filesystem, '/dev/null', log_formating=False, allow_formatting=True) + except SysCallError: + pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code + except UnknownFilesystemFormat as err: + raise err + return True + class Filesystem(): # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them @@ -188,13 +336,23 @@ class Filesystem(): self.mode = mode def __enter__(self, *args, **kwargs): - if self.mode == GPT: - if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0: - return self + if self.blockdevice.keep_partitions is False: + log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug) + if self.mode == GPT: + if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0: + return self + else: + raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') else: - raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') + raise DiskError(f'Unknown mode selected to format in: {self.mode}') + + # TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed. + elif self.mode == self.blockdevice.partition_table_type: + log(f'Kept partition format {self.mode} for {self.blockdevice}', level=LOG_LEVELS.Debug) else: - raise DiskError(f'Unknown mode selected to format in: {self.mode}') + raise DiskError(f'The selected partition table format {self.mode} does not match that of {self.blockdevice}.') + + return self def __repr__(self): return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" @@ -206,6 +364,11 @@ class Filesystem(): b''.join(sys_command(f'sync')) return True + def find_partition(self, mountpoint): + for partition in self.blockdevice: + if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: + return partition + def raw_parted(self, string:str): x = sys_command(f'/usr/bin/parted -s {string}') o = b''.join(x) @@ -220,15 +383,23 @@ class Filesystem(): """ return self.raw_parted(string).exit_code - def use_entire_disk(self, prep_mode=None): - self.add_partition('primary', start='1MiB', end='513MiB', format='fat32') + def use_entire_disk(self, root_filesystem_type='ext4', encrypt_root_partition=True): + self.add_partition('primary', start='1MiB', end='513MiB', format='vfat') self.set_name(0, 'EFI') self.set(0, 'boot on') - self.set(0, 'esp on') # TODO: Redundant, as in GPT mode it's an alias for "boot on"? https://www.gnu.org/software/parted/manual/html_node/set.html - if prep_mode == 'luks2': - self.add_partition('primary', start='513MiB', end='100%') - else: - self.add_partition('primary', start='513MiB', end='100%', format='ext4') + # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"? + # https://www.gnu.org/software/parted/manual/html_node/set.html + self.set(0, 'esp on') + self.add_partition('primary', start='513MiB', end='100%') + + self.blockdevice.partition[0].filesystem = 'vfat' + self.blockdevice.partition[1].filesystem = root_filesystem_type + + self.blockdevice.partition[0].target_mountpoint = '/boot' + self.blockdevice.partition[1].target_mountpoint = '/' + + if encrypt_root_partition: + self.blockdevice.partition[1].encrypted = True def add_partition(self, type, start, end, format=None): log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info) @@ -302,3 +473,24 @@ def harddrive(size=None, model=None, fuzzy=False): continue return collection[drive] + +def get_mount_info(path): + try: + output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}')) + except SysCallError: + return {} + + output = output.decode('UTF-8') + output = json.loads(output) + if 'filesystems' in output: + if len(output['filesystems']) > 1: + raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}") + + return output['filesystems'][0] + +def get_filesystem_type(path): + try: + handle = sys_command(f"blkid -o value -s TYPE {path}") + return b''.join(handle).strip().decode('UTF-8') + except SysCallError: + return None
\ No newline at end of file |