Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk.py')
-rw-r--r--archinstall/lib/disk.py105
1 files changed, 86 insertions, 19 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index d05588a6..fbc11ca3 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -1,5 +1,5 @@
import glob, re, os, json, time, hashlib
-import pathlib
+import pathlib, traceback
from collections import OrderedDict
from .exceptions import DiskError
from .general import *
@@ -108,7 +108,7 @@ class BlockDevice():
if part_id not in self.part_cache:
## TODO: Force over-write even if in cache?
if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
- self.part_cache[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size'])
+ self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size'])
return {k: self.part_cache[k] for k in sorted(self.part_cache)}
@@ -130,16 +130,22 @@ class BlockDevice():
return True
return False
+ def flush_cache(self):
+ self.part_cache = OrderedDict()
+
class Partition():
- def __init__(self, path, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
+ def __init__(self, path :str, block_device :BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
if not part_id:
part_id = os.path.basename(path)
+
+ self.block_device = block_device
self.path = path
self.part_id = part_id
self.mountpoint = mountpoint
self.target_mountpoint = mountpoint
self.filesystem = filesystem
self.size = size # TODO: Refresh?
+ self._encrypted = None
self.encrypted = encrypted
self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions.
@@ -175,28 +181,48 @@ class Partition():
elif self.target_mountpoint:
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
- if self.encrypted:
+ if self._encrypted:
return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
else:
return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})'
@property
+ def encrypted(self):
+ return self._encrypted
+
+ @encrypted.setter
+ def encrypted(self, value :bool):
+ if value:
+ log(f'Marking {self} as encrypted: {value}', level=LOG_LEVELS.Debug)
+ log(f"Callstrack when marking the partition: {''.join(traceback.format_stack())}", level=LOG_LEVELS.Debug)
+
+ self._encrypted = value
+
+ @property
def real_device(self):
- if not self.encrypted:
+ if not self._encrypted:
return self.path
else:
for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
return f"/dev/{parent}"
- raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
+ # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
+ return self.path
def detect_inner_filesystem(self, password):
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info)
from .luks import luks2
- with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device:
- return unlocked_device.filesystem
+
+ try:
+ with luks2(self, 'luksloop', password, auto_unmount=True) as unlocked_device:
+ return unlocked_device.filesystem
+ except SysCallError:
+ return None
def has_content(self):
+ if not get_filesystem_type(self.path):
+ return False
+
temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest()
temporary_path = pathlib.Path(temporary_mountpoint)
@@ -213,8 +239,10 @@ class Partition():
def safe_to_format(self):
if self.allow_formatting is False:
+ log(f"Partition {self} is not marked for formatting.", level=LOG_LEVELS.Debug)
return False
elif self.target_mountpoint == '/boot' and self.has_content():
+ log(f"Partition {self} is a boot partition and has content inside.", level=LOG_LEVELS.Debug)
return False
return True
@@ -225,10 +253,11 @@ class Partition():
"""
from .luks import luks2
- if not self.encrypted:
+ if not self._encrypted:
raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
if not self.safe_to_format():
+ log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=LOG_LEVELS.Error, fg="red")
return False
handle = luks2(self, None, None)
@@ -247,6 +276,11 @@ class Partition():
if allow_formatting is None:
allow_formatting = self.allow_formatting
+ # To avoid "unable to open /dev/x: No such file or directory"
+ start_wait = time.time()
+ while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
+ time.sleep(0.025)
+
if not allow_formatting:
raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
@@ -288,6 +322,12 @@ class Partition():
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
+
+ if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
+ self.encrypted = True
+ else:
+ self.encrypted = False
+
return True
def find_parent_of(self, data, name, parent=None):
@@ -313,6 +353,24 @@ class Partition():
self.mountpoint = target
return True
+ def unmount(self):
+ try:
+ exit_code = sys_command(f'/usr/bin/umount {self.path}').exit_code
+ except SysCallError as err:
+ exit_code = err.exit_code
+
+ # Without to much research, it seams that low error codes are errors.
+ # And above 8k is indicators such as "/dev/x not mounted.".
+ # So anything in between 0 and 8k are errors (?).
+ if exit_code > 0 and exit_code < 8000:
+ raise err
+
+ self.mountpoint = None
+ return True
+
+ def umount(self):
+ return self.unmount()
+
def filesystem_supported(self):
"""
The support for a filesystem (this partition) is tested by calling
@@ -343,7 +401,8 @@ class Filesystem():
if self.blockdevice.keep_partitions is False:
log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug)
if self.mode == GPT:
- if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt',).exit_code == 0:
+ if self.raw_parted(f'{self.blockdevice.device} mklabel gpt').exit_code == 0:
+ self.blockdevice.flush_cache()
return self
else:
raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
@@ -380,7 +439,7 @@ class Filesystem():
def raw_parted(self, string:str):
x = sys_command(f'/usr/bin/parted -s {string}')
- o = b''.join(x)
+ log(f"'parted -s {string}' returned: {b''.join(x)}", level=LOG_LEVELS.Debug)
return x
def parted(self, string:str):
@@ -392,25 +451,33 @@ class Filesystem():
"""
return self.raw_parted(string).exit_code
- def use_entire_disk(self, root_filesystem_type='ext4', encrypt_root_partition=True):
- self.add_partition('primary', start='1MiB', end='513MiB', format='vfat')
- #TODO: figure out what do for bios, we don't need a seprate partion for the bootloader
+ def use_entire_disk(self, root_filesystem_type='ext4'):
+ log(f"Using and formatting the entire {self.blockdevice}.", level=LOG_LEVELS.Debug)
if hasUEFI():
+ self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
self.set_name(0, 'EFI')
self.set(0, 'boot on')
- # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
- # https://www.gnu.org/software/parted/manual/html_node/set.html
+ # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
+ # https://www.gnu.org/software/parted/manual/html_node/set.html
self.set(0, 'esp on')
self.add_partition('primary', start='513MiB', end='100%')
self.blockdevice.partition[0].filesystem = 'vfat'
self.blockdevice.partition[1].filesystem = root_filesystem_type
+ log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug)
self.blockdevice.partition[0].target_mountpoint = '/boot'
self.blockdevice.partition[1].target_mountpoint = '/'
- if encrypt_root_partition:
- self.blockdevice.partition[1].encrypted = True
+ self.blockdevice.partition[0].allow_formatting = True
+ self.blockdevice.partition[1].allow_formatting = True
+ else:
+ #we don't need a seprate boot partition it would be a waste of space
+ self.add_partition('primary', start='1MB', end='100%')
+ self.blockdevice.partition[0].filesystem=root_filesystem_type
+ log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug)
+ self.blockdevice.partition[0].target_mountpoint = '/'
+ self.blockdevice.partition[0].allow_formatting = True
def add_partition(self, type, start, end, format=None):
log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info)
@@ -507,4 +574,4 @@ def get_filesystem_type(path):
handle = sys_command(f"blkid -o value -s TYPE {path}")
return b''.join(handle).strip().decode('UTF-8')
except SysCallError:
- return None \ No newline at end of file
+ return None