index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/partitioning_menu.py | 429 |
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py new file mode 100644 index 00000000..fb1eb74b --- /dev/null +++ b/archinstall/lib/disk/partitioning_menu.py @@ -0,0 +1,429 @@ +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any, TYPE_CHECKING, List, Optional, Tuple +from dataclasses import dataclass + +from .device_model import ( + PartitionModification, FilesystemType, BDevice, + Size, Unit, PartitionType, PartitionFlag, + ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption +) +from ..hardware import SysInfo +from ..menu import Menu, ListManager, MenuSelection, TextInput +from ..output import FormattedOutput, warn +from .subvolume_menu import SubvolumeMenu + +if TYPE_CHECKING: + _: Any + + +@dataclass +class DefaultFreeSector: + start: Size + end: Size + + +class PartitioningList(ListManager): + """ + subclass of ListManager for the managing of user accounts + """ + def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]): + self._device = device + self._actions = { + 'create_new_partition': str(_('Create a new partition')), + 'suggest_partition_layout': str(_('Suggest partition layout')), + 'remove_added_partitions': str(_('Remove all newly added partitions')), + 'assign_mountpoint': str(_('Assign mountpoint')), + 'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')), + 'mark_bootable': str(_('Mark/Unmark as bootable')), + 'set_filesystem': str(_('Change filesystem')), + 'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only + 'btrfs_mark_nodatacow': str(_('Mark/Unmark as nodatacow')), # btrfs only + 'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only + 'delete_partition': str(_('Delete partition')) + } + + display_actions = list(self._actions.values()) + super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:]) + + def selected_action_display(self, partition: PartitionModification) -> str: + return str(_('Partition')) + + def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]: + not_filter = [] + + # only display formatting if the partition exists already + if not selection.exists(): + not_filter += [self._actions['mark_formatting']] + else: + # only allow options if the existing partition + # was marked as formatting, otherwise we run into issues where + # 1. select a new fs -> potentially mark as wipe now + # 2. Switch back to old filesystem -> should unmark wipe now, but + # how do we know it was the original one? + not_filter += [ + self._actions['set_filesystem'], + self._actions['mark_bootable'], + self._actions['btrfs_mark_compressed'], + self._actions['btrfs_mark_nodatacow'], + self._actions['btrfs_set_subvolumes'] + ] + + # non btrfs partitions shouldn't get btrfs options + if selection.fs_type != FilesystemType.Btrfs: + not_filter += [ + self._actions['btrfs_mark_compressed'], + self._actions['btrfs_mark_nodatacow'], + self._actions['btrfs_set_subvolumes'] + ] + else: + not_filter += [self._actions['assign_mountpoint']] + + return [o for o in options if o not in not_filter] + + def handle_action( + self, + action: str, + entry: Optional[PartitionModification], + data: List[PartitionModification] + ) -> List[PartitionModification]: + action_key = [k for k, v in self._actions.items() if v == action][0] + + match action_key: + case 'create_new_partition': + new_partition = self._create_new_partition() + data += [new_partition] + case 'suggest_partition_layout': + new_partitions = self._suggest_partition_layout(data) + if len(new_partitions) > 0: + data = new_partitions + case 'remove_added_partitions': + choice = self._reset_confirmation() + if choice.value == Menu.yes(): + data = [part for part in data if part.is_exists_or_modify()] + case 'assign_mountpoint' if entry: + entry.mountpoint = self._prompt_mountpoint() + if entry.mountpoint == Path('/boot'): + entry.set_flag(PartitionFlag.Boot) + if SysInfo.has_uefi(): + entry.set_flag(PartitionFlag.ESP) + case 'mark_formatting' if entry: + self._prompt_formatting(entry) + case 'mark_bootable' if entry: + entry.invert_flag(PartitionFlag.Boot) + if SysInfo.has_uefi(): + entry.invert_flag(PartitionFlag.ESP) + case 'set_filesystem' if entry: + fs_type = self._prompt_partition_fs_type() + if fs_type: + entry.fs_type = fs_type + # btrfs subvolumes will define mountpoints + if fs_type == FilesystemType.Btrfs: + entry.mountpoint = None + case 'btrfs_mark_compressed' if entry: + self._toggle_mount_option(entry, BtrfsMountOption.compress) + case 'btrfs_mark_nodatacow' if entry: + self._toggle_mount_option(entry, BtrfsMountOption.nodatacow) + case 'btrfs_set_subvolumes' if entry: + self._set_btrfs_subvolumes(entry) + case 'delete_partition' if entry: + data = self._delete_partition(entry, data) + + return data + + def _delete_partition( + self, + entry: PartitionModification, + data: List[PartitionModification] + ) -> List[PartitionModification]: + if entry.is_exists_or_modify(): + entry.status = ModificationStatus.Delete + return data + else: + return [d for d in data if d != entry] + + def _toggle_mount_option( + self, + partition: PartitionModification, + option: BtrfsMountOption + ): + if option.value not in partition.mount_options: + if option == BtrfsMountOption.compress: + partition.mount_options = [ + o for o in partition.mount_options + if o != BtrfsMountOption.nodatacow.value + ] + + partition.mount_options = [ + o for o in partition.mount_options + if not o.startswith(BtrfsMountOption.compress.name) + ] + + partition.mount_options.append(option.value) + else: + partition.mount_options = [ + o for o in partition.mount_options if o != option.value + ] + + def _set_btrfs_subvolumes(self, partition: PartitionModification): + partition.btrfs_subvols = SubvolumeMenu( + _("Manage btrfs subvolumes for current partition"), + partition.btrfs_subvols + ).run() + + def _prompt_formatting(self, partition: PartitionModification): + # an existing partition can toggle between Exist or Modify + if partition.is_modify(): + partition.status = ModificationStatus.Exist + return + elif partition.exists(): + partition.status = ModificationStatus.Modify + + # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really + # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, + # it's safe to change the filesystem for this partition. + if partition.fs_type == FilesystemType.Crypto_luks: + prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified')) + fs_type = self._prompt_partition_fs_type(prompt) + partition.fs_type = fs_type + + if fs_type == FilesystemType.Btrfs: + partition.mountpoint = None + + def _prompt_mountpoint(self) -> Path: + header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n' + header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n' + prompt = str(_('Mountpoint: ')) + + print(header) + + while True: + value = TextInput(prompt).run().strip() + + if value: + mountpoint = Path(value) + break + + return mountpoint + + def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType: + options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks} + + prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition')) + choice = Menu(prompt, options, sort=False, skip=False).run() + return options[choice.single_value] + + def _validate_value( + self, + sector_size: SectorSize, + total_size: Size, + text: str, + start: Optional[Size] + ) -> Optional[Size]: + match = re.match(r'([0-9]+)([a-zA-Z|%]*)', text, re.I) + + if match: + str_value, unit = match.groups() + + if unit == '%' and start: + available = total_size - start + value = int(available.value * (int(str_value) / 100)) + unit = available.unit.name + else: + value = int(str_value) + + if unit and unit not in Unit.get_all_units(): + return None + + unit = Unit[unit] if unit else Unit.sectors + return Size(value, unit, sector_size) + + return None + + def _enter_size( + self, + sector_size: SectorSize, + total_size: Size, + prompt: str, + default: Size, + start: Optional[Size], + ) -> Size: + while True: + value = TextInput(prompt).run().strip() + size: Optional[Size] = None + if not value: + size = default + else: + size = self._validate_value(sector_size, total_size, value, start) + + if size: + return size + + warn(f'Invalid value: {value}') + + def _prompt_size(self) -> Tuple[Size, Size]: + device_info = self._device.device_info + + text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n' + free_space_table = FormattedOutput.as_table(device_info.free_space_regions) + prompt = text + free_space_table + '\n' + + total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size) + total_bytes = device_info.total_size.format_size(Unit.B) + + prompt += str(_('Total: {} / {}')).format(total_sectors, total_bytes) + '\n\n' + prompt += str(_('All entered values can be suffixed with a unit: %, B, KB, KiB, MB, MiB...')) + '\n' + prompt += str(_('If no unit is provided, the value is interpreted as sectors')) + '\n' + print(prompt) + + default_free_sector = self._find_default_free_space() + + if not default_free_sector: + default_free_sector = DefaultFreeSector( + Size(0, Unit.sectors, self._device.device_info.sector_size), + Size(0, Unit.sectors, self._device.device_info.sector_size) + ) + + # prompt until a valid start sector was entered + start_prompt = str(_('Enter start (default: sector {}): ')).format(default_free_sector.start.value) + + start_size = self._enter_size( + device_info.sector_size, + device_info.total_size, + start_prompt, + default_free_sector.start, + None + ) + + if start_size.value == default_free_sector.start.value and default_free_sector.end.value != 0: + end_size = default_free_sector.end + else: + end_size = device_info.total_size + + # prompt until valid end sector was entered + end_prompt = str(_('Enter end (default: {}): ')).format(end_size.as_text()) + end_size = self._enter_size( + device_info.sector_size, + device_info.total_size, + end_prompt, + end_size, + start_size + ) + + return start_size, end_size + + def _find_default_free_space(self) -> Optional[DefaultFreeSector]: + device_info = self._device.device_info + + largest_free_area: Optional[DeviceGeometry] = None + largest_deleted_area: Optional[PartitionModification] = None + + if len(device_info.free_space_regions) > 0: + largest_free_area = max(device_info.free_space_regions, key=lambda r: r.get_length()) + + deleted_partitions = list(filter(lambda x: x.status == ModificationStatus.Delete, self._data)) + if len(deleted_partitions) > 0: + largest_deleted_area = max(deleted_partitions, key=lambda p: p.length) + + def _free_space(space: DeviceGeometry) -> DefaultFreeSector: + start = Size(space.start, Unit.sectors, device_info.sector_size) + end = Size(space.end, Unit.sectors, device_info.sector_size) + return DefaultFreeSector(start, end) + + def _free_deleted(space: PartitionModification) -> DefaultFreeSector: + start = space.start.convert(Unit.sectors, self._device.device_info.sector_size) + end = space.end.convert(Unit.sectors, self._device.device_info.sector_size) + return DefaultFreeSector(start, end) + + if not largest_deleted_area and largest_free_area: + return _free_space(largest_free_area) + elif not largest_free_area and largest_deleted_area: + return _free_deleted(largest_deleted_area) + elif not largest_deleted_area and not largest_free_area: + return None + elif largest_free_area and largest_deleted_area: + free_space = _free_space(largest_free_area) + if free_space.start > largest_deleted_area.start: + return free_space + else: + return _free_deleted(largest_deleted_area) + + return None + + def _create_new_partition(self) -> PartitionModification: + fs_type = self._prompt_partition_fs_type() + + start_size, end_size = self._prompt_size() + length = end_size - start_size + + # new line for the next prompt + print() + + mountpoint = None + if fs_type != FilesystemType.Btrfs: + mountpoint = self._prompt_mountpoint() + + partition = PartitionModification( + status=ModificationStatus.Create, + type=PartitionType.Primary, + start=start_size, + length=length, + fs_type=fs_type, + mountpoint=mountpoint + ) + + if partition.mountpoint == Path('/boot'): + partition.set_flag(PartitionFlag.Boot) + if SysInfo.has_uefi(): + partition.set_flag(PartitionFlag.ESP) + + return partition + + def _reset_confirmation(self) -> MenuSelection: + prompt = str(_('This will remove all newly added partitions, continue?')) + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run() + return choice + + def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]: + # if modifications have been done already, inform the user + # that this operation will erase those modifications + if any([not entry.exists() for entry in data]): + choice = self._reset_confirmation() + if choice.value == Menu.no(): + return [] + + from ..interactions.disk_conf import suggest_single_disk_layout + + device_modification = suggest_single_disk_layout(self._device) + return device_modification.partitions + + +def manual_partitioning( + device: BDevice, + prompt: str = '', + preset: List[PartitionModification] = [] +) -> List[PartitionModification]: + if not prompt: + prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n' + prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB)) + + manual_preset = [] + + if not preset: + # we'll display the existing partitions of the device + for partition in device.partition_infos: + manual_preset.append( + PartitionModification.from_existing_partition(partition) + ) + else: + manual_preset = preset + + menu_list = PartitioningList(prompt, device, manual_preset) + partitions: List[PartitionModification] = menu_list.run() + + if menu_list.is_last_choice_cancel(): + return preset + + return partitions |