index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/btrfs/__init__.py | 56 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/btrfs_helpers.py | 136 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/btrfspartition.py | 109 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py | 192 |
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py deleted file mode 100644 index a26e0160..00000000 --- a/archinstall/lib/disk/btrfs/__init__.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations -import pathlib -import glob -import logging -from typing import Union, Dict, TYPE_CHECKING - -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from ...installer import Installer - -from .btrfs_helpers import ( - subvolume_info_from_path as subvolume_info_from_path, - find_parent_subvolume as find_parent_subvolume, - setup_subvolumes as setup_subvolumes, - mount_subvolume as mount_subvolume -) -from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume -from .btrfspartition import BTRFSPartition as BTRFSPartition - -from ...exceptions import DiskError, Deprecated -from ...general import SysCommand -from ...output import log - - -def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: - """ - This function uses btrfs to create a subvolume. - - @installation: archinstall.Installer instance - @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot - """ - - installation_mountpoint = installation.target - if type(installation_mountpoint) == str: - installation_mountpoint = pathlib.Path(installation_mountpoint) - # Set up the required physical structure - if type(subvolume_location) == str: - subvolume_location = pathlib.Path(subvolume_location) - - target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor) - - # Difference from mount_subvolume: - # We only check if the parent exists, since we'll run in to "target path already exists" otherwise - if not target.parent.exists(): - target.parent.mkdir(parents=True) - - if glob.glob(str(target / '*')): - raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)") - - # Remove the target if it exists - if target.exists(): - target.rmdir() - - log(f"Creating a subvolume on {target}", level=logging.INFO) - if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: - raise DiskError(f"Could not create a subvolume at {target}: {cmd}") diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py deleted file mode 100644 index f6d2734a..00000000 --- a/archinstall/lib/disk/btrfs/btrfs_helpers.py +++ /dev/null @@ -1,136 +0,0 @@ -import logging -import re -from pathlib import Path -from typing import Optional, Dict, Any, TYPE_CHECKING - -from ...models.subvolume import Subvolume -from ...exceptions import SysCallError, DiskError -from ...general import SysCommand -from ...output import log -from ...plugins import plugins -from ..helpers import get_mount_info -from .btrfssubvolumeinfo import BtrfsSubvolumeInfo - -if TYPE_CHECKING: - from .btrfspartition import BTRFSPartition - from ...installer import Installer - - -class fstab_btrfs_compression_plugin(): - def __init__(self, partition_dict): - self.partition_dict = partition_dict - - def on_genfstab(self, installation): - with open(f"{installation.target}/etc/fstab", 'r') as fh: - fstab = fh.read() - - # Replace the {installation}/etc/fstab with entries - # using the compress=zstd where the mountpoint has compression set. - with open(f"{installation.target}/etc/fstab", 'w') as fh: - for line in fstab.split('\n'): - # So first we grab the mount options by using subvol=.*? as a locator. - # And we also grab the mountpoint for the entry, for instance /var/log - if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)): - for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []): - # We then locate the correct subvolume and check if it's compressed - if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip(): - # We then sneak in the compress=zstd option if it doesn't already exist: - # We skip entries where compression is already defined - if ',compress=zstd,' not in line: - line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}") - break - - fh.write(f"{line}\n") - - return True - - -def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume): - # we normalize the subvolume name (getting rid of slash at the start if exists. - # In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = subvolume.name.lstrip('/') - mountpoint = Path(subvolume.mountpoint) - installation_target = Path(installation.target) - - mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) - mountpoint.mkdir(parents=True, exist_ok=True) - mount_options = subvolume.options + [f'subvol={name}'] - - log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") - SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") - - -def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]): - log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") - - for subvolume in partition_dict['btrfs']['subvolumes']: - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = subvolume.name.lstrip('/') - - # We create the subvolume using the BTRFSPartition instance. - # That way we ensure not only easy access, but also accurate mount locations etc. - partition_dict['device_instance'].create_subvolume(name, installation=installation) - - # Make the nodatacow processing now - # It will be the main cause of creation of subvolumes which are not to be mounted - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if subvolume.nodatacow: - if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - - # Make the compress processing now - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - # in this way only zstd compression is activaded - # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - - if subvolume.compress: - if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - - if 'fstab_btrfs_compression_plugin' not in plugins: - plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict) - - -def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]: - try: - subvolume_name = '' - result = {} - for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): - if index == 0: - subvolume_name = line.strip().decode('UTF-8') - continue - - if b':' in line: - key, value = line.strip().decode('UTF-8').split(':', 1) - - # A bit of a hack, until I figure out how @dataclass - # allows for hooking in a pre-processor to do this we have to do it here: - result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() - - return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore - except SysCallError as error: - log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") - - return None - - -def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]: - # A root path cannot have a parent - if str(path) == '/': - return None - - if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters): - if not (subvolume := subvolume_info_from_path(found_mount['target'])): - if found_mount['target'] == '/': - return None - - return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']]) - - return subvolume - - return None diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py deleted file mode 100644 index d04c9b98..00000000 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ /dev/null @@ -1,109 +0,0 @@ -import glob -import pathlib -import logging -from typing import Optional, TYPE_CHECKING - -from ...exceptions import DiskError -from ...storage import storage -from ...output import log -from ...general import SysCommand -from ..partition import Partition -from ..helpers import findmnt -from .btrfs_helpers import ( - subvolume_info_from_path -) - -if TYPE_CHECKING: - from ...installer import Installer - from .btrfssubvolumeinfo import BtrfsSubvolumeInfo - - -class BTRFSPartition(Partition): - def __init__(self, *args, **kwargs): - Partition.__init__(self, *args, **kwargs) - - @property - def subvolumes(self): - for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): - if '[' in filesystem.get('source', ''): - yield subvolume_info_from_path(filesystem['target']) - - def iterate_children(struct): - for c in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(c['target']) - - for sub_child in iterate_children(c): - yield sub_child - - for child in iterate_children(filesystem): - yield child - - def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo': - """ - Subvolumes have to be created within a mountpoint. - This means we need to get the current installation target. - After we get it, we need to verify it is a btrfs subvolume filesystem. - Finally, the destination must be empty. - """ - - # Allow users to override the installation session - if not installation: - installation = storage.get('installation_session') - - # Determain if the path given, is an absolute path or a relative path. - # We do this by checking if the path contains a known mountpoint. - if str(subvolume)[0] == '/': - if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): - if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): - # Path starts with a known mountpoint which isn't / - # Which means it's an absolute path to a mounted location. - pass - else: - # Since it's not an absolute position with a known start. - # We omit the anchor ('/' basically) and make sure it's appendable - # to the installation.target later - subvolume = subvolume.relative_to(subvolume.anchor) - # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is. - - # If the subvolume is not absolute, then we do two checks: - # 1. Check if the partition itself is mounted somewhere, and use that as a root - # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs - # If both above fail, we need to warn the user that such setup is not supported. - if str(subvolume)[0] != '/': - if self.mountpoint is None and installation is None: - raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.") - elif self.mountpoint: - subvolume = self.mountpoint / subvolume - elif installation: - ongoing_installation_destination = installation.target - if type(ongoing_installation_destination) == str: - ongoing_installation_destination = pathlib.Path(ongoing_installation_destination) - - subvolume = ongoing_installation_destination / subvolume - - subvolume.parent.mkdir(parents=True, exist_ok=True) - - # <!-- - # We perform one more check from the given absolute position. - # And we traverse backwards in order to locate any if possible subvolumes above - # our new btrfs subvolume. This is because it needs to be mounted under it to properly - # function. - # if btrfs_parent := find_parent_subvolume(subvolume): - # print('Found parent:', btrfs_parent) - # --> - - log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey") - - if glob.glob(str(subvolume / '*')): - raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") - # Ideally we would like to check if the destination is already a subvolume. - # But then we would need the mount-point at this stage as well. - # So we'll comment out this check: - # elif subvolinfo := subvolume_info_from_path(subvolume): - # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") - - # And deal with it here: - SysCommand(f"btrfs subvolume create {subvolume}") - - return subvolume_info_from_path(subvolume) diff --git a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py deleted file mode 100644 index 5f5bdea6..00000000 --- a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py +++ /dev/null @@ -1,192 +0,0 @@ -import pathlib -import datetime -import logging -import string -import random -import shutil -from dataclasses import dataclass -from typing import Optional, List# , TYPE_CHECKING -from functools import cached_property - -# if TYPE_CHECKING: -# from ..blockdevice import BlockDevice - -from ...exceptions import DiskError -from ...general import SysCommand -from ...output import log -from ...storage import storage - - -@dataclass -class BtrfsSubvolumeInfo: - full_path :pathlib.Path - name :str - uuid :str - parent_uuid :str - creation_time :datetime.datetime - subvolume_id :int - generation :int - gen_at_creation :int - parent_id :int - top_level_id :int - send_transid :int - send_time :datetime.datetime - receive_transid :int - received_uuid :Optional[str] = None - flags :Optional[str] = None - receive_time :Optional[datetime.datetime] = None - snapshots :Optional[List] = None - - def __post_init__(self): - self.full_path = pathlib.Path(self.full_path) - - # Convert "-" entries to `None` - if self.parent_uuid == "-": - self.parent_uuid = None - if self.received_uuid == "-": - self.received_uuid = None - if self.flags == "-": - self.flags = None - if self.receive_time == "-": - self.receive_time = None - if self.snapshots == "": - self.snapshots = [] - - # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) - self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) - self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) - if self.receive_time: - self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) - - @property - def parent_subvolume(self): - from .btrfs_helpers import find_parent_subvolume - - return find_parent_subvolume(self.full_path) - - @property - def root(self) -> bool: - from .btrfs_helpers import subvolume_info_from_path - - # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first - # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. - # It would also be nice if it could use findmnt(self.full_path) and traverse backwards - # finding the last occurrence of a subvolume which 'self' belongs to. - if volume := subvolume_info_from_path(storage['MOUNT_POINT']): - return self.full_path == volume.full_path - - return False - - @cached_property - def partition(self): - from ..helpers import findmnt, get_parent_of_partition, all_blockdevices - from ..partition import Partition - from ..blockdevice import BlockDevice - from ..mapperdev import MapperDev - from .btrfspartition import BTRFSPartition - from .btrfs_helpers import subvolume_info_from_path - - try: - # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. - if filesystem := findmnt(self.full_path).get('filesystems', []): - if source := filesystem[0].get('source', None): - # Strip away subvolume definitions from findmnt - if '[' in source: - source = source[:source.find('[')] - - if filesystem[0].get('fstype', '') == 'btrfs': - return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - elif filesystem[0].get('source', '').startswith('/dev/mapper'): - return MapperDev(source) - else: - return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - except DiskError: - # Subvolume has never been mounted, we have no reliable way of finding where it is. - # But we have the UUID of the partition, and can begin looking for it by mounting - # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. - - log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) - for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): - if type(instance) in (Partition, MapperDev): - we_mounted_it = False - detection_mountpoint = instance.mountpoint - if not detection_mountpoint: - if type(instance) == Partition and instance.encrypted: - # TODO: Perhaps support unlocking encrypted volumes? - # This will cause a lot of potential user interactions tho. - log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) - continue - - detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") - detection_mountpoint.mkdir(parents=True, exist_ok=True) - - instance.mount(str(detection_mountpoint)) - we_mounted_it = True - - if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): - if subvolume := subvolume_info_from_path(filesystem[0]['target']): - if subvolume.uuid == self.uuid: - # The top level subvolume matched of ourselves, - # which means the instance we're iterating has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - def iterate_children(struct): - for child in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) - - for sub_child in iterate_children(child): - yield sub_child - - for child in iterate_children(filesystem[0]): - if child.uuid == self.uuid: - # We found a child within the instance that has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - if we_mounted_it: - instance.unmount() - shutil.rmtree(detection_mountpoint) - - @cached_property - def mount_options(self) -> Optional[List[str]]: - from ..helpers import findmnt - - if filesystem := findmnt(self.full_path).get('filesystems', []): - return filesystem[0].get('options').split(',') - - def convert_to_ISO_format(self, time_string): - time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') - iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" - return iso_string - - def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): - from ..helpers import findmnt - - try: - if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): - log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") - SysCommand(f"umount {mountpoint}") - except DiskError: - # No previously mounted device at the mountpoint - pass - - if not options: - options = [] - - try: - if include_previously_known_options and (cached_options := self.mount_options): - options += cached_options - except DiskError: - pass - - if not any('subvol=' in x for x in options): - options += f'subvol={self.name}' - - SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") - log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") - - def unmount(self, recurse :bool = True): - SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") - log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") |