authorAnton Hvornum <anton@hvornum.se>2022-01-14 08:11:30 +0100
committerAnton Hvornum <anton@hvornum.se>2022-01-14 08:11:30 +0100
commit4bd07ea19f17ef8c78bf12f0d3d50f71c2306c19 (patch)
treef417000cc16087dca1aed81391431ab85de7f513 /archinstall/lib/luks.py
parent0bc3e94c795fdde55ccc9b233b897498dc7b498e (diff)
parente8b6b1b334fffe5c5de8c2951a974b0126ffd2b0 (diff)
Merge branch 'master' of github.com:archlinux/archinstall
Diffstat (limited to 'archinstall/lib/luks.py')
-rw-r--r--archinstall/lib/luks.py45
1 files changed, 34 insertions, 11 deletions
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 255c75d9..d39bce0f 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,9 +1,15 @@
+from __future__ import annotations
import json
import logging
import os
import pathlib
import shlex
import time
+from typing import Optional, List,TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .installer import Installer
+
from .disk import Partition, convert_device_to_uuid
from .general import SysCommand, SysCommandWorker
from .output import log
@@ -11,7 +17,15 @@ from .exceptions import SysCallError, DiskError
from .storage import storage
class luks2:
- def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
+ def __init__(self,
+ partition :Partition,
+ mountpoint :str,
+ password :str,
+ key_file :Optional[str] = None,
+ auto_unmount :bool = False,
+ *args :str,
+ **kwargs :str):
+
self.password = password
self.partition = partition
self.mountpoint = mountpoint
@@ -22,7 +36,7 @@ class luks2:
self.filesystem = 'crypto_LUKS'
self.mapdev = None
- def __enter__(self):
+ def __enter__(self) -> Partition:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
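Review bot: The default key-file path on the `self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw"` context line is a fixed, predictable name in a world-writable directory, which the existing TODO already questions and this change leaves as is. If the passphrase is written to that path (the write is outside this hunk), another local user could pre-create the file or a symlink there, and the file's permissions would depend on the process umask. Consider `fd, self.key_file = tempfile.mkstemp(suffix='.disk_pw')` (with `import tempfile`), which picks an unpredictable name and creates the file exclusively with owner-only permissions. Nothing visible here shows whether the file is removed once the context exits. The body of `__enter__` continues past the end of this hunk.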
@@ -34,16 +48,23 @@ class luks2:
return self.unlock(self.partition, self.mountpoint, self.key_file)
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if self.auto_unmount:
self.close()
if len(args) >= 2 and args[1]:
raise args[1]
+
return True
- def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
+ def encrypt(self, partition :Partition,
+ password :Optional[str] = None,
+ key_size :int = 512,
+ hash_type :str = 'sha512',
+ iter_time :int = 10000,
+ key_file :Optional[str] = None) -> str:
+
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file:
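Review bot: The `*args :str, **kwargs :str` annotations added to `__exit__` do not match what the context-manager protocol passes, which is the exception type, the exception instance and the traceback; the body itself treats `args[1]` as something to `raise`. Spelling the parameters out, `def __exit__(self, exc_type, exc_value, traceback) -> bool:`, would document this and make the `len(args) >= 2` check unnecessary. The same `*args :str, **kwargs :str` annotation is added to `__init__` in the earlier hunk, where nothing visible shows that the extra arguments are strings. The body of `encrypt` continues outside this hunk, so its `-> str` return annotation cannot be checked from here.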
@@ -119,7 +140,7 @@ class luks2:
return key_file
- def unlock(self, partition, mountpoint, key_file):
+ def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition:
"""
Mounts a luks2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
@@ -142,24 +163,24 @@ class luks2:
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
return unlocked_partition
- def close(self, mountpoint=None):
+ def close(self, mountpoint :Optional[str] = None) -> bool:
if not mountpoint:
mountpoint = self.mapdev
SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
return os.path.islink(self.mapdev) is False
- def format(self, path):
+ def format(self, path :str) -> None:
if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
- def add_key(self, path :pathlib.Path, password :str):
+ def add_key(self, path :pathlib.Path, password :str) -> bool:
if not path.exists():
raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path))
log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO)
-
- worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}")
+ worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}",
+ environment_vars={'LC_ALL':'C'})
pw_injected = False
while worker.is_alive():
if b'Enter any existing passphrase' in worker and pw_injected is False:
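Review bot: The `luksAddKey` command on the added `SysCommandWorker(...)` line is still assembled by interpolating `self.partition.path` and `path` into one string. If `SysCommandWorker` tokenizes or shell-interprets that string (its implementation is not visible here), a key-file path containing whitespace or shell metacharacters would not reach cryptsetup as a single argument. Since `shlex` is already imported in this file, quoting both values, e.g. `{shlex.quote(str(self.partition.path))} {shlex.quote(str(path))}`, would keep the argument boundaries intact. The same interpolation pattern appears on the `cryptsetup close` and `luksErase` context lines in this hunk. The `while worker.is_alive()` loop continues outside the hunk.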
@@ -169,7 +190,9 @@ class luks2:
if worker.exit_code != 0:
raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}')
- def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]):
+ return True
+
+ def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None:
log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO)
with open(f"{installation.target}/etc/crypttab", "a") as crypttab:
crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")
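Review bot: The new `crypttab` signature keeps a mutable list as the default for `options`; the visible body only reads it, but that one list object is shared across calls, so any later in-place change would leak into subsequent calls. Prefer `options :Optional[List[str]] = None` with `if options is None: options = ["luks", "key-slot=1"]` at the top of the body, using the `Optional` and `List` names this commit already imports. Separately, `key_path` and the elements of `options` are written into `/etc/crypttab` without validation, so a value containing whitespace or a newline would produce a malformed or extra entry. A short check before the write would make that constraint explicit.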