Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/btrfs
diff options
context:
space:
mode:
authorAndreas Baumann <mail@andreasbaumann.cc>2022-09-30 17:25:43 +0200
committerAndreas Baumann <mail@andreasbaumann.cc>2022-09-30 17:25:43 +0200
commit6408c9dce00aa70ad3c6614d2d793dba9a99aff6 (patch)
treebff1889dfebde0c74e30e9de427a2c122513684b /archinstall/lib/disk/btrfs
parentfaf925de1882be722d2994d697a802918282e509 (diff)
parent53a2797af6ac0832bf7dd00dfe96b8ea1867db2e (diff)
merged from upstream for ISO 2022-10
Diffstat (limited to 'archinstall/lib/disk/btrfs')
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py132
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py126
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py37
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py (renamed from archinstall/lib/disk/btrfs/btrfssubvolume.py)9
4 files changed, 88 insertions, 216 deletions
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
index 84b9c0f6..a26e0160 100644
--- a/archinstall/lib/disk/btrfs/__init__.py
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -2,8 +2,7 @@ from __future__ import annotations
import pathlib
import glob
import logging
-import re
-from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from typing import Union, Dict, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -15,30 +14,15 @@ from .btrfs_helpers import (
setup_subvolumes as setup_subvolumes,
mount_subvolume as mount_subvolume
)
-from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
from .btrfspartition import BTRFSPartition as BTRFSPartition
-from ..helpers import get_mount_info
from ...exceptions import DiskError, Deprecated
from ...general import SysCommand
from ...output import log
-from ...exceptions import SysCallError
-def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
- try:
- output = SysCommand(f"btrfs subvol show {path}").decode()
- except SysCallError as error:
- print('Error:', error)
- result = {}
- for line in output.replace('\r\n', '\n').split('\n'):
- if ':' in line:
- key, val = line.replace('\t', '').split(':', 1)
- result[key.strip().lower().replace(' ', '_')] = val.strip()
-
- return result
-
-def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
+def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -70,113 +54,3 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.
log(f"Creating a subvolume on {target}", level=logging.INFO)
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
-
-def _has_option(option :str,options :list) -> bool:
- """ auxiliary routine to check if an option is present in a list.
- we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...)
- """
- if not options:
- return False
-
- for item in options:
- if option in item:
- return True
-
- return False
-
-def manage_btrfs_subvolumes(installation :Installer,
- partition :Dict[str, str],) -> list:
-
- raise Deprecated("Use setup_subvolumes() instead.")
-
- from copy import deepcopy
- """ we do the magic with subvolumes in a centralized place
- parameters:
- * the installation object
- * the partition dictionary entry which represents the physical partition
- returns
- * mountpoinst, the list which contains all the "new" partititon to be mounted
-
- We expect the partition has been mounted as / , and it to be unmounted after the processing
- Then we create all the subvolumes inside btrfs as demand
- We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
- Then we return a list of "new" partitions to be processed as "normal" partitions
- # TODO For encrypted devices we need some special processing prior to it
- """
- # We process each of the pairs <subvolume name: mount point | None | mount info dict>
- # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
- # of mount options (or similar used by brtfs)
- mountpoints = []
- subvolumes = partition['btrfs']['subvolumes']
- for name, right_hand in subvolumes.items():
- try:
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
- if name.startswith('/'):
- name = name[1:]
- # renormalize the right hand.
- location = None
- subvol_options = []
- # no contents, so it is not to be mounted
- if not right_hand:
- location = None
- # just a string. per backward compatibility the mount point
- elif isinstance(right_hand,str):
- location = right_hand
- # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
- elif isinstance(right_hand,dict):
- location = right_hand.get('mountpoint',None)
- subvol_options = right_hand.get('options',[])
- # we create the subvolume
- create_subvolume(installation,name)
- # Make the nodatacow processing now
- # It will be the main cause of creation of subvolumes which are not to be mounted
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
- if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
- # Make the compress processing now
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- # in this way only zstd compression is activaded
- # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
- if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])):
- if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
- # END compress processing.
- # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
- if not partition['mountpoint'] and location is not None:
- # we begin to create a fake partition entry. First we copy the original -the one that corresponds to
- # the primary partition. We make a deepcopy to avoid altering the original content in any case
- fake_partition = deepcopy(partition)
- # we start to modify entries in the "fake partition" to match the needs of the subvolumes
- # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
- del fake_partition['btrfs']
- fake_partition['encrypted'] = False
- fake_partition['generate-encryption-key-file'] = False
- # Mount destination. As of now the right hand part
- fake_partition['mountpoint'] = location
- # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
- fake_partition['subvolume'] = name
- # here we add the special mount options for the subvolume, if any.
- # if the original partition['options'] is not a list might give trouble
- if fake_partition.get('filesystem',{}).get('mount_options',[]):
- fake_partition['filesystem']['mount_options'].extend(subvol_options)
- else:
- fake_partition['filesystem']['mount_options'] = subvol_options
- # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
- # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
- # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
- fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
-
- # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
- # as "normal" ones
- mountpoints.append(fake_partition)
- except Exception as e:
- raise e
- return mountpoints
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
index d577d82b..f6d2734a 100644
--- a/archinstall/lib/disk/btrfs/btrfs_helpers.py
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -1,72 +1,73 @@
-import pathlib
import logging
-from typing import Optional
+import re
+from pathlib import Path
+from typing import Optional, Dict, Any, TYPE_CHECKING
+from ...models.subvolume import Subvolume
from ...exceptions import SysCallError, DiskError
from ...general import SysCommand
from ...output import log
+from ...plugins import plugins
from ..helpers import get_mount_info
-from .btrfssubvolume import BtrfsSubvolume
+from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+if TYPE_CHECKING:
+ from .btrfspartition import BTRFSPartition
+ from ...installer import Installer
-def mount_subvolume(installation, device, name, subvolume_information):
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
- # renormalize the right hand.
- mountpoint = subvolume_information.get('mountpoint', None)
- if not mountpoint:
- return None
+class fstab_btrfs_compression_plugin():
+ def __init__(self, partition_dict):
+ self.partition_dict = partition_dict
+
+ def on_genfstab(self, installation):
+ with open(f"{installation.target}/etc/fstab", 'r') as fh:
+ fstab = fh.read()
+
+ # Replace the {installation}/etc/fstab with entries
+ # using the compress=zstd where the mountpoint has compression set.
+ with open(f"{installation.target}/etc/fstab", 'w') as fh:
+ for line in fstab.split('\n'):
+ # So first we grab the mount options by using subvol=.*? as a locator.
+ # And we also grab the mountpoint for the entry, for instance /var/log
+ if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)):
+ for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []):
+ # We then locate the correct subvolume and check if it's compressed
+ if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip():
+ # We then sneak in the compress=zstd option if it doesn't already exist:
+ # We skip entries where compression is already defined
+ if ',compress=zstd,' not in line:
+ line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}")
+ break
+
+ fh.write(f"{line}\n")
- if type(mountpoint) == str:
- mountpoint = pathlib.Path(mountpoint)
+ return True
- installation_target = installation.target
- if type(installation_target) == str:
- installation_target = pathlib.Path(installation_target)
+
+def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
+ # we normalize the subvolume name (getting rid of slash at the start if exists.
+ # In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
+ mountpoint = Path(subvolume.mountpoint)
+ installation_target = Path(installation.target)
mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
mountpoint.mkdir(parents=True, exist_ok=True)
-
- mount_options = subvolume_information.get('options', [])
- if not any('subvol=' in x for x in mount_options):
- mount_options += [f'subvol={name}']
+ mount_options = subvolume.options + [f'subvol={name}']
log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
-def setup_subvolumes(installation, partition_dict):
- """
- Taken from: ..user_guides.py
-
- partition['btrfs'] = {
- "subvolumes" : {
- "@": "/",
- "@home": "/home",
- "@log": "/var/log",
- "@pkg": "/var/cache/pacman/pkg",
- "@.snapshots": "/.snapshots"
- }
- }
- """
+def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
- for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = name.lstrip('/')
- # renormalize the right hand.
- # mountpoint = None
- subvol_options = []
-
- match right_hand:
- # case str(): # backwards-compatability
- # mountpoint = right_hand
- case dict():
- # mountpoint = right_hand.get('mountpoint', None)
- subvol_options = right_hand.get('options', [])
+ for subvolume in partition_dict['btrfs']['subvolumes']:
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = subvolume.name.lstrip('/')
# We create the subvolume using the BTRFSPartition instance.
# That way we ensure not only easy access, but also accurate mount locations etc.
@@ -76,27 +77,28 @@ def setup_subvolumes(installation, partition_dict):
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if 'nodatacow' in subvol_options:
+ if subvolume.nodatacow:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so nodatacow doesn't propagate to the mount options
- del subvol_options[subvol_options.index('nodatacow')]
+
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
- if 'compress' in subvol_options:
+ if subvolume.compress:
if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
- # entry is deleted so compress doesn't propagate to the mount options
- del subvol_options[subvol_options.index('compress')]
-def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+ if 'fstab_btrfs_compression_plugin' not in plugins:
+ plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict)
+
+
+def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
try:
- subvolume_name = None
+ subvolume_name = ''
result = {}
for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
if index == 0:
@@ -110,14 +112,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
# allows for hooking in a pre-processor to do this we have to do it here:
result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
- return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
-
+ return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
except SysCallError as error:
log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
return None
-def find_parent_subvolume(path :pathlib.Path, filters=[]):
+
+def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
# A root path cannot have a parent
if str(path) == '/':
return None
@@ -127,6 +129,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]):
if found_mount['target'] == '/':
return None
- return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+ return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
- return subvolume \ No newline at end of file
+ return subvolume
+
+ return None
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
index 5020133d..d04c9b98 100644
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -15,24 +15,13 @@ from .btrfs_helpers import (
if TYPE_CHECKING:
from ...installer import Installer
- from .btrfssubvolume import BtrfsSubvolume
+ from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
Partition.__init__(self, *args, **kwargs)
- def __repr__(self, *args :str, **kwargs :str) -> str:
- mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
-
- if self._encrypted:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
- else:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
-
@property
def subvolumes(self):
for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
@@ -40,17 +29,17 @@ class BTRFSPartition(Partition):
yield subvolume_info_from_path(filesystem['target'])
def iterate_children(struct):
- for child in struct.get('children', []):
+ for c in struct.get('children', []):
if '[' in child.get('source', ''):
- yield subvolume_info_from_path(child['target'])
+ yield subvolume_info_from_path(c['target'])
- for sub_child in iterate_children(child):
+ for sub_child in iterate_children(c):
yield sub_child
for child in iterate_children(filesystem):
yield child
- def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
"""
Subvolumes have to be created within a mountpoint.
This means we need to get the current installation target.
@@ -62,13 +51,13 @@ class BTRFSPartition(Partition):
if not installation:
installation = storage.get('installation_session')
- # Determain if the path given, is an absolute path or a releative path.
+ # Determain if the path given, is an absolute path or a relative path.
# We do this by checking if the path contains a known mountpoint.
if str(subvolume)[0] == '/':
if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
# Path starts with a known mountpoint which isn't /
- # Which means it's an absolut path to a mounted location.
+ # Which means it's an absolute path to a mounted location.
pass
else:
# Since it's not an absolute position with a known start.
@@ -108,9 +97,13 @@ class BTRFSPartition(Partition):
if glob.glob(str(subvolume / '*')):
raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
- elif subvolinfo := subvolume_info_from_path(subvolume):
- raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+ # Ideally we would like to check if the destination is already a subvolume.
+ # But then we would need the mount-point at this stage as well.
+ # So we'll comment out this check:
+ # elif subvolinfo := subvolume_info_from_path(subvolume):
+ # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+ # And deal with it here:
SysCommand(f"btrfs subvolume create {subvolume}")
- return subvolume_info_from_path(subvolume) \ No newline at end of file
+ return subvolume_info_from_path(subvolume)
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
index a96e2a94..5f5bdea6 100644
--- a/archinstall/lib/disk/btrfs/btrfssubvolume.py
+++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
@@ -16,8 +16,9 @@ from ...general import SysCommand
from ...output import log
from ...storage import storage
+
@dataclass
-class BtrfsSubvolume:
+class BtrfsSubvolumeInfo:
full_path :pathlib.Path
name :str
uuid :str
@@ -68,9 +69,9 @@ class BtrfsSubvolume:
from .btrfs_helpers import subvolume_info_from_path
# TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
- # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
+ # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
# It would also be nice if it could use findmnt(self.full_path) and traverse backwards
- # finding the last occurance of a subvolume which 'self' belongs to.
+ # finding the last occurrence of a subvolume which 'self' belongs to.
if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
return self.full_path == volume.full_path
@@ -188,4 +189,4 @@ class BtrfsSubvolume:
def unmount(self, recurse :bool = True):
SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
- log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")