Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/__init__.py2
-rw-r--r--archinstall/lib/disk/blockdevice.py104
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py (renamed from archinstall/lib/disk/btrfs.py)92
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py132
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py116
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolume.py191
-rw-r--r--archinstall/lib/disk/filesystem.py101
-rw-r--r--archinstall/lib/disk/helpers.py31
-rw-r--r--archinstall/lib/disk/mapperdev.py13
-rw-r--r--archinstall/lib/disk/partition.py167
-rw-r--r--archinstall/lib/disk/user_guides.py36
11 files changed, 781 insertions, 204 deletions
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py
index bb6eb815..352d04b9 100644
--- a/archinstall/lib/disk/__init__.py
+++ b/archinstall/lib/disk/__init__.py
@@ -4,4 +4,4 @@ from .blockdevice import BlockDevice
from .filesystem import Filesystem, MBR, GPT
from .partition import *
from .user_guides import *
-from .validators import *
+from .validators import * \ No newline at end of file
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 206c3b7e..c7b69205 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -3,6 +3,7 @@ import os
import json
import logging
import time
+from functools import cached_property
from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -32,7 +33,29 @@ class BlockDevice:
# I'm placing the encryption password on a BlockDevice level.
def __repr__(self, *args :str, **kwargs :str) -> str:
- return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
+ return self._str_repr
+
+ @cached_property
+ def _str_repr(self) -> str:
+ return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})"
+
+ @cached_property
+ def display_info(self) -> str:
+ columns = {
+ str(_('Device')): self.device_or_backfile,
+ str(_('Size')): f'{self._safe_size}GB',
+ str(_('Free space')): f'{self._safe_free_space}',
+ str(_('Bus-type')): f'{self.bus_type}'
+ }
+
+ padding = max([len(k) for k in columns.keys()])
+
+ pretty = ''
+ for k, v in columns.items():
+ k = k.ljust(padding, ' ')
+ pretty += f'{k} = {v}\n'
+
+ return pretty.rstrip()
def __iter__(self) -> Iterator[Partition]:
for partition in self.partitions:
@@ -74,7 +97,7 @@ class BlockDevice:
for device in output['blockdevices']:
return device['pttype']
- @property
+ @cached_property
def device_or_backfile(self) -> str:
"""
Returns the actual device-endpoint of the BlockDevice.
@@ -157,7 +180,7 @@ class BlockDevice:
from .filesystem import GPT
return GPT
- @property
+ @cached_property
def uuid(self) -> str:
log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
@@ -167,7 +190,19 @@ class BlockDevice:
"""
return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
- @property
+ @cached_property
+ def _safe_size(self) -> float:
+ from .helpers import convert_size_to_gb
+
+ try:
+ output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
+ except SysCallError:
+ return -1.0
+
+ for device in output['blockdevices']:
+ return convert_size_to_gb(device['size'])
+
+ @cached_property
def size(self) -> float:
from .helpers import convert_size_to_gb
@@ -176,22 +211,29 @@ class BlockDevice:
for device in output['blockdevices']:
return convert_size_to_gb(device['size'])
- @property
+ @cached_property
def bus_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
- @property
+ @cached_property
def spinning(self) -> bool:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
- @property
- def free_space(self) -> Tuple[str, str, str]:
+ @cached_property
+ def _safe_free_space(self) -> Tuple[str, ...]:
+ try:
+ return '+'.join(part[2] for part in self.free_space)
+ except SysCallError:
+ return '?'
+
+ @cached_property
+ def free_space(self) -> Tuple[str, ...]:
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
@@ -204,7 +246,7 @@ class BlockDevice:
except SysCallError as error:
log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
- @property
+ @cached_property
def largest_free_space(self) -> List[str]:
info = []
for space_info in self.free_space:
@@ -216,7 +258,7 @@ class BlockDevice:
info = space_info
return info
- @property
+ @cached_property
def first_free_sector(self) -> str:
if info := self.largest_free_space:
start = info[0]
@@ -224,7 +266,7 @@ class BlockDevice:
start = '512MB'
return start
- @property
+ @cached_property
def first_end_sector(self) -> str:
if info := self.largest_free_space:
end = info[1]
@@ -247,19 +289,27 @@ class BlockDevice:
def flush_cache(self) -> None:
self.part_cache = {}
- def get_partition(self, uuid :str) -> Partition:
- count = 0
- while count < 5:
- for partition_uuid, partition in self.partitions.items():
- if partition.uuid.lower() == uuid.lower():
- return partition
- else:
- log(f"uuid {uuid} not found. Waiting for {count +1} time",level=logging.DEBUG)
- time.sleep(float(storage['arguments'].get('disk-sleep', 0.2)))
- count += 1
- else:
- log(f"Could not find {uuid} in disk after 5 retries",level=logging.INFO)
- print(f"Cache: {self.part_cache}")
- print(f"Partitions: {self.partitions.items()}")
- print(f"UUID: {[uuid]}")
- raise DiskError(f"New partition {uuid} never showed up after adding new partition on {self}")
+ def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition:
+ if not uuid and not partuuid:
+ raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.")
+
+ for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)):
+ for partition_index, partition in self.partitions.items():
+ try:
+ if uuid and partition.uuid.lower() == uuid.lower():
+ return partition
+ elif partuuid and partition.part_uuid.lower() == partuuid.lower():
+ return partition
+ except DiskError as error:
+ # Most likely a blockdevice that doesn't support or use UUID's
+ # (like Microsoft recovery partition)
+ log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray")
+ pass
+
+ log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
+
+ log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
+ log(f"Cache: {self.part_cache}")
+ log(f"Partitions: {self.partitions.items()}")
+ raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.")
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs/__init__.py
index 33f59721..84b9c0f6 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -4,44 +4,25 @@ import glob
import logging
import re
from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
-from dataclasses import dataclass
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
- from ..installer import Installer
-from .helpers import get_mount_info
-from ..exceptions import DiskError
-from ..general import SysCommand
-from ..output import log
-from ..exceptions import SysCallError
-
-@dataclass
-class BtrfsSubvolume:
- target :str
- source :str
- fstype :str
- name :str
- options :str
- root :bool = False
-
-def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]:
- if '[' in struct['source']:
- subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1]
- struct['source'] = struct['source'].replace(f"[{subvolume}]", "")
- yield BtrfsSubvolume(
- target=struct['target'],
- source=struct['source'],
- fstype=struct['fstype'],
- name=subvolume,
- options=struct['options'],
- root=index == 0
- )
- index += 1
-
- for child in struct.get('children', []):
- for item in get_subvolumes_from_findmnt(child, index=index):
- yield item
- index += 1
+ from ...installer import Installer
+
+from .btrfs_helpers import (
+ subvolume_info_from_path as subvolume_info_from_path,
+ find_parent_subvolume as find_parent_subvolume,
+ setup_subvolumes as setup_subvolumes,
+ mount_subvolume as mount_subvolume
+)
+from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfspartition import BTRFSPartition as BTRFSPartition
+
+from ..helpers import get_mount_info
+from ...exceptions import DiskError, Deprecated
+from ...general import SysCommand
+from ...output import log
+from ...exceptions import SysCallError
def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
try:
@@ -57,42 +38,6 @@ def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
return result
-def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
- """
- This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name.
-
- @installation: archinstall.Installer instance
- @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
- @force: overrides the check for weither or not the subvolume mountpoint is empty or not
-
- This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure.
- Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name]
- """
- log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING)
- installation_mountpoint = installation.target
- if type(installation_mountpoint) == str:
- installation_mountpoint = pathlib.Path(installation_mountpoint)
- # Set up the required physical structure
- if type(subvolume_location) == str:
- subvolume_location = pathlib.Path(subvolume_location)
-
- target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)
-
- if not target.exists():
- target.mkdir(parents=True)
-
- if glob.glob(str(target / '*')) and force is False:
- raise DiskError(f"Cannot mount subvolume to {target} because it contains data (non-empty folder target)")
-
- log(f"Mounting {target} as a subvolume", level=logging.INFO)
- # Mount the logical volume to the physical structure
- mount_information, mountpoint_device_real_path = get_mount_info(target, traverse=True, return_real_path=True)
- if mountpoint_device_real_path == str(target):
- log(f"Unmounting non-subvolume {mount_information['source']} previously mounted at {target}")
- SysCommand(f"umount {mount_information['source']}")
-
- return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0
-
def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -132,13 +77,18 @@ def _has_option(option :str,options :list) -> bool:
"""
if not options:
return False
+
for item in options:
if option in item:
return True
+
return False
def manage_btrfs_subvolumes(installation :Installer,
partition :Dict[str, str],) -> list:
+
+ raise Deprecated("Use setup_subvolumes() instead.")
+
from copy import deepcopy
""" we do the magic with subvolumes in a centralized place
parameters:
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
new file mode 100644
index 00000000..d577d82b
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -0,0 +1,132 @@
+import pathlib
+import logging
+from typing import Optional
+
+from ...exceptions import SysCallError, DiskError
+from ...general import SysCommand
+from ...output import log
+from ..helpers import get_mount_info
+from .btrfssubvolume import BtrfsSubvolume
+
+
+def mount_subvolume(installation, device, name, subvolume_information):
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = name.lstrip('/')
+
+ # renormalize the right hand.
+ mountpoint = subvolume_information.get('mountpoint', None)
+ if not mountpoint:
+ return None
+
+ if type(mountpoint) == str:
+ mountpoint = pathlib.Path(mountpoint)
+
+ installation_target = installation.target
+ if type(installation_target) == str:
+ installation_target = pathlib.Path(installation_target)
+
+ mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
+ mountpoint.mkdir(parents=True, exist_ok=True)
+
+ mount_options = subvolume_information.get('options', [])
+ if not any('subvol=' in x for x in mount_options):
+ mount_options += [f'subvol={name}']
+
+ log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
+ SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
+
+
+def setup_subvolumes(installation, partition_dict):
+ """
+ Taken from: ..user_guides.py
+
+ partition['btrfs'] = {
+ "subvolumes" : {
+ "@": "/",
+ "@home": "/home",
+ "@log": "/var/log",
+ "@pkg": "/var/cache/pacman/pkg",
+ "@.snapshots": "/.snapshots"
+ }
+ }
+ """
+ log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
+ for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = name.lstrip('/')
+
+ # renormalize the right hand.
+ # mountpoint = None
+ subvol_options = []
+
+ match right_hand:
+ # case str(): # backwards-compatability
+ # mountpoint = right_hand
+ case dict():
+ # mountpoint = right_hand.get('mountpoint', None)
+ subvol_options = right_hand.get('options', [])
+
+ # We create the subvolume using the BTRFSPartition instance.
+ # That way we ensure not only easy access, but also accurate mount locations etc.
+ partition_dict['device_instance'].create_subvolume(name, installation=installation)
+
+ # Make the nodatacow processing now
+ # It will be the main cause of creation of subvolumes which are not to be mounted
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ if 'nodatacow' in subvol_options:
+ if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so nodatacow doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('nodatacow')]
+ # Make the compress processing now
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ # in this way only zstd compression is activaded
+ # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
+
+ if 'compress' in subvol_options:
+ if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
+ if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so compress doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('compress')]
+
+def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+ try:
+ subvolume_name = None
+ result = {}
+ for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
+ if index == 0:
+ subvolume_name = line.strip().decode('UTF-8')
+ continue
+
+ if b':' in line:
+ key, value = line.strip().decode('UTF-8').split(':', 1)
+
+ # A bit of a hack, until I figure out how @dataclass
+ # allows for hooking in a pre-processor to do this we have to do it here:
+ result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
+
+ return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
+
+ except SysCallError as error:
+ log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
+
+ return None
+
+def find_parent_subvolume(path :pathlib.Path, filters=[]):
+ # A root path cannot have a parent
+ if str(path) == '/':
+ return None
+
+ if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters):
+ if not (subvolume := subvolume_info_from_path(found_mount['target'])):
+ if found_mount['target'] == '/':
+ return None
+
+ return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+
+ return subvolume \ No newline at end of file
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
new file mode 100644
index 00000000..5020133d
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -0,0 +1,116 @@
+import glob
+import pathlib
+import logging
+from typing import Optional, TYPE_CHECKING
+
+from ...exceptions import DiskError
+from ...storage import storage
+from ...output import log
+from ...general import SysCommand
+from ..partition import Partition
+from ..helpers import findmnt
+from .btrfs_helpers import (
+ subvolume_info_from_path
+)
+
+if TYPE_CHECKING:
+ from ...installer import Installer
+ from .btrfssubvolume import BtrfsSubvolume
+
+class BTRFSPartition(Partition):
+ def __init__(self, *args, **kwargs):
+ Partition.__init__(self, *args, **kwargs)
+
+ def __repr__(self, *args :str, **kwargs :str) -> str:
+ mount_repr = ''
+ if self.mountpoint:
+ mount_repr = f", mounted={self.mountpoint}"
+ elif self.target_mountpoint:
+ mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+
+ if self._encrypted:
+ return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
+ else:
+ return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
+
+ @property
+ def subvolumes(self):
+ for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
+ if '[' in filesystem.get('source', ''):
+ yield subvolume_info_from_path(filesystem['target'])
+
+ def iterate_children(struct):
+ for child in struct.get('children', []):
+ if '[' in child.get('source', ''):
+ yield subvolume_info_from_path(child['target'])
+
+ for sub_child in iterate_children(child):
+ yield sub_child
+
+ for child in iterate_children(filesystem):
+ yield child
+
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ """
+ Subvolumes have to be created within a mountpoint.
+ This means we need to get the current installation target.
+ After we get it, we need to verify it is a btrfs subvolume filesystem.
+ Finally, the destination must be empty.
+ """
+
+ # Allow users to override the installation session
+ if not installation:
+ installation = storage.get('installation_session')
+
+ # Determain if the path given, is an absolute path or a releative path.
+ # We do this by checking if the path contains a known mountpoint.
+ if str(subvolume)[0] == '/':
+ if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
+ if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
+ # Path starts with a known mountpoint which isn't /
+ # Which means it's an absolut path to a mounted location.
+ pass
+ else:
+ # Since it's not an absolute position with a known start.
+ # We omit the anchor ('/' basically) and make sure it's appendable
+ # to the installation.target later
+ subvolume = subvolume.relative_to(subvolume.anchor)
+ # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is.
+
+ # If the subvolume is not absolute, then we do two checks:
+ # 1. Check if the partition itself is mounted somewhere, and use that as a root
+ # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs
+ # If both above fail, we need to warn the user that such setup is not supported.
+ if str(subvolume)[0] != '/':
+ if self.mountpoint is None and installation is None:
+ raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.")
+ elif self.mountpoint:
+ subvolume = self.mountpoint / subvolume
+ elif installation:
+ ongoing_installation_destination = installation.target
+ if type(ongoing_installation_destination) == str:
+ ongoing_installation_destination = pathlib.Path(ongoing_installation_destination)
+
+ subvolume = ongoing_installation_destination / subvolume
+
+ subvolume.parent.mkdir(parents=True, exist_ok=True)
+
+ # <!--
+ # We perform one more check from the given absolute position.
+ # And we traverse backwards in order to locate any if possible subvolumes above
+ # our new btrfs subvolume. This is because it needs to be mounted under it to properly
+ # function.
+ # if btrfs_parent := find_parent_subvolume(subvolume):
+ # print('Found parent:', btrfs_parent)
+ # -->
+
+ log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey")
+
+ if glob.glob(str(subvolume / '*')):
+ raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
+ elif subvolinfo := subvolume_info_from_path(subvolume):
+ raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+
+ SysCommand(f"btrfs subvolume create {subvolume}")
+
+ return subvolume_info_from_path(subvolume) \ No newline at end of file
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolume.py
new file mode 100644
index 00000000..a96e2a94
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfssubvolume.py
@@ -0,0 +1,191 @@
+import pathlib
+import datetime
+import logging
+import string
+import random
+import shutil
+from dataclasses import dataclass
+from typing import Optional, List# , TYPE_CHECKING
+from functools import cached_property
+
+# if TYPE_CHECKING:
+# from ..blockdevice import BlockDevice
+
+from ...exceptions import DiskError
+from ...general import SysCommand
+from ...output import log
+from ...storage import storage
+
+@dataclass
+class BtrfsSubvolume:
+ full_path :pathlib.Path
+ name :str
+ uuid :str
+ parent_uuid :str
+ creation_time :datetime.datetime
+ subvolume_id :int
+ generation :int
+ gen_at_creation :int
+ parent_id :int
+ top_level_id :int
+ send_transid :int
+ send_time :datetime.datetime
+ receive_transid :int
+ received_uuid :Optional[str] = None
+ flags :Optional[str] = None
+ receive_time :Optional[datetime.datetime] = None
+ snapshots :Optional[List] = None
+
+ def __post_init__(self):
+ self.full_path = pathlib.Path(self.full_path)
+
+ # Convert "-" entries to `None`
+ if self.parent_uuid == "-":
+ self.parent_uuid = None
+ if self.received_uuid == "-":
+ self.received_uuid = None
+ if self.flags == "-":
+ self.flags = None
+ if self.receive_time == "-":
+ self.receive_time = None
+ if self.snapshots == "":
+ self.snapshots = []
+
+ # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
+ self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time))
+ self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time))
+ if self.receive_time:
+ self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time))
+
+ @property
+ def parent_subvolume(self):
+ from .btrfs_helpers import find_parent_subvolume
+
+ return find_parent_subvolume(self.full_path)
+
+ @property
+ def root(self) -> bool:
+ from .btrfs_helpers import subvolume_info_from_path
+
+ # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
+ # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
+ # It would also be nice if it could use findmnt(self.full_path) and traverse backwards
+ # finding the last occurance of a subvolume which 'self' belongs to.
+ if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
+ return self.full_path == volume.full_path
+
+ return False
+
+ @cached_property
+ def partition(self):
+ from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
+ from ..partition import Partition
+ from ..blockdevice import BlockDevice
+ from ..mapperdev import MapperDev
+ from .btrfspartition import BTRFSPartition
+ from .btrfs_helpers import subvolume_info_from_path
+
+ try:
+ # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
+ if filesystem := findmnt(self.full_path).get('filesystems', []):
+ if source := filesystem[0].get('source', None):
+ # Strip away subvolume definitions from findmnt
+ if '[' in source:
+ source = source[:source.find('[')]
+
+ if filesystem[0].get('fstype', '') == 'btrfs':
+ return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
+ elif filesystem[0].get('source', '').startswith('/dev/mapper'):
+ return MapperDev(source)
+ else:
+ return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
+ except DiskError:
+ # Subvolume has never been mounted, we have no reliable way of finding where it is.
+ # But we have the UUID of the partition, and can begin looking for it by mounting
+ # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.
+
+ log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
+ for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
+ if type(instance) in (Partition, MapperDev):
+ we_mounted_it = False
+ detection_mountpoint = instance.mountpoint
+ if not detection_mountpoint:
+ if type(instance) == Partition and instance.encrypted:
+ # TODO: Perhaps support unlocking encrypted volumes?
+ # This will cause a lot of potential user interactions tho.
+ log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
+ continue
+
+ detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
+ detection_mountpoint.mkdir(parents=True, exist_ok=True)
+
+ instance.mount(str(detection_mountpoint))
+ we_mounted_it = True
+
+ if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
+ if subvolume := subvolume_info_from_path(filesystem[0]['target']):
+ if subvolume.uuid == self.uuid:
+ # The top level subvolume matched of ourselves,
+ # which means the instance we're iterating has the subvol we're looking for.
+ log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
+ return instance
+
+ def iterate_children(struct):
+ for child in struct.get('children', []):
+ if '[' in child.get('source', ''):
+ yield subvolume_info_from_path(child['target'])
+
+ for sub_child in iterate_children(child):
+ yield sub_child
+
+ for child in iterate_children(filesystem[0]):
+ if child.uuid == self.uuid:
+ # We found a child within the instance that has the subvol we're looking for.
+ log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
+ return instance
+
+ if we_mounted_it:
+ instance.unmount()
+ shutil.rmtree(detection_mountpoint)
+
+ @cached_property
+ def mount_options(self) -> Optional[List[str]]:
+ from ..helpers import findmnt
+
+ if filesystem := findmnt(self.full_path).get('filesystems', []):
+ return filesystem[0].get('options').split(',')
+
+ def convert_to_ISO_format(self, time_string):
+ time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
+ iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
+ return iso_string
+
+ def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
+ from ..helpers import findmnt
+
+ try:
+ if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
+ log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
+ SysCommand(f"umount {mountpoint}")
+ except DiskError:
+ # No previously mounted device at the mountpoint
+ pass
+
+ if not options:
+ options = []
+
+ try:
+ if include_previously_known_options and (cached_options := self.mount_options):
+ options += cached_options
+ except DiskError:
+ pass
+
+ if not any('subvol=' in x for x in options):
+ options += f'subvol={self.name}'
+
+ SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
+ log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")
+
+ def unmount(self, recurse :bool = True):
+ SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 8a531de0..f94b4b47 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -55,7 +55,7 @@ class Filesystem:
output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8'))
for device in output['blockdevices']:
- for index, partition in enumerate(device['children']):
+ for index, partition in enumerate(device.get('children', [])):
# But we'll use blkid to reliably grab the PARTUUID for that child device (partition)
partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip()
if partition_uuid.lower() == uuid.lower():
@@ -65,6 +65,7 @@ class Filesystem:
def load_layout(self, layout :Dict[str, Any]) -> None:
from ..luks import luks2
+ from .btrfs import BTRFSPartition
# If the layout tells us to wipe the drive, we do so
if layout.get('wipe', False):
@@ -76,27 +77,34 @@ class Filesystem:
raise KeyError(f"Could not create a MSDOS label on {self}")
self.blockdevice.flush_cache()
+ time.sleep(3)
prev_partition = None
# We then iterate the partitions in order
for partition in layout.get('partitions', []):
# We don't want to re-add an existing partition (those containing a UUID already)
if partition.get('wipe', False) and not partition.get('PARTUUID', None):
- print(_("Adding partition...."))
start = partition.get('start') or (
prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START)
partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
start=start,
end=partition.get('size', '100%'),
- partition_format=partition.get('filesystem', {}).get('format', 'btrfs'))
- # TODO: device_instance some times become None
- # print('Device instance:', partition['device_instance'])
-
- elif (partition_uuid := partition.get('PARTUUID')) and (partition_instance := self.blockdevice.get_partition(uuid=partition_uuid)):
- print(_("Re-using partition instance: {}").format(partition_instance))
- partition['device_instance'] = partition_instance
+ partition_format=partition.get('filesystem', {}).get('format', 'btrfs'),
+ skip_mklabel=layout.get('wipe', False) is not False)
+
+ elif (partition_uuid := partition.get('PARTUUID')):
+ # We try to deal with both UUID and PARTUUID of a partition when it's being re-used.
+ # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID')
+ # but for now, lets just attempt to deal with both.
+ try:
+ partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
+ except DiskError:
+ partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
+
+ log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
else:
- raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition.get('PARTUUID'))}).")
+ log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
+ continue
if partition.get('filesystem', {}).get('format', False):
@@ -137,20 +145,32 @@ class Filesystem:
while True:
partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
- print(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
+ log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
continue
break
unlocked_device.format(partition['filesystem']['format'], options=format_options)
+
elif partition.get('wipe', False):
if not partition['device_instance']:
raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
partition['device_instance'].format(partition['filesystem']['format'], options=format_options)
+ if partition['filesystem']['format'] == 'btrfs':
+ # We upgrade the device instance to a BTRFSPartition if we format it as such.
+ # This is so that we can gain access to more features than otherwise available in Partition()
+ partition['device_instance'] = BTRFSPartition(
+ partition['device_instance'].path,
+ block_device=partition['device_instance'].block_device,
+ encrypted=False,
+ filesystem='btrfs',
+ autodetect_filesystem=False
+ )
+
if partition.get('boot', False):
log(f"Marking partition {partition['device_instance']} as bootable.")
- self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
+ self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on')
prev_partition = partition
@@ -190,10 +210,27 @@ class Filesystem:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> Partition:
+ def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
- previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
+ if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
+ # If it's a completely empty drive, and we're about to add partitions to it
+ # we need to make sure there's a filesystem label.
+ if self.mode == GPT:
+ if not self.parted_mklabel(self.blockdevice.device, "gpt"):
+ raise KeyError(f"Could not create a GPT label on {self}")
+ elif self.mode == MBR:
+ if not self.parted_mklabel(self.blockdevice.device, "msdos"):
+ raise KeyError(f"Could not create a MSDOS label on {self}")
+
+ self.blockdevice.flush_cache()
+
+ previous_partuuids = []
+ for partition in self.blockdevice.partitions.values():
+ try:
+ previous_partuuids.append(partition.part_uuid)
+ except DiskError:
+ pass
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
@@ -207,36 +244,36 @@ class Filesystem:
log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG)
if self.parted(parted_string):
- count = 0
- while count < 10:
- new_uuid = None
- new_uuid_set = (previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()})
+ for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
+ self.partprobe()
- if len(new_uuid_set) > 0:
- new_uuid = new_uuid_set.pop()
+ new_partition_uuids = []
+ for partition in self.blockdevice.partitions.values():
+ try:
+ new_partition_uuids.append(partition.part_uuid)
+ except DiskError:
+ pass
+
+ new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
- if new_uuid:
+ if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
try:
- return self.blockdevice.get_partition(new_uuid)
+ return self.blockdevice.get_partition(partuuid=new_partuuid)
except Exception as err:
log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
- log(f'Partition set: {new_uuid_set}', level=logging.ERROR, fg="red")
- log(f'New UUID: {[new_uuid]}', level=logging.ERROR, fg="red")
+ log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
+ log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
raise err
else:
- count += 1
- log(f"Could not get UUID for partition. Waiting before retry attempt {count} of 10 ...",level=logging.DEBUG)
- time.sleep(float(storage['arguments'].get('disk-sleep', 0.2)))
- else:
- log("Add partition is exiting due to excessive wait time", level=logging.ERROR, fg="red")
- raise DiskError(f"New partition never showed up after adding new partition on {self}.")
+ log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
# TODO: This should never be able to happen
log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
- log(f"Previous partitions: {previous_partition_uuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {(previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
+ log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
+ log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
raise DiskError(f"Could not add partition using: {parted_string}")
def set_name(self, partition: int, name: str) -> bool:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 0799cd49..99856aad 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -291,11 +291,37 @@ def find_mountpoint(device_path :str) -> Dict[str, Any]:
except SysCallError:
return {}
-def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
+def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]:
+ for traversal in list(map(str, [str(path)] + list(path.parents))):
+ if traversal in ignore:
+ continue
+
+ try:
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
+ if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')):
+ return json.loads(output)
+
+ except SysCallError as error:
+ log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray")
+ pass
+
+ if not traverse:
+ break
+
+ raise DiskError(f"Could not get mount information for path {path}")
+
+
+def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]:
+ import traceback
+
+ log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}")
device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
+ if traversal in ignore:
+ continue
+
try:
log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
@@ -385,9 +411,8 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
def get_filesystem_type(path :str) -> Optional[str]:
- device_name, bind_name = split_bind_name(path)
try:
- return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip()
+ return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
except SysCallError:
return None
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 32e3ac9b..913dbc13 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -51,11 +51,11 @@ class MapperDev:
raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device")
@property
- def mountpoint(self) -> Optional[str]:
+ def mountpoint(self) -> Optional[pathlib.Path]:
try:
data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
for filesystem in data['filesystems']:
- return filesystem.get('target')
+ return pathlib.Path(filesystem.get('target'))
except SysCallError as error:
# Not mounted anywhere most likely
@@ -76,8 +76,9 @@ class MapperDev:
@property
def subvolumes(self) -> Iterator['BtrfsSubvolume']:
- from .btrfs import get_subvolumes_from_findmnt
-
+ from .btrfs import subvolume_info_from_path
+
for mountpoint in self.mount_information:
- for result in get_subvolumes_from_findmnt(mountpoint):
- yield result \ No newline at end of file
+ if target := mountpoint.get('target'):
+ if subvolume := subvolume_info_from_path(pathlib.Path(target)):
+ yield subvolume \ No newline at end of file
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index e7568258..73c88597 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -13,7 +13,8 @@ from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
-from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume
+from .btrfs.btrfs_helpers import subvolume_info_from_path
+from .btrfs.btrfssubvolume import BtrfsSubvolume
class Partition:
def __init__(self,
@@ -96,11 +97,11 @@ class Partition:
try:
data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
for filesystem in data['filesystems']:
- return filesystem.get('target')
+ return pathlib.Path(filesystem.get('target'))
except SysCallError as error:
# Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG)
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey")
pass
return None
@@ -184,7 +185,7 @@ class Partition:
return device['pttype']
@property
- def uuid(self) -> Optional[str]:
+ def part_uuid(self) -> Optional[str]:
"""
Returns the PARTUUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
@@ -197,6 +198,26 @@ class Partition:
time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
+ partuuid = self._safe_part_uuid
+ if partuuid:
+ return partuuid
+
+ raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
+
+ @property
+ def uuid(self) -> Optional[str]:
+ """
+ Returns the UUID as returned by lsblk for the **partition**.
+ This is more reliable than relying on /dev/disk/by-uuid as
+ it doesn't seam to be able to detect md raid partitions.
+ For bind mounts all the subvolumes share the same uuid
+ """
+ for i in range(storage['DISK_RETRY_ATTEMPTS']):
+ if not self.partprobe():
+ raise DiskError(f"Could not perform partprobe on {self.device_path}")
+
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * i)
+
partuuid = self._safe_uuid
if partuuid:
return partuuid
@@ -217,6 +238,28 @@ class Partition:
log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
try:
+ return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
+ except SysCallError as error:
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
+ return None
+
+ log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}")
+
+ @property
+ def _safe_part_uuid(self) -> Optional[str]:
+ """
+ A near copy of self.uuid but without any delays.
+ This function should only be used where uuid is not crucial.
+ For instance when you want to get a __repr__ of the class.
+ """
+ if not self.partprobe():
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ return None
+
+ log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
+
+ try:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
except SysCallError as error:
if self.block_device.info.get('TYPE') == 'iso9660':
@@ -262,9 +305,26 @@ class Partition:
@property
def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ from .helpers import findmnt
+
+ def iterate_children_recursively(information):
+ for child in information.get('children', []):
+ if target := child.get('target'):
+ if subvolume := subvolume_info_from_path(pathlib.Path(target)):
+ yield subvolume
+
+ if child.get('children'):
+ for subchild in iterate_children_recursively(child):
+ yield subchild
+
for mountpoint in self.mount_information:
- for result in get_subvolumes_from_findmnt(mountpoint):
- yield result
+ if result := findmnt(pathlib.Path(mountpoint['target'])):
+ for filesystem in result.get('filesystems', []):
+ if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])):
+ yield subvolume
+
+ for child in iterate_children_recursively(filesystem):
+ yield child
def partprobe(self) -> bool:
try:
@@ -315,7 +375,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool:
+ def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -337,63 +397,71 @@ class Partition:
if log_formatting:
log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
- if filesystem == 'btrfs':
- options = ['-f'] + options
+ try:
+ if filesystem == 'btrfs':
+ options = ['-f'] + options
- if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
- raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self.filesystem = filesystem
+ if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
+ raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
+ self.filesystem = filesystem
- elif filesystem == 'vfat':
- options = ['-F32'] + options
+ elif filesystem == 'vfat':
+ options = ['-F32'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'ext4':
- options = ['-F'] + options
+ elif filesystem == 'ext4':
+ options = ['-F'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'ext2':
- options = ['-F'] + options
+ elif filesystem == 'ext2':
+ options = ['-F'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'ext2'
+ if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
+ self.filesystem = 'ext2'
- elif filesystem == 'xfs':
- options = ['-f'] + options
+ elif filesystem == 'xfs':
+ options = ['-f'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'f2fs':
- options = ['-f'] + options
+ elif filesystem == 'f2fs':
+ options = ['-f'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'ntfs3':
- options = ['-f'] + options
+ elif filesystem == 'ntfs3':
+ options = ['-f'] + options
- if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self.filesystem = filesystem
- elif filesystem == 'crypto_LUKS':
- # from ..luks import luks2
- # encrypted_partition = luks2(self, None, None)
- # encrypted_partition.format(path)
- self.filesystem = filesystem
+ elif filesystem == 'crypto_LUKS':
+ # from ..luks import luks2
+ # encrypted_partition = luks2(self, None, None)
+ # encrypted_partition.format(path)
+ self.filesystem = filesystem
- else:
- raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
+ else:
+ raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
+ except SysCallError as error:
+ log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange")
+ if retry is True:
+ log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
+ time.sleep(storage.get('DISK_TIMEOUTS', 1))
+
+ return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
self.encrypted = True
@@ -413,6 +481,7 @@ class Partition:
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
+
if not fs:
if not self.filesystem:
raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 63ec1d9b..5fa6bfdc 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -5,6 +5,7 @@ from typing import Optional, Dict, Any, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .blockdevice import BlockDevice
+ _: Any
from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..hardware import has_uefi
@@ -26,13 +27,13 @@ def suggest_single_disk_layout(block_device :BlockDevice,
compression = False
if default_filesystem == 'btrfs':
- prompt = 'Would you like to use BTRFS subvolumes with a default structure?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- using_subvolumes = choice == 'yes'
+ prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ using_subvolumes = choice.value == Menu.yes()
- prompt = 'Would you like to use BTRFS compression?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- compression = choice == 'yes'
+ prompt = str(_('Would you like to use BTRFS compression?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ compression = choice.value == Menu.yes()
layout = {
block_device.path : {
@@ -87,9 +88,9 @@ def suggest_single_disk_layout(block_device :BlockDevice,
layout[block_device.path]['partitions'][-1]['start'] = '513MiB'
if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
- prompt = 'Would you like to create a separate partition for /home?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- using_home_partition = choice == 'yes'
+ prompt = str(_('Would you like to create a separate partition for /home?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ using_home_partition = choice.value == Menu.yes()
# Set a size for / (/root)
if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition:
@@ -138,9 +139,7 @@ def suggest_single_disk_layout(block_device :BlockDevice,
return layout
-def suggest_multi_disk_layout(block_devices :List[BlockDevice],
- default_filesystem :Optional[str] = None,
- advanced_options :bool = False) -> Dict[str, Any]:
+def suggest_multi_disk_layout(block_devices :List[BlockDevice], default_filesystem :Optional[str] = None, advanced_options :bool = False):
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
@@ -158,6 +157,13 @@ def suggest_multi_disk_layout(block_devices :List[BlockDevice],
home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART)
root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device])
+ if home_device is None or root_device is None:
+ text = _('The selected drives do not have the minimum capacity required for an automatic suggestion\n')
+ text += _('Minimum capacity for /home partition: {}GB\n').format(MIN_SIZE_TO_ALLOW_HOME_PART)
+ text += _('Minimum capacity for Arch Linux partition: {}GB').format(ARCH_LINUX_INSTALLED_SIZE)
+ Menu(str(text), [str(_('Continue'))], skip=False).run()
+ return None
+
compression = False
if default_filesystem == 'btrfs':
@@ -165,9 +171,9 @@ def suggest_multi_disk_layout(block_devices :List[BlockDevice],
# choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
# using_subvolumes = choice == 'yes'
- prompt = 'Would you like to use BTRFS compression?'
- choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- compression = choice == 'yes'
+ prompt = str(_('Would you like to use BTRFS compression?'))
+ choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
+ compression = choice.value == Menu.yes()
log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG)