index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk.py | 118 | ||||
-rw-r--r-- | archinstall/lib/user_interaction.py | 280 |
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index d0e3f6a2..3fda5da6 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -31,10 +31,10 @@ class BlockDevice: self.info = info self.keep_partitions = True self.part_cache = OrderedDict() + # TODO: Currently disk encryption is a BIT misleading. # It's actually partition-encryption, but for future-proofing this # I'm placing the encryption password on a BlockDevice level. - self.encryption_password = None def __repr__(self, *args, **kwargs): return f"BlockDevice({self.device})" @@ -48,6 +48,9 @@ class BlockDevice: raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] + def __len__(self): + return len(self.partitions) + def json(self): """ json() has precedence over __dump__, so this is a way @@ -61,12 +64,22 @@ class BlockDevice: def __dump__(self): return { - 'path': self.path, - 'info': self.info, - 'partition_cache': self.part_cache + self.path : { + 'partuuid' : self.uuid, + 'wipe' : self.info.get('wipe', None), + 'partitions' : [part.__dump__() for part in self.partitions.values()] + } } @property + def partition_type(self): + output = b"".join(sys_command(f"lsblk --json -o+PTTYPE {self.path}")) + output = json.loads(output.decode('UTF-8')) + + for device in output['blockdevices']: + return device['pttype'] + + @property def device(self): """ Returns the actual device-endpoint of the BlockDevice. @@ -144,6 +157,16 @@ class BlockDevice: for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: return partition.get('uuid', None) + @property + def size(self): + output = b"".join(sys_command(f"lsblk --json -o+SIZE {self.path}")) + output = json.loads(output.decode('UTF-8')) + + for device in output['blockdevices']: + assert device['size'][-1] == 'G' # Make sure we're counting in Gigabytes, otherwise the next logic fails. + + return float(device['size'][:-1]) + def has_partitions(self): return len(self.partitions) @@ -210,6 +233,82 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})' + def __dump__(self): + return { + 'type' : 'primary', + 'PARTUUID' : self.uuid, + 'wipe' : self.allow_formatting, + 'boot' : self.boot, + 'ESP' : self.boot, + 'mountpoint' : self.target_mountpoint, + 'encrypted' : self._encrypted, + 'start' : self.start, + 'size' : self.end, + 'filesystem' : { + 'format' : get_filesystem_type(self.path) + } + } + + @property + def sector_size(self): + output = b"".join(sys_command(f"lsblk --json -o+LOG-SEC {self.path}")) + output = json.loads(output.decode('UTF-8')) + + for device in output['blockdevices']: + return device.get('log-sec', None) + + @property + def start(self): + output = b"".join(sys_command(f"sfdisk --json {self.block_device.path}")) + output = json.loads(output.decode('UTF-8')) + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition['start']# * self.sector_size + + @property + def end(self): + # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. + output = b"".join(sys_command(f"sfdisk --json {self.block_device.path}")) + output = json.loads(output.decode('UTF-8')) + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition['size']# * self.sector_size + + @property + def boot(self): + output = b"".join(sys_command(f"sfdisk --json {self.block_device.path}")) + output = json.loads(output.decode('UTF-8')) + + # Get the bootable flag from the sfdisk output: + # { + # "partitiontable": { + # "label":"dos", + # "id":"0xd202c10a", + # "device":"/dev/loop0", + # "unit":"sectors", + # "sectorsize":512, + # "partitions": [ + # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true} + # ] + # } + # } + + for partition in output.get('partitiontable', {}).get('partitions', []): + if partition['node'] == self.path: + return partition.get('bootable', False) + + return False + + @property + def partition_type(self): + output = b"".join(sys_command(f"lsblk --json -o+PTTYPE {self.path}")) + output = json.loads(output.decode('UTF-8')) + + for device in output['blockdevices']: + return device['pttype'] + @property def uuid(self) -> Optional[str]: """ @@ -664,3 +763,14 @@ def disk_layouts(): except SysCallError as err: log(f"Could not return disk layouts: {err}") return None + + +def encrypted_partitions(blockdevices :dict) -> bool: + for partition in blockdevices.values(): + if partition.get('encrypted', False): + yield partition + +def find_partition_by_mountpoint(partitions, relative_mountpoint :str): + for partition in partitions: + if partition.get('mountpoint', None) == relative_mountpoint: + return partition
\ No newline at end of file diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index b52267d9..e28b66b2 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -188,8 +188,24 @@ def generic_multi_select(options, text="Select one or more of the options above except RequirementError as e: log(f" * {e} * ", fg='red') + sys.stdout.write('\n') + sys.stdout.flush() return selected_options +def select_encrypted_partitions(blockdevices :dict) -> dict: + print(blockdevices[0]) + + if len(blockdevices) == 1: + if len(blockdevices[0]['partitions']) == 2: + root = find_partition_by_mountpoint(blockdevices[0]['partitions'], '/') + blockdevices[0]['partitions'][root]['encrypted'] = True + return True + + options = [] + for partition in blockdevices.values(): + options.append({key: val for key, val in partition.items() if val}) + + print(generic_multi_select(options, f"Choose which partitions to encrypt (leave blank when done): ")) class MiniCurses: def __init__(self, width, height): @@ -535,6 +551,270 @@ def generic_select(options, input_text="Select one of the above by index or abso return selected_option +def select_partition_layout(block_device): + return { + "/dev/sda": { # Block Device level + "wipe": False, # Safety flags + "partitions" : [ # Affected / New partitions + { + "PARTUUID" : "654bb317-1b73-4339-9a00-7222792f4ba9", # If existing partition + "wipe" : False, # Safety flags + "boot" : True, # Safety flags / new flags + "ESP" : True, # Safety flags / new flags + "mountpoint" : "/mnt/boot" + } + ] + }, + "/dev/sdb" : { + "wipe" : True, + "partitions" : [ + { + # No PARTUUID required here since it's a new partition + "type" : "primary", # parted options + "size" : "100%", + "filesystem" : { + "encrypted" : True, # TODO: Not sure about this here + "format": "btrfs", # mkfs options + }, + "mountpoint" : "/mnt" + } + ] + } + } + +def valid_fs_type(fstype :str) -> bool: + # https://www.gnu.org/software/parted/manual/html_node/mkpart.html + + return fstype in [ + "ext2", + "fat16", "fat32", + "hfs", "hfs+", "hfsx", + "linux-swap", + "NTFS", + "reiserfs", + "ufs", + "btrfs", + ] + +def valid_parted_position(pos :str): + if not len(pos): + return False + + if pos.isdigit(): + return True + + if pos[-1] == '%' and pos[:-1].isdigit(): + return True + + if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].isdigit(): + return True + + return False + +def partition_overlap(partitions :list, start :str, end :str) -> bool: + # TODO: Implement sanity check + return False + +def get_default_partition_layout(block_devices): + if len(block_devices) == 1: + return { + block_devices[0] : [ + { # Boot + "type" : "primary", + "start" : "0MiB", + "size" : "513MiB", + "boot" : True, + "mountpoint" : "/boot", + "filesystem" : { + "format" : "fat32" + } + }, + { # Root + "type" : "primary", + "start" : "513MiB", + "encrypted" : True, + "size" : f"{max(block_devices[0].size*0.2, 20)}GiB", + "mountpoint" : "", + "filesystem" : { + "format" : "btrfs" + } + }, + { # Home + "type" : "primary", + "encrypted" : True, + "start" : f"{max(block_devices[0].size*0.2, 20)}GiB", + "size" : "100%", + "mountpoint" : "/home", + "filesystem" : { + "format" : "btrfs" + } + } + ] + } + +def wipe_and_create_partitions(block_device): + if hasUEFI(): + partition_type = 'gpt' + else: + partition_type = 'msdos' + + partitions_result = [] # Test code: [part.__dump__() for part in block_device.partitions.values()] + suggested_layout = [ + { # Boot + "type" : "primary", + "start" : "0MiB", + "size" : "513MiB", + "boot" : True, + "mountpoint" : "/boot", + "filesystem" : { + "format" : "fat32" + } + }, + { # Root + "type" : "primary", + "start" : "513MiB", + "encrypted" : True, + "size" : f"{max(block_device.size*0.2, 20)}GiB", + "mountpoint" : "", + "filesystem" : { + "format" : "btrfs" + } + }, + { # Home + "type" : "primary", + "encrypted" : True, + "start" : f"{max(block_device.size*0.2, 20)}GiB", + "size" : "100%", + "mountpoint" : "/home", + "filesystem" : { + "format" : "btrfs" + } + } + ] + # TODO: Squeeze in BTRFS subvolumes here + + while True: + modes = [ + "Create new partition", + "Suggest partition layout", + "Delete partition" if len(partitions_result) else "", + "Assign mount-point for partition" if len(partitions_result) else "", + "Mark/Unmark a partition as encrypted" if len(partitions_result) else "", + "Mark/Unmark a partition as bootable (automatic for /boot)" if len(partitions_result) else "" + ] + + # Print current partition layout: + if len(partitions_result): + print('Current partition layout:') + for partition in partitions_result: + print({key: val for key, val in partition.items() if val}) + print() + + task = generic_select(modes, + input_text=f"Select what to do with {block_device} (leave blank when done): ") + + if task == 'Create new partition': + if partition_type == 'gpt': + # https://www.gnu.org/software/parted/manual/html_node/mkpart.html + # https://www.gnu.org/software/parted/manual/html_node/mklabel.html + name = input("Enter a desired name for the partition: ").strip() + fstype = input("Enter a desired filesystem type for the partition: ").strip() + start = input("Enter the start sector of the partition (percentage or block number, ex: 0%): ").strip() + end = input("Enter the end sector of the partition (percentage or block number, ex: 100%): ").strip() + + if valid_parted_position(start) and valid_parted_position(end) and valid_fs_type(fstype): + if partition_overlap(partitions_result, start, end): + log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red") + continue + + partitions_result.append({ + "type" : "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject + "start" : start, + "size" : end, + "mountpoint" : None, + "filesystem" : { + "format" : fstype + } + }) + else: + log(f"Invalid start, end or fstype for this partition. Ignoring this partition creation.", fg="red") + continue + elif task == "Suggest partition layout": + if len(partitions_result): + if input(f"{block_device} contains queued partitions, this will remove those, are you sure? y/N: ").strip().lower() in ('', 'n'): + continue + + partitions_result = [*suggested_layout] + elif task is None: + return partitions_result + else: + for index, partition in enumerate(partitions_result): + print(f"{index}: Start: {partition['start']}, End: {partition['size']} ({partition['filesystem']['format']}{', mounting at: '+partition['mountpoint'] if partition['mountpoint'] else ''})") + + if task == "Delete partition": + if (partition := generic_select(partitions_result, 'Select which partition to delete: ', options_output=False)): + del(partitions_result[partitions_result.index(partition)]) + elif task == "Assign mount-point for partition": + if (partition := generic_select(partitions_result, 'Select which partition to mount where: ', options_output=False)): + print(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.') + mountpoint = input('Select where to mount partition (leave blank to remove mountpoint): ').strip() + + if len(mountpoint): + partitions_result[partitions_result.index(partition)]['mountpoint'] = mountpoint + if mountpoint == '/boot': + log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow") + partitions_result[partitions_result.index(partition)]['boot'] = True + else: + del(partitions_result[partitions_result.index(partition)]['mountpoint']) + + elif task == "Mark/Unmark a partition as encrypted": + if (partition := generic_select(partitions_result, 'Select which partition to mark as encrypted: ', options_output=False)): + # Negate the current encryption marking + partitions_result[partitions_result.index(partition)]['encrypted'] = not partitions_result[partitions_result.index(partition)].get('encrypted', False) + + elif task == "Mark/Unmark a partition as bootable (automatic for /boot)": + if (partition := generic_select(partitions_result, 'Select which partition to mark as bootable: ', options_output=False)): + partitions_result[partitions_result.index(partition)]['boot'] = not partitions_result[partitions_result.index(partition)].get('boot', False) + + return partitions_result + +def select_individual_blockdevice_usage(block_devices :list): + result = {} + + for device in block_devices: + log(f'Select what to do with {device}', fg="yellow") + modes = [ + "Wipe and create new partitions", + "Re-use partitions" + ] + + device_mode = generic_select(modes) + + if device_mode == "Re-use partitions": + layout = select_partition_layout(device) + elif device_mode == "Wipe and create new partitions": + layout = wipe_and_create_partitions(device) + else: + continue + + result[device] = layout + + return result + + +def select_disk_layout(block_devices :list): + modes = [ + "Wipe all selected drives and use a best-effort default partition layout", + "Select what to do with each individual drive (followed by partition usage)" + ] + + mode = generic_select(modes, input_text=f"Select what you wish to do with the selected block devices: ") + + if mode == 'Wipe all selected drives and use a best-effort default partition layout': + return get_default_partition_layout(block_devices) + else: + return select_individual_blockdevice_usage(block_devices) + def select_disk(dict_o_disks): """ |