index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Anton Hvornum <anton@hvornum.se> | 2022-01-14 08:11:30 +0100 |
---|---|---|
committer | Anton Hvornum <anton@hvornum.se> | 2022-01-14 08:11:30 +0100 |
commit | 4bd07ea19f17ef8c78bf12f0d3d50f71c2306c19 (patch) | |
tree | f417000cc16087dca1aed81391431ab85de7f513 /archinstall/lib/disk/filesystem.py | |
parent | 0bc3e94c795fdde55ccc9b233b897498dc7b498e (diff) | |
parent | e8b6b1b334fffe5c5de8c2951a974b0126ffd2b0 (diff) |
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 59 |
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 51ef949b..3b09ec6c 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,7 +1,13 @@ +from __future__ import annotations import time import logging import json import pathlib +from typing import Optional, Dict, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .partition import Partition from .validators import valid_fs_type from ..exceptions import DiskError @@ -16,24 +22,25 @@ class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. - def __init__(self, blockdevice, mode): + def __init__(self, blockdevice :BlockDevice, mode :int): self.blockdevice = blockdevice self.mode = mode - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': return self - def __repr__(self): + def __repr__(self) -> str: return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] + SysCommand('sync') return True - def partuuid_to_index(self, uuid): + def partuuid_to_index(self, uuid :str) -> Optional[int]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() time.sleep(5) @@ -50,7 +57,7 @@ class Filesystem: raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - def load_layout(self, layout :dict): + def load_layout(self, layout :Dict[str, Any]) -> None: from ..luks import luks2 # If the layout tells us to wipe the drive, we do so @@ -83,6 +90,8 @@ class Filesystem: raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition_uuid)}).") if partition.get('filesystem', {}).get('format', False): + # needed for backward compatibility with the introduction of the new "format_options" + format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[]) if partition.get('encrypted', False): if not partition.get('!password'): if not storage['arguments'].get('!encryption-password'): @@ -93,15 +102,12 @@ class Filesystem: storage['arguments']['!encryption-password'] = get_password(f"Enter a encryption password for {partition['device_instance']}") partition['!password'] = storage['arguments']['!encryption-password'] - # to be able to generate an unique name in case the partition will not be mounted + if partition.get('mountpoint',None): - ppath = partition['mountpoint'] + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: - ppath = partition['device_instance'].path - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop" - + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" partition['device_instance'].encrypt(password=partition['!password']) - # Immediately unlock the encrypted device to format the inner volume with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=True) as unlocked_device: if not partition.get('format'): @@ -119,29 +125,29 @@ class Filesystem: continue break - unlocked_device.format(partition['filesystem']['format'], options=partition.get('options', [])) + unlocked_device.format(partition['filesystem']['format'], options=format_options) elif partition.get('format', False): - partition['device_instance'].format(partition['filesystem']['format'], options=partition.get('options', [])) + partition['device_instance'].format(partition['filesystem']['format'], options=format_options) if partition.get('boot', False): log(f"Marking partition {partition['device_instance']} as bootable.") self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on') - def find_partition(self, mountpoint): + def find_partition(self, mountpoint :str) -> Partition: for partition in self.blockdevice: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition - def partprobe(self): - SysCommand(f'bash -c "partprobe"') + def partprobe(self) -> bool: + return SysCommand(f'bash -c "partprobe"').exit_code == 0 - def raw_parted(self, string: str): + def raw_parted(self, string: str) -> SysCommand: if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0: log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red") time.sleep(0.5) return cmd_handle - def parted(self, string: str): + def parted(self, string: str) -> bool: """ Performs a parted execution of the given string @@ -149,16 +155,17 @@ class Filesystem: :type string: str """ if (parted_handle := self.raw_parted(string)).exit_code == 0: - self.partprobe() - return True + if self.partprobe(): + return True + return False else: raise DiskError(f"Parted failed to add a partition: {parted_handle}") - def use_entire_disk(self, root_filesystem_type='ext4') -> Partition: + def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type, start, end, partition_format=None): + def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()} @@ -197,14 +204,14 @@ class Filesystem: log("Add partition is exiting due to excessive wait time",level=logging.INFO) raise DiskError(f"New partition never showed up after adding new partition on {self}.") - def set_name(self, partition: int, name: str): + def set_name(self, partition: int, name: str) -> bool: return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 - def set(self, partition: int, string: str): + def set(self, partition: int, string: str) -> bool: log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - def parted_mklabel(self, device: str, disk_label: str): + def parted_mklabel(self, device: str, disk_label: str) -> bool: log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") # Try to unmount devices before attempting to run mklabel try: |