Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/filesystem.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/filesystem.py')
-rw-r--r--archinstall/lib/disk/filesystem.py59
1 files changed, 33 insertions, 26 deletions
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 51ef949b..3b09ec6c 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,7 +1,13 @@
+from __future__ import annotations
import time
import logging
import json
import pathlib
+from typing import Optional, Dict, Any, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .blockdevice import BlockDevice
+
from .partition import Partition
from .validators import valid_fs_type
from ..exceptions import DiskError
@@ -16,24 +22,25 @@ class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
- def __init__(self, blockdevice, mode):
+ def __init__(self, blockdevice :BlockDevice, mode :int):
self.blockdevice = blockdevice
self.mode = mode
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
return self
- def __repr__(self):
+ def __repr__(self) -> str:
return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
+
SysCommand('sync')
return True
- def partuuid_to_index(self, uuid):
+ def partuuid_to_index(self, uuid :str) -> Optional[int]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
time.sleep(5)
@@ -50,7 +57,7 @@ class Filesystem:
raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
- def load_layout(self, layout :dict):
+ def load_layout(self, layout :Dict[str, Any]) -> None:
from ..luks import luks2
# If the layout tells us to wipe the drive, we do so
@@ -83,6 +90,8 @@ class Filesystem:
raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition_uuid)}).")
if partition.get('filesystem', {}).get('format', False):
+ # needed for backward compatibility with the introduction of the new "format_options"
+ format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
if partition.get('encrypted', False):
if not partition.get('!password'):
if not storage['arguments'].get('!encryption-password'):
@@ -93,15 +102,12 @@ class Filesystem:
storage['arguments']['!encryption-password'] = get_password(f"Enter a encryption password for {partition['device_instance']}")
partition['!password'] = storage['arguments']['!encryption-password']
- # to be able to generate an unique name in case the partition will not be mounted
+
if partition.get('mountpoint',None):
- ppath = partition['mountpoint']
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
else:
- ppath = partition['device_instance'].path
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop"
-
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
partition['device_instance'].encrypt(password=partition['!password'])
-
# Immediately unlock the encrypted device to format the inner volume
with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=True) as unlocked_device:
if not partition.get('format'):
@@ -119,29 +125,29 @@ class Filesystem:
continue
break
- unlocked_device.format(partition['filesystem']['format'], options=partition.get('options', []))
+ unlocked_device.format(partition['filesystem']['format'], options=format_options)
elif partition.get('format', False):
- partition['device_instance'].format(partition['filesystem']['format'], options=partition.get('options', []))
+ partition['device_instance'].format(partition['filesystem']['format'], options=format_options)
if partition.get('boot', False):
log(f"Marking partition {partition['device_instance']} as bootable.")
self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
- def find_partition(self, mountpoint):
+ def find_partition(self, mountpoint :str) -> Partition:
for partition in self.blockdevice:
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition
- def partprobe(self):
- SysCommand(f'bash -c "partprobe"')
+ def partprobe(self) -> bool:
+ return SysCommand(f'bash -c "partprobe"').exit_code == 0
- def raw_parted(self, string: str):
+ def raw_parted(self, string: str) -> SysCommand:
if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0:
log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red")
time.sleep(0.5)
return cmd_handle
- def parted(self, string: str):
+ def parted(self, string: str) -> bool:
"""
Performs a parted execution of the given string
@@ -149,16 +155,17 @@ class Filesystem:
:type string: str
"""
if (parted_handle := self.raw_parted(string)).exit_code == 0:
- self.partprobe()
- return True
+ if self.partprobe():
+ return True
+ return False
else:
raise DiskError(f"Parted failed to add a partition: {parted_handle}")
- def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
+ def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type, start, end, partition_format=None):
+ def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
@@ -197,14 +204,14 @@ class Filesystem:
log("Add partition is exiting due to excessive wait time",level=logging.INFO)
raise DiskError(f"New partition never showed up after adding new partition on {self}.")
- def set_name(self, partition: int, name: str):
+ def set_name(self, partition: int, name: str) -> bool:
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
- def set(self, partition: int, string: str):
+ def set(self, partition: int, string: str) -> bool:
log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
- def parted_mklabel(self, device: str, disk_label: str):
+ def parted_mklabel(self, device: str, disk_label: str) -> bool:
log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel
try: