From e32cf71ae7dacbf9674262705cb2e8e1a5a2d206 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Thu, 6 Jan 2022 22:01:15 +0100 Subject: Added type annotations to all functions (#845) * Added type annotations for 1/5 of the files. There's bound to be some issues with type miss-match, will sort that out later. * Added type hints for 4/5 of the code * Added type hints for 4.7/5 of the code * Added type hints for 5/5 of the code base * Split the linters into individual files This should help with more clearly show which runner is breaking since they don't share a single common name any longer. Also moved mypy settings into pyproject.toml * Fixed some of the last flake8 issues * Missing parameter * Fixed invalid lookahead types * __future__ had to be at the top * Fixed last flake8 issues --- archinstall/lib/luks.py | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) (limited to 'archinstall/lib/luks.py') diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 255c75d9..26f2bc1b 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,9 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import shlex import time +from typing import Optional, List,TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .installer import Installer + from .disk import Partition, convert_device_to_uuid from .general import SysCommand, SysCommandWorker from .output import log @@ -11,7 +17,15 @@ from .exceptions import SysCallError, DiskError from .storage import storage class luks2: - def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs): + def __init__(self, + partition :Partition, + mountpoint :str, + password :str, + key_file :Optional[str] = None, + auto_unmount :bool = False, + *args :str, + **kwargs :str): + self.password = password self.partition = partition self.mountpoint = mountpoint @@ -22,7 +36,7 @@ class luks2: self.filesystem = 'crypto_LUKS' self.mapdev = None - def __enter__(self): + def __enter__(self) -> Partition: if not self.key_file: self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique? @@ -34,16 +48,23 @@ class luks2: return self.unlock(self.partition, self.mountpoint, self.key_file) - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if self.auto_unmount: self.close() if len(args) >= 2 and args[1]: raise args[1] + return True - def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None): + def encrypt(self, partition :Partition, + password :Optional[str] = None, + key_size :int = 512, + hash_type :str = 'sha512', + iter_time :int = 10000, + key_file :Optional[str] = None) -> str: + log(f'Encrypting {partition} (This might take a while)', level=logging.INFO) if not key_file: @@ -119,7 +140,7 @@ class luks2: return key_file - def unlock(self, partition, mountpoint, key_file): + def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition: """ Mounts a luks2 compatible partition to a certain mountpoint. Keyfile must be specified as there's no way to interact with the pw-prompt atm. @@ -142,18 +163,18 @@ class luks2: unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False) return unlocked_partition - def close(self, mountpoint=None): + def close(self, mountpoint :Optional[str] = None) -> bool: if not mountpoint: mountpoint = self.mapdev SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}') return os.path.islink(self.mapdev) is False - def format(self, path): + def format(self, path :str) -> None: if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') - def add_key(self, path :pathlib.Path, password :str): + def add_key(self, path :pathlib.Path, password :str) -> bool: if not path.exists(): raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path)) @@ -169,7 +190,9 @@ class luks2: if worker.exit_code != 0: raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}') - def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]): + return True + + def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None: log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO) with open(f"{installation.target}/etc/crypttab", "a") as crypttab: crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n") -- cgit v1.2.3-70-g09d2 From a8862e9b2420cb1e3919288db624fae676b7e1c3 Mon Sep 17 00:00:00 2001 From: Werner Llácer Date: Thu, 6 Jan 2022 23:26:49 +0100 Subject: Define an standard locale for all the commands executed (#827) * Define an standard locale for all the commands executed inside the application * Allow LC_ALL to be overriden during call * flake8 corrections --- archinstall/lib/general.py | 5 +++-- archinstall/lib/luks.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'archinstall/lib/luks.py') diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 96c9d50c..f69242c6 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -188,7 +188,8 @@ class SysCommandWorker: self.cmd = cmd self.callbacks = callbacks self.peak_output = peak_output - self.environment_vars = environment_vars + # define the standard locale for command outputs. For now the C ascii one. Can be overriden + self.environment_vars = {'LC_ALL':'C' , **environment_vars} self.logfile = logfile self.working_directory = working_directory @@ -368,7 +369,7 @@ class SysCommand: peak_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, working_directory :Optional[str] = './'): - + _callbacks = {} if callbacks: for hook, func in callbacks.items(): diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 26f2bc1b..d39bce0f 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -179,8 +179,8 @@ class luks2: raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path)) log(f'Adding additional key-file {path} for {self.partition}', level=logging.INFO) - - worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}") + worker = SysCommandWorker(f"/usr/bin/cryptsetup -q -v luksAddKey {self.partition.path} {path}", + environment_vars={'LC_ALL':'C'}) pw_injected = False while worker.is_alive(): if b'Enter any existing passphrase' in worker and pw_injected is False: -- cgit v1.2.3-70-g09d2