Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/__init__.py2
-rw-r--r--archinstall/lib/disk/blockdevice.py43
-rw-r--r--archinstall/lib/disk/btrfs.py13
-rw-r--r--archinstall/lib/disk/filesystem.py16
-rw-r--r--archinstall/lib/disk/helpers.py5
-rw-r--r--archinstall/lib/disk/partition.py37
-rw-r--r--archinstall/lib/disk/user_guides.py5
-rw-r--r--archinstall/lib/disk/validators.py2
8 files changed, 78 insertions, 45 deletions
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py
index 352d04b9..bb6eb815 100644
--- a/archinstall/lib/disk/__init__.py
+++ b/archinstall/lib/disk/__init__.py
@@ -4,4 +4,4 @@ from .blockdevice import BlockDevice
from .filesystem import Filesystem, MBR, GPT
from .partition import *
from .user_guides import *
-from .validators import * \ No newline at end of file
+from .validators import *
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 57cbcfa6..fad04c7b 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -1,12 +1,17 @@
+# flake8: noqa
+# The above ignore, see issue: https://github.com/archlinux/archinstall/pull/650#issuecomment-961995949
import os
import json
import logging
+from .exceptions import DiskError
+from .helpers import all_disks
from ..output import log
from ..general import SysCommand
class BlockDevice:
def __init__(self, path, info=None):
if not info:
+ from .helpers import all_disks
# If we don't give any information, we need to auto-fill it.
# Otherwise any subsequent usage will break.
info = all_disks()[path].info
@@ -21,7 +26,7 @@ class BlockDevice:
# I'm placing the encryption password on a BlockDevice level.
def __repr__(self, *args, **kwargs):
- return f"BlockDevice({self.device}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
+ return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
def __iter__(self):
for partition in self.partitions:
@@ -57,28 +62,38 @@ class BlockDevice:
@property
def partition_type(self):
output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
-
+
for device in output['blockdevices']:
return device['pttype']
@property
- def device(self):
+ def device_or_backfile(self):
"""
Returns the actual device-endpoint of the BlockDevice.
If it's a loop-back-device it returns the back-file,
- If it's a ATA-drive it returns the /dev/X device
- And if it's a crypto-device it returns the parent device
+ For other types it return self.device
"""
- if "type" not in self.info:
- raise DiskError(f'Could not locate backplane info for "{self.path}"')
-
if self.info['type'] == 'loop':
for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
if not drive['name'] == self.path:
continue
return drive['back-file']
- elif self.info['type'] == 'disk':
+ else:
+ return self.device
+
+ @property
+ def device(self):
+ """
+ Returns the device file of the BlockDevice.
+ If it's a loop-back-device it returns the /dev/X device,
+ If it's a ATA-drive it returns the /dev/X device
+ And if it's a crypto-device it returns the parent device
+ """
+ if "type" not in self.info:
+ raise DiskError(f'Could not locate backplane info for "{self.path}"')
+
+ if self.info['type'] in ['disk','loop']:
return self.path
elif self.info['type'][:4] == 'raid':
# This should catch /dev/md## raid devices
@@ -154,21 +169,21 @@ class BlockDevice:
@property
def size(self):
output = json.loads(SysCommand(f"lsblk --json -o+SIZE {self.path}").decode('UTF-8'))
-
+
for device in output['blockdevices']:
return self.convert_size_to_gb(device['size'])
@property
def bus_type(self):
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
-
+
for device in output['blockdevices']:
return device['tran']
-
+
@property
def spinning(self):
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
-
+
for device in output['blockdevices']:
return device['rota'] is True
@@ -210,4 +225,4 @@ class BlockDevice:
def get_partition(self, uuid):
for partition in self:
if partition.uuid == uuid:
- return partition \ No newline at end of file
+ return partition
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py
index 6fafab34..7ae4f6a6 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs.py
@@ -1,4 +1,5 @@
-import pathlib, glob
+import pathlib
+import glob
import logging
from typing import Union
from .helpers import get_mount_info
@@ -27,9 +28,9 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str],
if not target.exists():
target.mkdir(parents=True)
- if glob.glob(str(target/'*')) and force is False:
+ if glob.glob(str(target / '*')) and force is False:
raise DiskError(f"Cannot mount subvolume to {target} because it contains data (non-empty folder target)")
-
+
log(f"Mounting {target} as a subvolume", level=logging.INFO)
# Mount the logical volume to the physical structure
mount_information, mountpoint_device_real_path = get_mount_info(target, traverse=True, return_real_path=True)
@@ -46,7 +47,7 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
@installation: archinstall.Installer instance
@subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
"""
-
+
installation_mountpoint = installation.target
if type(installation_mountpoint) == str:
installation_mountpoint = pathlib.Path(installation_mountpoint)
@@ -61,7 +62,7 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
if not target.parent.exists():
target.parent.mkdir(parents=True)
- if glob.glob(str(target/'*')) and force is False:
+ if glob.glob(str(target / '*')):
raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)")
# Remove the target if it exists
@@ -70,4 +71,4 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
log(f"Creating a subvolume on {target}", level=logging.INFO)
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
- raise DiskError(f"Could not create a subvolume at {target}: {cmd}") \ No newline at end of file
+ raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 0328cd83..69593d08 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,8 +1,9 @@
import time
import logging
import json
+from .exceptions import DiskError
from .partition import Partition
-from .blockdevice import BlockDevice
+from .validators import valid_fs_type
from ..general import SysCommand
from ..output import log
from ..storage import storage
@@ -55,7 +56,7 @@ class Filesystem:
def partuuid_to_index(self, uuid):
output = json.loads(SysCommand(f"lsblk --json -o+PARTUUID {self.blockdevice.device}").decode('UTF-8'))
-
+
for device in output['blockdevices']:
for index, partition in enumerate(device['children']):
if partition['partuuid'].lower() == uuid:
@@ -101,7 +102,7 @@ class Filesystem:
partition['password'] = get_password(f"Enter a encryption password for {partition['device_instance']}")
partition['device_instance'].encrypt(password=partition['password'])
- with luks2(partition['device_instance'], storage.get('ENC_IDENTIFIER', 'ai')+'loop', partition['password']) as unlocked_device:
+ with luks2(partition['device_instance'], storage.get('ENC_IDENTIFIER', 'ai') + 'loop', partition['password']) as unlocked_device:
if not partition.get('format'):
if storage['arguments'] == 'silent':
raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
@@ -113,13 +114,13 @@ class Filesystem:
while True:
partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
- pint("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")
+ print("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")
continue
break
- unlocked_device.format(partition['filesystem']['format'])
+ unlocked_device.format(partition['filesystem']['format'], options=partition.get('options', []))
elif partition.get('format', False):
- partition['device_instance'].format(partition['filesystem']['format'])
+ partition['device_instance'].format(partition['filesystem']['format'], options=partition.get('options', []))
if partition.get('boot', False):
self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
@@ -170,7 +171,6 @@ class Filesystem:
raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).")
time.sleep(0.025)
-
# Todo: Find a better way to detect if the new UUID of the partition has showed up.
# But this will address (among other issues)
time.sleep(float(storage['arguments'].get('disk-sleep', 2.0))) # Let the kernel catch up with quick block devices (nvme for instance)
@@ -190,4 +190,4 @@ class Filesystem:
SysCommand(f'bash -c "umount {device}?"')
except:
pass
- return self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 \ No newline at end of file
+ return self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 341b732f..8e044ce3 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -1,10 +1,11 @@
import re
+import os
import json
import logging
import pathlib
from typing import Union
from .blockdevice import BlockDevice
-from ..exceptions import SysCallError
+from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
from ..output import log
@@ -199,4 +200,4 @@ def find_partition_by_mountpoint(block_devices, relative_mountpoint :str):
for device in block_devices:
for partition in block_devices[device]['partitions']:
if partition.get('mountpoint', None) == relative_mountpoint:
- return partition \ No newline at end of file
+ return partition
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 3bb2982b..afe46db3 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -4,9 +4,12 @@ import time
import logging
import json
import os
+import hashlib
from typing import Optional
from .blockdevice import BlockDevice
+from .exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from .helpers import get_mount_info, get_filesystem_type
+from .storage import storage
from ..output import log
from ..general import SysCommand
@@ -82,14 +85,14 @@ class Partition:
@property
def sector_size(self):
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.path}").decode('UTF-8'))
-
+
for device in output['blockdevices']:
return device.get('log-sec', None)
@property
def start(self):
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
-
+
for partition in output.get('partitiontable', {}).get('partitions', []):
if partition['node'] == self.path:
return partition['start']# * self.sector_size
@@ -173,7 +176,7 @@ class Partition:
from .luks import luks2
try:
- with luks2(self, storage.get('ENC_IDENTIFIER', 'ai')+'loop', password, auto_unmount=True) as unlocked_device:
+ with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem
except SysCallError:
return None
@@ -208,7 +211,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem=None, path=None, log_formatting=True):
+ def format(self, filesystem=None, path=None, log_formatting=True, options=[]):
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -228,33 +231,45 @@ class Partition:
log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs':
- if 'UUID:' not in (mkfs := SysCommand(f'/usr/bin/mkfs.btrfs -f {path}').decode('UTF-8')):
+ options = ['-f'] + options
+
+ if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
self.filesystem = filesystem
elif filesystem == 'fat32':
- mkfs = SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}').decode('UTF-8')
+ options = ['-F32'] + options
+
+ mkfs = SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}").decode('UTF-8')
if ('mkfs.fat' not in mkfs and 'mkfs.vfat' not in mkfs) or 'command not found' in mkfs:
raise DiskError(f"Could not format {path} with {filesystem} because: {mkfs}")
self.filesystem = filesystem
elif filesystem == 'ext4':
- if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
+ options = ['-F'] + options
+
+ if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
elif filesystem == 'ext2':
- if (handle := SysCommand(f'/usr/bin/mkfs.ext2 -F {path}')).exit_code != 0:
+ options = ['-F'] + options
+
+ if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'ext2'
elif filesystem == 'xfs':
- if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
+ options = ['-f'] + options
+
+ if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
elif filesystem == 'f2fs':
- if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
+ options = ['-f'] + options
+
+ if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
self.filesystem = filesystem
@@ -334,4 +349,4 @@ class Partition:
pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err:
raise err
- return True \ No newline at end of file
+ return True
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 93013fbc..5354fe2a 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -1,4 +1,5 @@
import logging
+from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..output import log
def suggest_single_disk_layout(block_device, default_filesystem=None):
@@ -55,7 +56,7 @@ def suggest_single_disk_layout(block_device, default_filesystem=None):
}
}
else:
- pass #... implement a guided setup
+ pass # ... implement a guided setup
elif block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
# If we don't want to use subvolumes,
@@ -146,4 +147,4 @@ def suggest_multi_disk_layout(block_devices, default_filesystem=None):
}
})
- return layout \ No newline at end of file
+ return layout
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index 0c9f5a74..e0ab6a86 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -39,4 +39,4 @@ def valid_fs_type(fstype :str) -> bool:
"reiserfs",
"udf", # "ufs", not included in `man parted`
"xfs", # `man parted` allows this
- ] \ No newline at end of file
+ ]