Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk.py')
-rw-r--r--archinstall/lib/disk.py79
1 files changed, 58 insertions, 21 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index 211bbcb5..4fb3d1e0 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -13,10 +13,19 @@ GPT = 0b00000001
MBR = 0b00000010
-# import ctypes
-# import ctypes.util
-# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
-# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
+def valid_fs_type(fstype :str) -> bool:
+ # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
+
+ return fstype in [
+ "ext2",
+ "fat16", "fat32",
+ "hfs", "hfs+", "hfsx",
+ "linux-swap",
+ "NTFS",
+ "reiserfs",
+ "ufs",
+ "btrfs",
+ ]
class BlockDevice:
@@ -177,6 +186,11 @@ class BlockDevice:
def flush_cache(self):
self.part_cache = {}
+ def get_partition(self, uuid):
+ for partition in self:
+ if partition.uuid == uuid:
+ return partition
+
class Partition:
def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
@@ -425,34 +439,34 @@ class Partition:
if filesystem == 'btrfs':
if b'UUID' not in (mkfs := SysCommand(f'/usr/bin/mkfs.btrfs -f {path}')):
raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self.filesystem = 'btrfs'
+ self.filesystem = filesystem
- elif filesystem == 'vfat':
+ elif filesystem == 'fat32':
mkfs = SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}')
if (b'mkfs.fat' not in mkfs and b'mkfs.vfat' not in mkfs) or b'command not found' in mkfs:
raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}")
- self.filesystem = 'vfat'
+ self.filesystem = filesystem
elif filesystem == 'ext4':
if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = 'ext4'
+ self.filesystem = filesystem
elif filesystem == 'xfs':
if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = 'xfs'
+ self.filesystem = filesystem
elif filesystem == 'f2fs':
if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = 'f2fs'
+ self.filesystem = filesystem
elif filesystem == 'crypto_LUKS':
# from .luks import luks2
# encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path)
- self.filesystem = 'crypto_LUKS'
+ self.filesystem = filesystem
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@@ -583,12 +597,32 @@ class Filesystem:
# We then iterate the partitions in order
for partition in layout.get('partitions', []):
# We don't want to re-add an existing partition (those containing a UUID already)
- if 'UUID' not in partition:
- self.add_partition(partition.get('type', 'primary'),
- start=partition.get('start', '1MiB'), # TODO: Revisit sane block starts (4MB for memorycards for instance)
- end=partition.get('size', '100%'),
- partition_format=partition.get('filesystem', {}).get('format', 'btrfs'))
+ if partition.get('format', False) and not partition.get('uuid', None):
+ partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
+ start=partition.get('start', '1MiB'), # TODO: Revisit sane block starts (4MB for memorycards for instance)
+ end=partition.get('size', '100%'),
+ partition_format=partition.get('filesystem', {}).get('format', 'btrfs'))
+
+ elif partition_uuid := partition.get('uuid'):
+ if partition_instance := self.blockdevice.get_partition(uuid=partition_uuid):
+ partition['device_instance'] = partition_instance
+ else:
+ raise ValueError("BlockDevice().load_layout() doesn't know how to continue without either a UUID or creation of partition.")
+
+ if partition.get('filesystem', {}).get('format', None):
+ if partition.get('encrypt', False):
+ assert partition.get('password')
+ partition['device_instance'].encrypt(password=partition['password'])
+ with archinstall.luks2(partition['device_instance'], 'luksloop', partition['password']) as unlocked_device:
+ unlocked_device.format(partition['filesystem']['format'], allow_formatting=partition.get('format', False))
+ else:
+ partition['device_instance'].format(partition['filesystem']['format'], allow_formatting=partition.get('format', False))
+
+ def mount_ordered_layout(self, layout :dict):
+ mountpoints = {}
+ for partition in layout['partitions']:
+ print(partition)
exit(0)
def find_partition(self, mountpoint):
@@ -610,17 +644,19 @@ class Filesystem:
"""
return self.raw_parted(string).exit_code
- def use_entire_disk(self, root_filesystem_type='ext4'):
+ def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
def add_partition(self, partition_type, start, end, partition_format=None):
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
- previous_partitions = self.blockdevice.partitions
+ previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
+
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions")
+
if partition_format:
partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0
else:
@@ -628,12 +664,13 @@ class Filesystem:
if partitioning:
start_wait = time.time()
- while previous_partitions == self.blockdevice.partitions:
- time.sleep(0.025) # Let the new partition come up in the kernel
+ while previous_partition_uuids == {partition.uuid for partition in self.blockdevice.partitions.values()}:
if time.time() - start_wait > 10:
raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).")
+ time.sleep(0.025)
- return True
+ time.sleep(0.5) # Let the kernel catch up with quick block devices (nvme for instance)
+ return self.blockdevice.get_partition(uuid=(previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()}).pop())
def set_name(self, partition: int, name: str):
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0