1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
import logging
import re
from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING
from ...models.subvolume import Subvolume
from ...exceptions import SysCallError, DiskError
from ...general import SysCommand
from ...output import log
from ...plugins import plugins
from ..helpers import get_mount_info
from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
if TYPE_CHECKING:
from .btrfspartition import BTRFSPartition
from ...installer import Installer
class fstab_btrfs_compression_plugin():
def __init__(self, partition_dict):
self.partition_dict = partition_dict
def on_genfstab(self, installation):
with open(f"{installation.target}/etc/fstab", 'r') as fh:
fstab = fh.read()
# Replace the {installation}/etc/fstab with entries
# using the compress=zstd where the mountpoint has compression set.
with open(f"{installation.target}/etc/fstab", 'w') as fh:
for line in fstab.split('\n'):
# So first we grab the mount options by using subvol=.*? as a locator.
# And we also grab the mountpoint for the entry, for instance /var/log
if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)):
for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []):
# We then locate the correct subvolume and check if it's compressed
if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip():
# We then sneak in the compress=zstd option if it doesn't already exist:
# We skip entries where compression is already defined
if ',compress=zstd,' not in line:
line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}")
break
fh.write(f"{line}\n")
return True
def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
# we normalize the subvolume name (getting rid of slash at the start if exists.
# In our implementation has no semantic load.
# Every subvolume is created from the top of the hierarchy- and simplifies its further use
name = subvolume.name.lstrip('/')
mountpoint = Path(subvolume.mountpoint)
installation_target = Path(installation.target)
mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
mountpoint.mkdir(parents=True, exist_ok=True)
mount_options = subvolume.options + [f'subvol={name}']
log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
for subvolume in partition_dict['btrfs']['subvolumes']:
# we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
# Every subvolume is created from the top of the hierarchy- and simplifies its further use
name = subvolume.name.lstrip('/')
# We create the subvolume using the BTRFSPartition instance.
# That way we ensure not only easy access, but also accurate mount locations etc.
partition_dict['device_instance'].create_subvolume(name, installation=installation)
# Make the nodatacow processing now
# It will be the main cause of creation of subvolumes which are not to be mounted
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
if subvolume.nodatacow:
if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
# Make the compress processing now
# it is not an options which can be established by subvolume (but for whole file systems), and can be
# set up via a simple attribute change in a directory (if empty). And here the directories are brand new
# in this way only zstd compression is activaded
# TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
if subvolume.compress:
if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
if 'fstab_btrfs_compression_plugin' not in plugins:
plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict)
def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
try:
subvolume_name = ''
result = {}
for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
if index == 0:
subvolume_name = line.strip().decode('UTF-8')
continue
if b':' in line:
key, value = line.strip().decode('UTF-8').split(':', 1)
# A bit of a hack, until I figure out how @dataclass
# allows for hooking in a pre-processor to do this we have to do it here:
result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
except SysCallError as error:
log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
return None
def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
# A root path cannot have a parent
if str(path) == '/':
return None
if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters):
if not (subvolume := subvolume_info_from_path(found_mount['target'])):
if found_mount['target'] == '/':
return None
return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
return subvolume
return None
|