index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk.py | 105 |
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index d05588a6..fbc11ca3 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -1,5 +1,5 @@ import glob, re, os, json, time, hashlib -import pathlib +import pathlib, traceback from collections import OrderedDict from .exceptions import DiskError from .general import * @@ -108,7 +108,7 @@ class BlockDevice(): if part_id not in self.part_cache: ## TODO: Force over-write even if in cache? if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: - self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size']) + self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size']) return {k: self.part_cache[k] for k in sorted(self.part_cache)} @@ -130,16 +130,22 @@ class BlockDevice(): return True return False + def flush_cache(self): + self.part_cache = OrderedDict() + class Partition(): - def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + def __init__(self, path :str, block_device :BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): if not part_id: part_id = os.path.basename(path) + + self.block_device = block_device self.path = path self.part_id = part_id self.mountpoint = mountpoint self.target_mountpoint = mountpoint self.filesystem = filesystem self.size = size # TODO: Refresh? + self._encrypted = None self.encrypted = encrypted self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions. @@ -175,28 +181,48 @@ class Partition(): elif self.target_mountpoint: mount_repr = f", rel_mountpoint={self.target_mountpoint}" - if self.encrypted: + if self._encrypted: return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})' else: return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})' @property + def encrypted(self): + return self._encrypted + + @encrypted.setter + def encrypted(self, value :bool): + if value: + log(f'Marking {self} as encrypted: {value}', level=LOG_LEVELS.Debug) + log(f"Callstrack when marking the partition: {''.join(traceback.format_stack())}", level=LOG_LEVELS.Debug) + + self._encrypted = value + + @property def real_device(self): - if not self.encrypted: + if not self._encrypted: return self.path else: for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']: if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))): return f"/dev/{parent}" - raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') + # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') + return self.path def detect_inner_filesystem(self, password): log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info) from .luks import luks2 - with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device: - return unlocked_device.filesystem + + try: + with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device: + return unlocked_device.filesystem + except SysCallError: + return None def has_content(self): + if not get_filesystem_type(self.path): + return False + temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest() temporary_path = pathlib.Path(temporary_mountpoint) @@ -213,8 +239,10 @@ class Partition(): def safe_to_format(self): if self.allow_formatting is False: + log(f"Partition {self} is not marked for formatting.", level=LOG_LEVELS.Debug) return False elif self.target_mountpoint == '/boot' and self.has_content(): + log(f"Partition {self} is a boot partition and has content inside.", level=LOG_LEVELS.Debug) return False return True @@ -225,10 +253,11 @@ class Partition(): """ from .luks import luks2 - if not self.encrypted: + if not self._encrypted: raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}") if not self.safe_to_format(): + log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=LOG_LEVELS.Error, fg="red") return False handle = luks2(self, None, None) @@ -247,6 +276,11 @@ class Partition(): if allow_formatting is None: allow_formatting = self.allow_formatting + # To avoid "unable to open /dev/x: No such file or directory" + start_wait = time.time() + while pathlib.Path(path).exists() is False and time.time() - start_wait < 10: + time.sleep(0.025) + if not allow_formatting: raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})") @@ -288,6 +322,12 @@ class Partition(): else: raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") + + if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': + self.encrypted = True + else: + self.encrypted = False + return True def find_parent_of(self, data, name, parent=None): @@ -313,6 +353,24 @@ class Partition(): self.mountpoint = target return True + def unmount(self): + try: + exit_code = sys_command(f'/usr/bin/umount {self.path}').exit_code + except SysCallError as err: + exit_code = err.exit_code + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if exit_code > 0 and exit_code < 8000: + raise err + + self.mountpoint = None + return True + + def umount(self): + return self.unmount() + def filesystem_supported(self): """ The support for a filesystem (this partition) is tested by calling @@ -343,7 +401,8 @@ class Filesystem(): if self.blockdevice.keep_partitions is False: log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug) if self.mode == GPT: - if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0: + if self.raw_parted(f'{self.blockdevice.device} mklabel gpt').exit_code == 0: + self.blockdevice.flush_cache() return self else: raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') @@ -380,7 +439,7 @@ class Filesystem(): def raw_parted(self, string:str): x = sys_command(f'/usr/bin/parted -s {string}') - o = b''.join(x) + log(f"'parted -s {string}' returned: {b''.join(x)}", level=LOG_LEVELS.Debug) return x def parted(self, string:str): @@ -392,25 +451,33 @@ class Filesystem(): """ return self.raw_parted(string).exit_code - def use_entire_disk(self, root_filesystem_type='ext4', encrypt_root_partition=True): - self.add_partition('primary', start='1MiB', end='513MiB', format='vfat') - #TODO: figure out what do for bios, we don't need a seprate partion for the bootloader + def use_entire_disk(self, root_filesystem_type='ext4'): + log(f"Using and formatting the entire {self.blockdevice}.", level=LOG_LEVELS.Debug) if hasUEFI(): + self.add_partition('primary', start='1MiB', end='513MiB', format='fat32') self.set_name(0, 'EFI') self.set(0, 'boot on') - # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"? - # https://www.gnu.org/software/parted/manual/html_node/set.html + # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"? + # https://www.gnu.org/software/parted/manual/html_node/set.html self.set(0, 'esp on') self.add_partition('primary', start='513MiB', end='100%') self.blockdevice.partition[0].filesystem = 'vfat' self.blockdevice.partition[1].filesystem = root_filesystem_type + log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug) self.blockdevice.partition[0].target_mountpoint = '/boot' self.blockdevice.partition[1].target_mountpoint = '/' - if encrypt_root_partition: - self.blockdevice.partition[1].encrypted = True + self.blockdevice.partition[0].allow_formatting = True + self.blockdevice.partition[1].allow_formatting = True + else: + #we don't need a seprate boot partition it would be a waste of space + self.add_partition('primary', start='1MB', end='100%') + self.blockdevice.partition[0].filesystem=root_filesystem_type + log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug) + self.blockdevice.partition[0].target_mountpoint = '/' + self.blockdevice.partition[0].allow_formatting = True def add_partition(self, type, start, end, format=None): log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info) @@ -507,4 +574,4 @@ def get_filesystem_type(path): handle = sys_command(f"blkid -o value -s TYPE {path}") return b''.join(handle).strip().decode('UTF-8') except SysCallError: - return None
\ No newline at end of file + return None |