1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
import pathlib
import datetime
import logging
import string
import random
import shutil
from dataclasses import dataclass
from typing import Optional, List# , TYPE_CHECKING
from functools import cached_property
# if TYPE_CHECKING:
# from ..blockdevice import BlockDevice
from ...exceptions import DiskError
from ...general import SysCommand
from ...output import log
from ...storage import storage
@dataclass
class BtrfsSubvolume:
    """Describe a single btrfs subvolume and offer helpers to locate and mount it.

    Text fields given as ``"-"`` are normalised to ``None`` and the timestamp
    fields are converted to :class:`datetime.datetime` in ``__post_init__``.
    NOTE(review): the field set looks like the output of ``btrfs subvolume show``
    -- confirm against the helper that constructs these instances.
    """

    # Location of the subvolume; coerced to pathlib.Path in __post_init__.
    full_path: pathlib.Path
    name: str
    uuid: str
    # "-" is turned into None in __post_init__, hence Optional.
    parent_uuid: Optional[str]
    # Accepted as a "YYYY-MM-DD HH:MM:SS +ZZZZ" string (or a datetime) and stored as datetime.
    creation_time: datetime.datetime
    subvolume_id: int
    generation: int
    gen_at_creation: int
    parent_id: int
    top_level_id: int
    send_transid: int
    # Same input handling as creation_time.
    send_time: datetime.datetime
    receive_transid: int
    received_uuid: Optional[str] = None
    flags: Optional[str] = None
    # "-" or None means "never received"; otherwise handled like creation_time.
    receive_time: Optional[datetime.datetime] = None
    # An empty string is normalised to an empty list.
    snapshots: Optional[List] = None

    def __post_init__(self):
        """Normalise the raw constructor values into workable Python objects."""
        self.full_path = pathlib.Path(self.full_path)

        # Convert "-" entries to `None`
        if self.parent_uuid == "-":
            self.parent_uuid = None
        if self.received_uuid == "-":
            self.received_uuid = None
        if self.flags == "-":
            self.flags = None
        if self.receive_time == "-":
            self.receive_time = None
        if self.snapshots == "":
            self.snapshots = []

        # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
        self.creation_time = self._to_datetime(self.creation_time)
        self.send_time = self._to_datetime(self.send_time)
        if self.receive_time:
            self.receive_time = self._to_datetime(self.receive_time)

    def _to_datetime(self, value):
        """Return *value* as a datetime, parsing it first when it is a timestamp string.

        Values that already are ``datetime`` objects pass through untouched so
        that an instance can be re-created from the fields of another one.
        """
        if isinstance(value, datetime.datetime):
            return value
        return datetime.datetime.fromisoformat(self.convert_to_ISO_format(value))

    @property
    def parent_subvolume(self):
        """Return whatever ``find_parent_subvolume`` resolves for ``full_path``."""
        from .btrfs_helpers import find_parent_subvolume

        return find_parent_subvolume(self.full_path)

    @property
    def root(self) -> bool:
        """Tell whether this subvolume is the one found at ``storage['MOUNT_POINT']``.

        Returns ``False`` when no subvolume information can be obtained for
        that mount point.
        """
        from .btrfs_helpers import subvolume_info_from_path

        # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
        # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
        # It would also be nice if it could use findmnt(self.full_path) and traverse backwards
        # finding the last occurrence of a subvolume which 'self' belongs to.
        if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
            return self.full_path == volume.full_path

        return False

    @cached_property
    def partition(self):
        """Locate the device object that holds this subvolume (computed once, then cached).

        If ``findmnt`` knows ``full_path``, the device is built straight from the
        reported source: ``BTRFSPartition`` for btrfs, ``MapperDev`` for sources
        under ``/dev/mapper``, ``Partition`` otherwise.

        If ``findmnt`` raises ``DiskError``, every ``Partition``/``MapperDev``
        returned by ``all_blockdevices`` is inspected (temporarily mounting the
        unmounted ones) until a subvolume with the same UUID is found.

        Returns ``None`` when nothing matches.

        NOTE(review): when the scan finds a match it returns before the cleanup
        step, so a temporary mount created here stays in place -- confirm
        whether callers rely on that.
        """
        from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
        from ..partition import Partition
        from ..blockdevice import BlockDevice
        from ..mapperdev import MapperDev
        from .btrfspartition import BTRFSPartition
        from .btrfs_helpers import subvolume_info_from_path

        try:
            # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
            if filesystem := findmnt(self.full_path).get('filesystems', []):
                if source := filesystem[0].get('source', None):
                    # Strip away subvolume definitions from findmnt
                    if '[' in source:
                        source = source[:source.find('[')]

                    if filesystem[0].get('fstype', '') == 'btrfs':
                        return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
                    elif filesystem[0].get('source', '').startswith('/dev/mapper'):
                        return MapperDev(source)
                    else:
                        return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
        except DiskError:
            # Subvolume has never been mounted, we have no reliable way of finding where it is.
            # But we have the UUID of the partition, and can begin looking for it by mounting
            # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.
            log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
            for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
                # NOTE(review): exact type checks are kept on purpose; isinstance() would
                # also accept subclasses and could change which devices get scanned.
                if type(instance) in (Partition, MapperDev):
                    we_mounted_it = False
                    detection_mountpoint = instance.mountpoint
                    if not detection_mountpoint:
                        if type(instance) == Partition and instance.encrypted:
                            # TODO: Perhaps support unlocking encrypted volumes?
                            # This will cause a lot of potential user interactions tho.
                            log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
                            continue

                        # Not mounted anywhere yet: mount it on a throw-away, randomly named directory.
                        detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
                        detection_mountpoint.mkdir(parents=True, exist_ok=True)

                        instance.mount(str(detection_mountpoint))
                        we_mounted_it = True

                    if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
                        if subvolume := subvolume_info_from_path(filesystem[0]['target']):
                            if subvolume.uuid == self.uuid:
                                # The top level subvolume matched of ourselves,
                                # which means the instance we're iterating has the subvol we're looking for.
                                log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
                                return instance

                        def iterate_children(struct):
                            # Walk the findmnt tree depth-first, yielding subvolume info for
                            # every child whose source carries a "[...]" subvolume marker.
                            for child in struct.get('children', []):
                                if '[' in child.get('source', ''):
                                    yield subvolume_info_from_path(child['target'])

                                for sub_child in iterate_children(child):
                                    yield sub_child

                        for child in iterate_children(filesystem[0]):
                            if child.uuid == self.uuid:
                                # We found a child within the instance that has the subvol we're looking for.
                                log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
                                return instance

                    # Only undo mounts created by this scan; pre-existing mountpoints are left alone.
                    if we_mounted_it:
                        instance.unmount()
                        shutil.rmtree(detection_mountpoint)

    @cached_property
    def mount_options(self) -> Optional[List[str]]:
        """Return the mount options ``findmnt`` reports for ``full_path`` (cached).

        Returns ``None`` when ``findmnt`` lists no filesystem or the entry
        carries no options.
        """
        from ..helpers import findmnt

        if filesystem := findmnt(self.full_path).get('filesystems', []):
            # A missing or empty 'options' entry yields None instead of raising AttributeError.
            if raw_options := filesystem[0].get('options'):
                return raw_options.split(',')

        return None

    def convert_to_ISO_format(self, time_string):
        """Turn ``"YYYY-MM-DD HH:MM:SS +ZZZZ"`` into ``"YYYY-MM-DDTHH:MM:SS+ZZ:ZZ"``."""
        # First space becomes the ISO "T" separator, any remaining spaces are dropped.
        time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
        # Insert the colon into the trailing UTC offset ("+0100" -> "+01:00").
        iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
        return iso_string

    def _resolve_mount_options(self, options, include_previously_known_options=True) -> List[str]:
        """Build the final option list used by :meth:`mount`.

        Works on a copy so the caller's list is never modified, optionally adds
        the options from ``mount_options``, and appends ``subvol=<name>`` as a
        single entry when no ``subvol=`` option is present yet.
        """
        resolved = list(options) if options else []

        try:
            if include_previously_known_options and (cached_options := self.mount_options):
                resolved += cached_options
        except DiskError:
            # The lookup of the current mount options failed; continue with what was given.
            pass

        if not any('subvol=' in option for option in resolved):
            # append(), not "+=": extending a list with a string adds one entry per character.
            resolved.append(f'subvol={self.name}')

        return resolved

    def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
        """Mount this subvolume at *mountpoint*.

        Anything ``findmnt`` reports as mounted at *mountpoint* is unmounted first.

        :param mountpoint: directory to mount the subvolume on.
        :param options: optional list of mount options; it is not modified.
        :param include_previously_known_options: also apply the options from
            ``mount_options`` when they can be determined.
        """
        from ..helpers import findmnt

        try:
            if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
                log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
                SysCommand(f"umount {mountpoint}")
        except DiskError:
            # No previously mounted device at the mountpoint
            pass

        options = self._resolve_mount_options(options, include_previously_known_options)

        SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
        log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")

    def unmount(self, recurse :bool = True):
        """Unmount ``full_path``; with *recurse* the ``-R`` flag is passed to ``umount``."""
        SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
        log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
|