index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/installer.py | 148 |
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 4f46e458..b1570e16 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -180,80 +180,101 @@ class Installer: return True - def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: - from .luks import luks2 - - mountpoints = {} - for blockdevice in layouts: - for partition in layouts[blockdevice]['partitions']: - if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})): - if partition.get('encrypted',False): - if partition.get('mountpoint',None): - ppath = partition['mountpoint'] - else: - ppath = partition['device_instance'].path - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop" - # Immediately unlock the encrypted device to format the inner volume - with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=False) as unlocked_device: - unlocked_device.mount(f"{self.target}/") - try: - manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes,unlocked_device) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state - unlocked_device.unmount() - raise e - unlocked_device.unmount() - # TODO generate key - else: - self.mount(partition['device_instance'],"/") - try: - manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state - partition['device_instance'].unmount() - raise e - partition['device_instance'].unmount() - else: - mountpoints[partition['mountpoint']] = partition - for mountpoint in sorted([mnt_dest for mnt_dest in mountpoints.keys() if mnt_dest is not None]): - partition = mountpoints[mountpoint] - if partition.get('encrypted', False) and not partition.get('subvolume',None): - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - if not (password := partition.get('!password', None)): - raise RequirementError(f"Missing mountpoint {mountpoint} encryption password in layout: {partition}") - - with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: - if partition.get('generate-encryption-key-file'): - if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): - cryptkey_dir.mkdir(parents=True) - - # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key - # if we name the device to "xyzloop". - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" - with open(f"{self.target}{encryption_key_path}", "w") as keyfile: - keyfile.write(generate_password(length=512)) + def _create_keyfile(self,luks_handle , partition :dict, password :str): + """ roiutine to create keyfiles, so it can be moved elsewere + """ + if partition.get('generate-encryption-key-file'): + if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): + cryptkey_dir.mkdir(parents=True) + # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key + # if we name the device to "xyzloop". + if partition.get('mountpoint',None): + encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" + else: + encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key" + with open(f"{self.target}{encryption_key_path}", "w") as keyfile: + keyfile.write(generate_password(length=512)) - os.chmod(f"{self.target}{encryption_key_path}", 0o400) + os.chmod(f"{self.target}{encryption_key_path}", 0o400) - luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) - luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) + luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) + luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {unlocked_device}", level=logging.INFO) - unlocked_device.mount(f"{self.target}{mountpoint}") + def _has_root(self, partition :dict) -> bool: + """ + Determine if an encrypted partition contains root in it + """ + if partition.get("mountpoint") is None: + if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})): + for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list]: + if mountpoint == '/': + return True + return False + else: + return False + elif partition.get("mountpoint") == '/': + return True + else: + return False + def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: + from .luks import luks2 + # set the partitions as a list not part of a tree (which we don't need anymore (i think) + list_part = [] + list_luks_handles = [] + for blockdevice in layouts: + list_part.extend(layouts[blockdevice]['partitions']) + + # we manage the encrypted partititons + for partition in [entry for entry in list_part if entry.get('encrypted',False)]: + # open the luks device and all associate stuff + if not (password := partition.get('!password', None)): + raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}") + # i change a bit the naming conventions for the loop device + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) - if partition.get('options',[]): - mount_options = ','.join(partition['options']) - partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) - else: - partition['device_instance'].mount(f"{self.target}{mountpoint}") + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used + with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: + if partition.get('generate-encryption-key-file',False) and not self._has_root(partition): + list_luks_handles.append([luks_handle,partition,password]) + # this way all the requesrs will be to the dm_crypt device and not to the physical partition + partition['device_instance'] = unlocked_device + + # we manage the btrfs partitions + for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]: + self.mount(partition['device_instance'],"/") + try: + new_mountpoints = manage_btrfs_subvolumes(self,partition) + except Exception as e: + # every exception unmounts the physical volume. Otherwise we let the system in an unstable state + partition['device_instance'].unmount() + raise e + partition['device_instance'].unmount() + if new_mountpoints: + list_part.extend(new_mountpoints) + + # we mount. We need to sort by mountpoint to get a good working order + for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']): + mountpoint = partition['mountpoint'] + log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) + if partition.get('filesystem',{}).get('mount_options',[]): + mount_options = ','.join(partition['filesystem']['mount_options']) + partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) + else: + partition['device_instance'].mount(f"{self.target}{mountpoint}") time.sleep(1) try: get_mount_info(f"{self.target}{mountpoint}", traverse=False) except DiskError: raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") + # once everything is mounted, we generate the key files in the correct place + for handle in list_luks_handles: + ppath = handle[1]['device_instance'].path + log(f"creating key-file for {ppath}",level=logging.INFO) + self._create_keyfile(handle[0],handle[1],handle[2]) + def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None: if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): os.makedirs(f'{self.target}{mountpoint}') @@ -692,6 +713,7 @@ class Installer: base_path,bind_path = split_bind_name(str(root_partition.path)) if bind_path is not None: # and root_fs_type == 'btrfs': options_entry = f"rootflags=subvol={bind_path} " + options_entry + if real_device := self.detect_encryption(root_partition): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) |