Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall')
-rw-r--r--archinstall/lib/configuration.py101
-rw-r--r--archinstall/lib/general.py184
-rw-r--r--archinstall/lib/installer.py205
-rw-r--r--archinstall/lib/locale.py23
-rw-r--r--archinstall/lib/networking.py6
-rw-r--r--archinstall/lib/packages/packages.py4
-rw-r--r--archinstall/lib/pacman.py31
-rw-r--r--archinstall/lib/pacman/__init__.py88
-rw-r--r--archinstall/lib/pacman/config.py33
-rw-r--r--archinstall/lib/pacman/repo.py6
-rw-r--r--archinstall/lib/profile/profiles_handler.py66
11 files changed, 348 insertions, 399 deletions
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py
index 8c7b11fa..aeeddbb8 100644
--- a/archinstall/lib/configuration.py
+++ b/archinstall/lib/configuration.py
@@ -26,7 +26,7 @@ class ConfigurationOutput:
self._config = config
self._user_credentials: Dict[str, Any] = {}
self._user_config: Dict[str, Any] = {}
- self._default_save_path = Path(storage.get('LOG_PATH', '.'))
+ self._default_save_path = storage.get('LOG_PATH', Path('.'))
self._user_config_file = 'user_configuration.json'
self._user_creds_file = "user_credentials.json"
@@ -44,17 +44,17 @@ class ConfigurationOutput:
return self._user_config_file
def _process_config(self):
- for key in self._config:
+ for key, value in self._config.items():
if key in self._sensitive:
- self._user_credentials[key] = self._config[key]
+ self._user_credentials[key] = value
elif key in self._ignore:
pass
else:
- self._user_config[key] = self._config[key]
+ self._user_config[key] = value
# special handling for encryption password
- if key == 'disk_encryption' and self._config[key] is not None:
- self._user_credentials['encryption_password'] = self._config[key].encryption_password
+ if key == 'disk_encryption' and value:
+ self._user_credentials['encryption_password'] = value.encryption_password
def user_config_to_json(self) -> str:
return json.dumps({
@@ -72,42 +72,33 @@ class ConfigurationOutput:
print(_('\nThis is your chosen configuration:'))
debug(" -- Chosen configuration --")
- user_conig = self.user_config_to_json()
- info(user_conig)
-
+ info(self.user_config_to_json())
print()
def _is_valid_path(self, dest_path: Path) -> bool:
- if (not dest_path.exists()) or not (dest_path.is_dir()):
+ dest_path_ok = dest_path.exists() and dest_path.is_dir()
+ if not dest_path_ok:
warn(
f'Destination directory {dest_path.resolve()} does not exist or is not a directory\n.',
'Configuration files can not be saved'
)
- return False
- return True
+ return dest_path_ok
def save_user_config(self, dest_path: Path):
if self._is_valid_path(dest_path):
target = dest_path / self._user_config_file
-
- with open(target, 'w') as config_file:
- config_file.write(self.user_config_to_json())
-
- os.chmod(str(dest_path / self._user_config_file), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+ target.write_text(self.user_config_to_json())
+ os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
def save_user_creds(self, dest_path: Path):
if self._is_valid_path(dest_path):
if user_creds := self.user_credentials_to_json():
target = dest_path / self._user_creds_file
-
- with open(target, 'w') as config_file:
- config_file.write(user_creds)
-
- os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
+ target.write_text(user_creds)
+ os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
def save(self, dest_path: Optional[Path] = None):
- if not dest_path:
- dest_path = self._default_save_path
+ dest_path = dest_path or self._default_save_path
if self._is_valid_path(dest_path):
self.save_user_config(dest_path)
@@ -116,33 +107,33 @@ class ConfigurationOutput:
def save_config(config: Dict):
def preview(selection: str):
- if options["user_config"] == selection:
- serialized = config_output.user_config_to_json()
- return f"{config_output.user_configuration_file}\n{serialized}"
- elif options["user_creds"] == selection:
- if maybe_serial := config_output.user_credentials_to_json():
- return f"{config_output.user_credentials_file}\n{maybe_serial}"
- else:
+ match options[selection]:
+ case "user_config":
+ serialized = config_output.user_config_to_json()
+ return f"{config_output.user_configuration_file}\n{serialized}"
+ case "user_creds":
+ if maybe_serial := config_output.user_credentials_to_json():
+ return f"{config_output.user_credentials_file}\n{maybe_serial}"
return str(_("No configuration"))
- elif options["all"] == selection:
- output = f"{config_output.user_configuration_file}\n"
- if config_output.user_credentials_to_json():
- output += f"{config_output.user_credentials_file}\n"
- return output[:-1]
+ case "all":
+ output = [config_output.user_configuration_file]
+ if config_output.user_credentials_to_json():
+ output.append(config_output.user_credentials_file)
+ return '\n'.join(output)
return None
try:
config_output = ConfigurationOutput(config)
options = {
- "user_config": str(_("Save user configuration (including disk layout)")),
- "user_creds": str(_("Save user credentials")),
- "all": str(_("Save all")),
+ str(_("Save user configuration (including disk layout)")): "user_config",
+ str(_("Save user credentials")): "user_creds",
+ str(_("Save all")): "all",
}
save_choice = Menu(
_("Choose which configuration to save"),
- list(options.values()),
+ list(options),
sort=False,
skip=True,
preview_size=0.75,
@@ -170,27 +161,21 @@ def save_config(config: Dict):
prompt = _(
"Do you want to save {} configuration file(s) in the following location?\n\n{}"
- ).format(
- list(options.keys())[list(options.values()).index(str(save_choice.value))],
- dest_path.absolute(),
- )
+ ).format(options[str(save_choice.value)], dest_path.absolute())
+
save_confirmation = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run()
if save_confirmation == Menu.no():
return
- debug(
- _("Saving {} configuration files to {}").format(
- list(options.keys())[list(options.values()).index(str(save_choice.value))],
- dest_path.absolute(),
- )
- )
-
- if options["user_config"] == save_choice.value:
- config_output.save_user_config(dest_path)
- elif options["user_creds"] == save_choice.value:
- config_output.save_user_creds(dest_path)
- elif options["all"] == save_choice.value:
- config_output.save_user_config(dest_path)
- config_output.save_user_creds(dest_path)
+ debug("Saving {} configuration files to {}".format(options[str(save_choice.value)], dest_path.absolute()))
+
+ match options[str(save_choice.value)]:
+ case "user_config":
+ config_output.save_user_config(dest_path)
+ case "user_creds":
+ config_output.save_user_creds(dest_path)
+ case "all":
+ config_output.save(dest_path)
+
except KeyboardInterrupt:
return
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index f43d4f57..c85208ec 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -11,13 +11,14 @@ import sys
import time
import re
import urllib.parse
-import urllib.request
+from urllib.request import Request, urlopen
import urllib.error
import pathlib
from datetime import datetime, date
from enum import Enum
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
from select import epoll, EPOLLIN, EPOLLHUP
+from shutil import which
from .exceptions import RequirementError, SysCallError
from .output import debug, error, info
@@ -34,28 +35,17 @@ def generate_password(length :int = 64) -> str:
def locate_binary(name :str) -> str:
- for PATH in os.environ['PATH'].split(':'):
- for root, folders, files in os.walk(PATH):
- for file in files:
- if file == name:
- return os.path.join(root, file)
- break # Don't recurse
-
+ if path := which(name):
+ return path
raise RequirementError(f"Binary {name} does not exist.")
def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]:
# https://stackoverflow.com/a/43627833/929999
- if type(data) == bytes:
- byte_vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8')
- data = re.sub(byte_vt100_escape_regex, b'', data)
- elif type(data) == str:
- vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
- data = re.sub(vt100_escape_regex, '', data)
- else:
- raise ValueError(f'Unsupported data type: {type(data)}')
-
- return data
+ vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
+ if isinstance(data, bytes):
+ return re.sub(vt100_escape_regex.encode(), b'', data)
+ return re.sub(vt100_escape_regex, '', data)
def jsonify(obj: Any, safe: bool = True) -> Any:
@@ -120,21 +110,15 @@ class SysCommandWorker:
working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True
):
- if not callbacks:
- callbacks = {}
+ callbacks = callbacks or {}
+ environment_vars = environment_vars or {}
- if not environment_vars:
- environment_vars = {}
-
- if type(cmd) is str:
+ if isinstance(cmd, str):
cmd = shlex.split(cmd)
- cmd = list(cmd) # This is to please mypy
- if cmd[0][0] != '/' and cmd[0][:2] != './':
- # "which" doesn't work as it's a builtin to bash.
- # It used to work, but for whatever reason it doesn't anymore.
- # We there for fall back on manual lookup in os.PATH
- cmd[0] = locate_binary(cmd[0])
+ if cmd:
+ if cmd[0][0] != '/' and cmd[0][:2] != './': # pathlib.Path does not work well
+ cmd[0] = locate_binary(cmd[0])
self.cmd = cmd
self.callbacks = callbacks
@@ -158,29 +142,36 @@ class SysCommandWorker:
Contains will also move the current buffert position forward.
This is to avoid re-checking the same data when looking for output.
"""
- assert type(key) == bytes
+ assert isinstance(key, bytes)
- if (contains := key in self._trace_log[self._trace_log_pos:]):
- self._trace_log_pos += self._trace_log[self._trace_log_pos:].find(key) + len(key)
+ index = self._trace_log.find(key, self._trace_log_pos)
+ if index >= 0:
+ self._trace_log_pos += index + len(key)
+ return True
- return contains
+ return False
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]:
- for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
- if line:
- escaped_line: bytes = line
-
- if self.remove_vt100_escape_codes_from_lines:
- escaped_line = clear_vt100_escape_codes(line) # type: ignore
+ last_line = self._trace_log.rfind(b'\n')
+ lines = filter(None, self._trace_log[self._trace_log_pos:last_line].splitlines())
+ for line in lines:
+ if self.remove_vt100_escape_codes_from_lines:
+ line = clear_vt100_escape_codes(line) # type: ignore
- yield escaped_line + b'\n'
+ yield line + b'\n'
- self._trace_log_pos = self._trace_log.rfind(b'\n')
+ self._trace_log_pos = last_line
def __repr__(self) -> str:
self.make_sure_we_are_executing()
return str(self._trace_log)
+ def __str__(self) -> str:
+ try:
+ return self._trace_log.decode('utf-8')
+ except UnicodeDecodeError:
+ return str(self._trace_log)
+
def __enter__(self) -> 'SysCommandWorker':
return self
@@ -205,7 +196,7 @@ class SysCommandWorker:
if self.exit_code != 0:
raise SysCallError(
- f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self._trace_log[-500:])}",
+ f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self)[-500:]}",
self.exit_code,
worker=self
)
@@ -244,7 +235,7 @@ class SysCommandWorker:
def peak(self, output: Union[str, bytes]) -> bool:
if self.peek_output:
- if type(output) == bytes:
+ if isinstance(output, bytes):
try:
output = output.decode('UTF-8')
except UnicodeDecodeError:
@@ -282,7 +273,7 @@ class SysCommandWorker:
self.ended = time.time()
break
- if self.ended or (got_output is False and _pid_exists(self.pid) is False):
+ if self.ended or (not got_output and not _pid_exists(self.pid)):
self.ended = time.time()
try:
wait_status = os.waitpid(self.pid, 0)[1]
@@ -321,10 +312,8 @@ class SysCommandWorker:
if change_perm:
os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
- except PermissionError:
- pass
+ except (PermissionError, FileNotFoundError):
# If history_logfile does not exist, ignore the error
- except FileNotFoundError:
pass
except Exception as e:
exception_type = type(e).__name__
@@ -355,22 +344,18 @@ class SysCommandWorker:
class SysCommand:
def __init__(self,
cmd :Union[str, List[str]],
- callbacks :Optional[Dict[str, Callable[[Any], Any]]] = None,
+ callbacks :Dict[str, Callable[[Any], Any]] = {},
start_callback :Optional[Callable[[Any], Any]] = None,
peek_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
working_directory :Optional[str] = './',
remove_vt100_escape_codes_from_lines :bool = True):
- _callbacks = {}
- if callbacks:
- for hook, func in callbacks.items():
- _callbacks[hook] = func
+ self._callbacks = callbacks.copy()
if start_callback:
- _callbacks['on_start'] = start_callback
+ self._callbacks['on_start'] = start_callback
self.cmd = cmd
- self._callbacks = _callbacks
self.peek_output = peek_output
self.environment_vars = environment_vars
self.working_directory = working_directory
@@ -398,17 +383,15 @@ class SysCommand:
if not self.session:
raise KeyError(f"SysCommand() does not have an active session.")
elif type(key) is slice:
- start = key.start if key.start else 0
- end = key.stop if key.stop else len(self.session._trace_log)
+ start = key.start or 0
+ end = key.stop or len(self.session._trace_log)
return self.session._trace_log[start:end]
else:
raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.")
def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str:
- if self.session:
- return self.session._trace_log.decode('UTF-8', errors='backslashreplace')
- return ''
+ return self.decode('UTF-8', errors='backslashreplace') or ''
def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]:
return {
@@ -416,7 +399,7 @@ class SysCommand:
'callbacks': self._callbacks,
'peak': self.peek_output,
'environment_vars': self.environment_vars,
- 'session': True if self.session else False
+ 'session': self.session is not None
}
def create_session(self) -> bool:
@@ -436,10 +419,9 @@ class SysCommand:
remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines,
working_directory=self.working_directory) as session:
- if not self.session:
- self.session = session
+ self.session = session
- while self.session.ended is None:
+ while not self.session.ended:
self.session.poll()
if self.peek_output:
@@ -448,9 +430,9 @@ class SysCommand:
return True
- def decode(self, fmt :str = 'UTF-8') -> Optional[str]:
+ def decode(self, *args, **kwargs) -> Optional[str]:
if self.session:
- return self.session._trace_log.decode(fmt)
+ return self.session._trace_log.decode(*args, **kwargs)
return None
@property
@@ -476,54 +458,52 @@ def _pid_exists(pid: int) -> bool:
def run_custom_user_commands(commands :List[str], installation :Installer) -> None:
for index, command in enumerate(commands):
+ script_path = f"/var/tmp/user-command.{index}.sh"
+ chroot_path = installation.target / script_path
+
info(f'Executing custom command "{command}" ...')
-
- with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script:
- temp_script.write(command)
-
- SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh")
-
- os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
+ chroot_path.write_text(command)
+ SysCommand(f"arch-chroot {installation.target} bash {script_path}")
+
+ os.unlink(chroot_path)
def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool :
"""
- Function to load a stream (file (as name) or valid JSON string into an existing dictionary
- Returns true if it could be done
- Return false if operation could not be executed
+ Load a JSON encoded dictionary from a stream and merge it into an existing dictionary.
+ A stream can be a filepath, a URL or a raw JSON string.
+ Returns True if the operation succeeded, False otherwise.
+configuration_identifier is just a parameter to get meaningful, but not so long messages
"""
- parsed_url = urllib.parse.urlparse(stream)
-
- if parsed_url.scheme: # The stream is in fact a URL that should be grabbed
+ raw: Optional[str] = None
+ # Try using the stream as a URL that should be grabbed
+ if urllib.parse.urlparse(stream).scheme:
try:
- with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
- target.update(json.loads(response.read()))
+ with urlopen(Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
+ raw = response.read()
except urllib.error.HTTPError as err:
- error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}")
+ error(f"Could not fetch JSON from {stream} as {configuration_identifier}: {err}")
return False
- else:
- if pathlib.Path(stream).exists():
- try:
- with pathlib.Path(stream).open() as fh:
- target.update(json.load(fh))
- except Exception as err:
- error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}")
- return False
- else:
- # NOTE: This is a rudimentary check if what we're trying parse is a dict structure.
- # Which is the only structure we tolerate anyway.
- if stream.strip().startswith('{') and stream.strip().endswith('}'):
- try:
- target.update(json.loads(stream))
- except Exception as e:
- error(f"{configuration_identifier} Contains an invalid JSON format: {e}")
- return False
- else:
- error(f"{configuration_identifier} is neither a file nor is a JSON string")
- return False
+ # Try using the stream as a filepath that should be read
+ if raw is None and (path := pathlib.Path(stream)).exists():
+ try:
+ raw = path.read_text()
+ except Exception as err:
+ error(f"Could not read file {stream} as {configuration_identifier}: {err}")
+ return False
+
+ try:
+ # We use `or` to try the stream as raw JSON to be parsed
+ structure = json.loads(raw or stream)
+ except Exception as err:
+ error(f"{configuration_identifier} contains an invalid JSON format: {err}")
+ return False
+ if not isinstance(structure, dict):
+ error(f"{stream} passed as {configuration_identifier} is not a JSON encoded dictionary")
+ return False
+ target.update(structure)
return True
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index f1a7f71a..ee546993 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -20,7 +20,8 @@ from .models.bootloader import Bootloader
from .models.network_configuration import NetworkConfiguration
from .models.users import User
from .output import log, error, info, warn, debug
-from .pacman import run_pacman
+from . import pacman
+from .pacman import Pacman
from .plugins import plugins
from .storage import storage
@@ -52,27 +53,16 @@ class Installer:
`Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
"""
- if not base_packages:
- base_packages = __packages__[:3]
-
- if kernels is None:
- self.kernels = ['linux']
- else:
- self.kernels = kernels
-
+ self.base_packages = base_packages or __packages__[:3]
+ self.kernels = kernels or ['linux']
self._disk_config = disk_config
- if disk_encryption is None:
- self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption)
- else:
- self._disk_encryption = disk_encryption
-
+ self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption)
self.target: Path = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
- self.base_packages = base_packages
for kernel in self.kernels:
self.base_packages.append(kernel)
@@ -101,6 +91,7 @@ class Installer:
self._fstab_entries: List[str] = []
self._zram_enabled = False
+ self.pacman = Pacman(self.target, storage['arguments'].get('silent', False))
def __enter__(self) -> 'Installer':
return self
@@ -189,35 +180,33 @@ class Installer:
# partitions have to mounted in the right order on btrfs the mountpoint will
# be empty as the actual subvolumes are getting mounted instead so we'll use
# '/' just for sorting
- sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint if x.mountpoint else Path('/'))
+ sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/'))
+ enc_partitions = []
if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption:
- enc_partitions = list(filter(lambda x: x in self._disk_encryption.partitions, sorted_part_mods))
- else:
- enc_partitions = []
+ enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions))
# attempt to decrypt all luks partitions
luks_handlers = self._prepare_luks_partitions(enc_partitions)
for part_mod in sorted_part_mods:
- if part_mod not in luks_handlers: # partition is not encrypted
+ if luks_handler := luks_handlers.get(part_mod):
+ # mount encrypted partition
+ self._mount_luks_partiton(part_mod, luks_handler)
+ else:
+ # partition is not encrypted
self._mount_partition(part_mod)
- else: # mount encrypted partition
- self._mount_luks_partiton(part_mod, luks_handlers[part_mod])
def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]:
- luks_handlers = {}
-
- for part_mod in partitions:
- if part_mod.mapper_name and part_mod.dev_path:
- luks_handler = disk.device_handler.unlock_luks2_dev(
- part_mod.dev_path,
- part_mod.mapper_name,
- self._disk_encryption.encryption_password
- )
- luks_handlers[part_mod] = luks_handler
-
- return luks_handlers
+ return {
+ part_mod: disk.device_handler.unlock_luks2_dev(
+ part_mod.dev_path,
+ part_mod.mapper_name,
+ self._disk_encryption.encryption_password
+ )
+ for part_mod in partitions
+ if part_mod.mapper_name and part_mod.dev_path
+ }
def _mount_partition(self, part_mod: disk.PartitionModification):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
@@ -302,93 +291,6 @@ class Installer:
def post_install_check(self, *args :str, **kwargs :str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False]
- def enable_multilib_repository(self):
- # Set up a regular expression pattern of a commented line containing 'multilib' within []
- pattern = re.compile(r"^#\s*\[multilib\]$")
-
- # This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
- matched = False
-
- # Read in the lines from the original file
- with open("/etc/pacman.conf", "r") as pacman_conf:
- lines = pacman_conf.readlines()
-
- # Open the file again in write mode, to replace the contents
- with open("/etc/pacman.conf", "w") as pacman_conf:
- for line in lines:
- if pattern.match(line):
- # If this is the [] block containing 'multilib', uncomment it and set the matched tracking boolean.
- pacman_conf.write(line.lstrip('#'))
- matched = True
- elif matched:
- # The previous line was a match for [.*multilib.*].
- # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist'
- pacman_conf.write(line.lstrip('#'))
- matched = False # Reset the state of matched to False.
- else:
- pacman_conf.write(line)
-
- def enable_testing_repositories(self, enable_multilib_testing=False):
- # Set up a regular expression pattern of a commented line containing 'testing' within []
- pattern = re.compile("^#\\[.*testing.*\\]$")
-
- # This is used to track if the previous line is a match, so we end up uncommenting the line after the block.
- matched = False
-
- # Read in the lines from the original file
- with open("/etc/pacman.conf", "r") as pacman_conf:
- lines = pacman_conf.readlines()
-
- # Open the file again in write mode, to replace the contents
- with open("/etc/pacman.conf", "w") as pacman_conf:
- for line in lines:
- if pattern.match(line) and (enable_multilib_testing or 'multilib' not in line):
- # If this is the [] block containing 'testing', uncomment it and set the matched tracking boolean.
- pacman_conf.write(line.lstrip('#'))
- matched = True
- elif matched:
- # The previous line was a match for [.*testing.*].
- # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist'
- pacman_conf.write(line.lstrip('#'))
- matched = False # Reset the state of matched to False.
- else:
- pacman_conf.write(line)
-
- def _pacstrap(self, packages: Union[str, List[str]]) -> bool:
- if isinstance(packages, str):
- packages = [packages]
-
- for plugin in plugins.values():
- if hasattr(plugin, 'on_pacstrap'):
- if (result := plugin.on_pacstrap(packages)):
- packages = result
-
- info(f'Installing packages: {packages}')
-
- # TODO: We technically only need to run the -Syy once.
- try:
- run_pacman('-Syy', default_cmd='/usr/bin/pacman')
- except SysCallError as err:
- error(f'Could not sync a new package database: {err}')
-
- if storage['arguments'].get('silent', False) is False:
- if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
- return self._pacstrap(packages)
-
- raise RequirementError(f'Could not sync mirrors: {err}')
-
- try:
- SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
- return True
- except SysCallError as err:
- error(f'Could not strap in packages: {err}')
-
- if storage['arguments'].get('silent', False) is False:
- if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
- return self._pacstrap(packages)
-
- raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
-
def set_mirrors(self, mirror_config: MirrorConfiguration):
for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'):
@@ -402,7 +304,8 @@ class Installer:
add_custom_mirrors(mirror_config.custom_mirrors)
def genfstab(self, flags :str = '-pU'):
- info(f"Updating {self.target}/etc/fstab")
+ fstab_path = self.target / "etc" / "fstab"
+ info(f"Updating {fstab_path}")
try:
gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode()
@@ -412,10 +315,10 @@ class Installer:
if not gen_fstab:
raise RequirementError(f'Genrating fstab returned empty value')
- with open(f"{self.target}/etc/fstab", 'a') as fp:
+ with open(fstab_path, 'a') as fp:
fp.write(gen_fstab)
- if not os.path.isfile(f'{self.target}/etc/fstab'):
+ if not fstab_path.is_file():
raise RequirementError(f'Could not create fstab file')
for plugin in plugins.values():
@@ -423,7 +326,7 @@ class Installer:
if plugin.on_genfstab(self) is True:
break
- with open(f"{self.target}/etc/fstab", 'a') as fp:
+ with open(fstab_path, 'a') as fp:
for entry in self._fstab_entries:
fp.write(f'{entry}\n')
@@ -432,9 +335,7 @@ class Installer:
if part_mod.fs_type != disk.FilesystemType.Btrfs:
continue
- fstab_file = Path(f'{self.target}/etc/fstab')
-
- with fstab_file.open('r') as fp:
+ with fstab_path.open('r') as fp:
fstab = fp.readlines()
# Replace the {installation}/etc/fstab with entries
@@ -456,7 +357,7 @@ class Installer:
fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}')
break
- with fstab_file.open('w') as fp:
+ with fstab_path.open('w') as fp:
fp.writelines(fstab)
def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
@@ -486,8 +387,7 @@ class Installer:
with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{lang}.{encoding}{modifier} {encoding}\n')
- with open(f'{self.target}/etc/locale.conf', 'w') as fh:
- fh.write(f'LANG={lang}.{encoding}{modifier}\n')
+ (self.target / "etc" / "locale.conf").write_text(f'LANG={lang}.{encoding}{modifier}\n')
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen')
@@ -561,16 +461,13 @@ class Installer:
for plugin in plugins.values():
if hasattr(plugin, 'on_configure_nic'):
- new_conf = plugin.on_configure_nic(
+ conf = plugin.on_configure_nic(
network_config.iface,
network_config.dhcp,
network_config.ip,
network_config.gateway,
network_config.dns
- )
-
- if new_conf:
- conf = new_conf
+ ) or conf
with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf:
netconf.write(str(conf))
@@ -597,7 +494,7 @@ class Installer:
# Otherwise, we can go ahead and add the required package
# and enable it's service:
else:
- self._pacstrap('iwd')
+ self.pacman.strap('iwd')
self.enable_service('iwd')
for psk in psk_files:
@@ -683,7 +580,7 @@ class Installer:
if part in self._disk_encryption.partitions:
if self._disk_encryption.hsm_device:
# Required bby mkinitcpio to add support for fido2-device options
- self._pacstrap('libfido2')
+ self.pacman.strap('libfido2')
if 'sd-encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
@@ -709,24 +606,27 @@ class Installer:
# Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set.
# This action takes place on the host system as pacstrap copies over package repository lists.
+ pacman_conf = pacman.Config(self.target)
if multilib:
info("The multilib flag is set. This system will be installed with the multilib repository enabled.")
- self.enable_multilib_repository()
+ pacman_conf.enable(pacman.Repo.Multilib)
else:
info("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
if testing:
info("The testing flag is set. This system will be installed with testing repositories enabled.")
- self.enable_testing_repositories(multilib)
+ pacman_conf.enable(pacman.Repo.Testing)
+ if multilib:
+ pacman_conf.enable(pacman.Repo.MultilibTesting)
else:
info("The testing flag is not set. This system will be installed without testing repositories enabled.")
- self._pacstrap(self.base_packages)
+ pacman_conf.apply()
+
+ self.pacman.strap(self.base_packages)
self.helper_flags['base-strapped'] = True
- # This handles making sure that the repositories we enabled persist on the installed system
- if multilib or testing:
- shutil.copy2("/etc/pacman.conf", f"{self.target}/etc/pacman.conf")
+ pacman_conf.persist()
# Periodic TRIM may improve the performance and longevity of SSDs whilst
# having no adverse effect on other devices. Most distributions enable
@@ -761,7 +661,7 @@ class Installer:
def setup_swap(self, kind :str = 'zram'):
if kind == 'zram':
info(f"Setting up swap on zram")
- self._pacstrap('zram-generator')
+ self.pacman.strap('zram-generator')
# We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813
# zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example'
@@ -788,7 +688,7 @@ class Installer:
return None
def _add_systemd_bootloader(self, root_partition: disk.PartitionModification):
- self._pacstrap('efibootmgr')
+ self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi():
raise HardwareIncompatibilityError
@@ -897,7 +797,7 @@ class Installer:
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification
):
- self._pacstrap('grub') # no need?
+ self.pacman.strap('grub') # no need?
_file = "/etc/default/grub"
@@ -916,7 +816,7 @@ class Installer:
info(f"GRUB boot partition: {boot_partition.dev_path}")
if SysInfo.has_uefi():
- self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
+ self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
@@ -955,7 +855,7 @@ class Installer:
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification
):
- self._pacstrap('efibootmgr')
+ self.pacman.strap('efibootmgr')
if not SysInfo.has_uefi():
raise HardwareIncompatibilityError
@@ -1030,9 +930,6 @@ class Installer:
if plugin.on_add_bootloader(self):
return True
- if type(self.target) == str:
- self.target = Path(self.target)
-
boot_partition = self._get_boot_partition()
root_partition = self._get_root_partition()
@@ -1053,7 +950,7 @@ class Installer:
self._add_efistub_bootloader(boot_partition, root_partition)
def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
- return self._pacstrap(packages)
+ return self.pacman.strap(packages)
def _enable_users(self, service: str, users: List[User]):
for user in users:
@@ -1214,7 +1111,7 @@ class Installer:
return True
- def _service_started(self, service_name: str) -> str | None:
+ def _service_started(self, service_name: str) -> Optional[str]:
if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'):
service_name += '.service' # Just to be safe
diff --git a/archinstall/lib/locale.py b/archinstall/lib/locale.py
index 0a36c072..ab158984 100644
--- a/archinstall/lib/locale.py
+++ b/archinstall/lib/locale.py
@@ -1,3 +1,5 @@
+from itertools import takewhile
+from pathlib import Path
from typing import Iterator, List
from .exceptions import ServiceException, SysCallError
@@ -11,21 +13,12 @@ def list_keyboard_languages() -> Iterator[str]:
def list_locales() -> List[str]:
- with open('/etc/locale.gen', 'r') as fp:
- locales = []
- # before the list of locales begins there's an empty line with a '#' in front
- # so we'll collect the localels from bottom up and halt when we're donw
- entries = fp.readlines()
- entries.reverse()
-
- for entry in entries:
- text = entry.replace('#', '').strip()
- if text == '':
- break
- locales.append(text)
-
- locales.reverse()
- return locales
+ entries = Path('/etc/locale.gen').read_text().splitlines()
+ # Before the list of locales begins there's an empty line with a '#' in front
+ # so we'll collect the locales from bottom up and halt when we're done.
+ locales = list(takewhile(bool, map(lambda entry: entry.strip('\n\t #'), reversed(entries))))
+ locales.reverse()
+ return locales
def list_x11_keyboard_languages() -> Iterator[str]:
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 6906c320..2a086f39 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -9,7 +9,7 @@ from urllib.request import urlopen
from .exceptions import SysCallError
from .output import error, info, debug
-from .pacman import run_pacman
+from .pacman import Pacman
def get_hw_addr(ifname :str) -> str:
@@ -35,7 +35,7 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
def check_mirror_reachable() -> bool:
info("Testing connectivity to the Arch Linux mirrors...")
try:
- run_pacman("-Sy")
+ Pacman.run("-Sy")
return True
except SysCallError as err:
if os.geteuid() != 0:
@@ -48,7 +48,7 @@ def check_mirror_reachable() -> bool:
def update_keyring() -> bool:
info("Updating archlinux-keyring ...")
try:
- run_pacman("-Sy --noconfirm archlinux-keyring")
+ Pacman.run("-Sy --noconfirm archlinux-keyring")
return True
except SysCallError:
if os.geteuid() != 0:
diff --git a/archinstall/lib/packages/packages.py b/archinstall/lib/packages/packages.py
index 71818ca5..b71b0ce8 100644
--- a/archinstall/lib/packages/packages.py
+++ b/archinstall/lib/packages/packages.py
@@ -8,7 +8,7 @@ from urllib.request import urlopen
from ..exceptions import PackageError, SysCallError
from ..models.gen import PackageSearch, PackageSearchResult, LocalPackage
-from ..pacman import run_pacman
+from ..pacman import Pacman
BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/'
# BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/'
@@ -106,7 +106,7 @@ def validate_package_list(packages :list) -> Tuple[list, list]:
def installed_package(package :str) -> LocalPackage:
package_info = {}
try:
- for line in run_pacman(f"-Q --info {package}"):
+ for line in Pacman.run(f"-Q --info {package}"):
if b':' in line:
key, value = line.decode().split(':', 1)
package_info[key.strip().lower().replace(' ', '_')] = value.strip()
diff --git a/archinstall/lib/pacman.py b/archinstall/lib/pacman.py
deleted file mode 100644
index f5514f05..00000000
--- a/archinstall/lib/pacman.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import pathlib
-import time
-from typing import TYPE_CHECKING, Any
-
-from .general import SysCommand
-from .output import warn, error
-
-if TYPE_CHECKING:
- _: Any
-
-
-def run_pacman(args :str, default_cmd :str = 'pacman') -> SysCommand:
- """
- A centralized function to call `pacman` from.
- It also protects us from colliding with other running pacman sessions (if used locally).
- The grace period is set to 10 minutes before exiting hard if another pacman instance is running.
- """
- pacman_db_lock = pathlib.Path('/var/lib/pacman/db.lck')
-
- if pacman_db_lock.exists():
- warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'))
-
- started = time.time()
- while pacman_db_lock.exists():
- time.sleep(0.25)
-
- if time.time() - started > (60 * 10):
- error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'))
- exit(1)
-
- return SysCommand(f'{default_cmd} {args}')
diff --git a/archinstall/lib/pacman/__init__.py b/archinstall/lib/pacman/__init__.py
new file mode 100644
index 00000000..6478f0cc
--- /dev/null
+++ b/archinstall/lib/pacman/__init__.py
@@ -0,0 +1,88 @@
+from pathlib import Path
+import time
+import re
+from typing import TYPE_CHECKING, Any, List, Callable, Union
+from shutil import copy2
+
+from ..general import SysCommand
+from ..output import warn, error, info
+from .repo import Repo
+from .config import Config
+from ..exceptions import RequirementError
+from ..plugins import plugins
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class Pacman:
+
+ def __init__(self, target: Path, silent: bool = False):
+ self.synced = False
+ self.silent = silent
+ self.target = target
+
+ @staticmethod
+ def run(args :str, default_cmd :str = 'pacman') -> SysCommand:
+ """
+ A centralized function to call `pacman` from.
+ It also protects us from colliding with other running pacman sessions (if used locally).
+ The grace period is set to 10 minutes before exiting hard if another pacman instance is running.
+ """
+ pacman_db_lock = Path('/var/lib/pacman/db.lck')
+
+ if pacman_db_lock.exists():
+ warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'))
+
+ started = time.time()
+ while pacman_db_lock.exists():
+ time.sleep(0.25)
+
+ if time.time() - started > (60 * 10):
+ error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'))
+ exit(1)
+
+ return SysCommand(f'{default_cmd} {args}')
+
+ def ask(self, error_message: str, bail_message: str, func: Callable, *args, **kwargs):
+ while True:
+ try:
+ func(*args, **kwargs)
+ break
+ except Exception as err:
+ error(f'{error_message}: {err}')
+ if not self.silent and input('Would you like to re-try this download? (Y/n): ').lower().strip() in 'y':
+ continue
+ raise RequirementError(f'{bail_message}: {err}')
+
+ def sync(self):
+ if self.synced:
+ return
+ self.ask(
+ 'Could not sync a new package database',
+ 'Could not sync mirrors',
+ self.run,
+ '-Syy',
+ default_cmd='/usr/bin/pacman'
+ )
+ self.synced = True
+
+ def strap(self, packages: Union[str, List[str]]):
+ self.sync()
+ if isinstance(packages, str):
+ packages = [packages]
+
+ for plugin in plugins.values():
+ if hasattr(plugin, 'on_pacstrap'):
+ if (result := plugin.on_pacstrap(packages)):
+ packages = result
+
+ info(f'Installing packages: {packages}')
+
+ self.ask(
+ 'Could not strap in packages',
+ 'Pacstrap failed. See /var/log/archinstall/install.log or above message for error details',
+ SysCommand,
+ f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm',
+ peek_output=True
+ )
diff --git a/archinstall/lib/pacman/config.py b/archinstall/lib/pacman/config.py
new file mode 100644
index 00000000..60d202bc
--- /dev/null
+++ b/archinstall/lib/pacman/config.py
@@ -0,0 +1,33 @@
+import re
+from pathlib import Path
+from shutil import copy2
+from typing import List
+
+from .repo import Repo
+
+
+class Config:
+ def __init__(self, target: Path):
+ self.path = Path("/etc") / "pacman.conf"
+ self.chroot_path = target / "etc" / "pacman.conf"
+ self.patterns: List[re.Pattern] = []
+
+ def enable(self, repo: Repo):
+ self.patterns.append(re.compile(r"^#\s*\[{}\]$".format(repo.value)))
+
+ def apply(self):
+ if not self.patterns:
+ return
+ lines = iter(self.path.read_text().splitlines(keepends=True))
+ with open(self.path, 'w') as f:
+ for line in lines:
+ if any(pattern.match(line) for pattern in self.patterns):
+ # Uncomment this line and the next.
+ f.write(line.lstrip('#'))
+ f.write(next(lines).lstrip('#'))
+ else:
+ f.write(line)
+
+ def persist(self):
+ if self.patterns:
+ copy2(self.path, self.chroot_path)
diff --git a/archinstall/lib/pacman/repo.py b/archinstall/lib/pacman/repo.py
new file mode 100644
index 00000000..b4106f97
--- /dev/null
+++ b/archinstall/lib/pacman/repo.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+class Repo(Enum):
+ Multilib = "multilib"
+ Testing = "testing"
+ MultilibTesting = "multilib-testing"
diff --git a/archinstall/lib/profile/profiles_handler.py b/archinstall/lib/profile/profiles_handler.py
index 4e7c3d2b..74c21824 100644
--- a/archinstall/lib/profile/profiles_handler.py
+++ b/archinstall/lib/profile/profiles_handler.py
@@ -98,14 +98,19 @@ class ProfileHandler:
profile = self.get_profile_by_name(main) if main else None
valid: List[Profile] = []
+ details: List[str] = profile_config.get('details', [])
+ if details:
+ valid = []
+ invalid = []
- if details := profile_config.get('details', []):
- resolved = {detail: self.get_profile_by_name(detail) for detail in details if detail}
- valid = [p for p in resolved.values() if p is not None]
- invalid = ', '.join([k for k, v in resolved.items() if v is None])
+ for detail in filter(None, details):
+ if profile := self.get_profile_by_name(detail):
+ valid.append(profile)
+ else:
+ invalid.append(detail)
if invalid:
- info(f'No profile definition found: {invalid}')
+ info('No profile definition found: {}'.format(', '.join(invalid)))
custom_settings = profile_config.get('custom_settings', {})
for profile in valid:
@@ -123,14 +128,12 @@ class ProfileHandler:
"""
List of all available default_profiles
"""
- if self._profiles is None:
- self._profiles = self._find_available_profiles()
+ self._profiles = self._profiles or self._find_available_profiles()
return self._profiles
@cached_property
def _local_mac_addresses(self) -> List[str]:
- ifaces = list_interfaces()
- return list(ifaces.keys())
+ return list(list_interfaces())
def add_custom_profiles(self, profiles: Union[TProfile, List[TProfile]]):
if not isinstance(profiles, list):
@@ -190,25 +193,20 @@ class ProfileHandler:
def install_gfx_driver(self, install_session: 'Installer', driver: Optional[GfxDriver]):
try:
- driver_pkgs = driver.packages() if driver else []
- pkg_names = [p.value for p in driver_pkgs]
- additional_pkg = ' '.join(['xorg-server', 'xorg-xinit'] + pkg_names)
if driver is not None:
- # Find the intersection between the set of known nvidia drivers
- # and the selected driver packages. Since valid intesections can
- # only have one element or none, we iterate and try to take the
- # first element.
- if driver_pkg := next(iter({GfxPackage.Nvidia, GfxPackage.NvidiaOpen} & set(driver_pkgs)), None):
- if any(kernel in install_session.base_packages for kernel in ("linux-lts", "linux-zen")):
- for kernel in install_session.kernels:
- # Fixes https://github.com/archlinux/archinstall/issues/585
- install_session.add_additional_packages(f"{kernel}-headers")
+ driver_pkgs = driver.packages()
+ pkg_names = [p.value for p in driver_pkgs]
+ for driver_pkg in {GfxPackage.Nvidia, GfxPackage.NvidiaOpen} & set(driver_pkgs):
+ for kernel in {"linux-lts", "linux-zen"} & set(install_session.kernels):
+ # Fixes https://github.com/archlinux/archinstall/issues/585
+ install_session.add_additional_packages(f"{kernel}-headers")
# I've had kernel regen fail if it wasn't installed before nvidia-dkms
- install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', f'{driver_pkg}-dkms'])
- return
- elif 'amdgpu' in driver_pkgs:
+ install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', f'{driver_pkg.value}-dkms'])
+ # Return after first driver match, since it is impossible to use both simultaneously.
+ return
+ if 'amdgpu' in driver_pkgs:
# The order of these two are important if amdgpu is installed #808
if 'amdgpu' in install_session.modules:
install_session.modules.remove('amdgpu')
@@ -218,23 +216,24 @@ class ProfileHandler:
install_session.modules.remove('radeon')
install_session.modules.append('radeon')
- install_session.add_additional_packages(additional_pkg)
+ install_session.add_additional_packages(pkg_names)
except Exception as err:
warn(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}")
# Prep didn't run, so there's no driver to install
- install_session.add_additional_packages(['xorg-server', 'xorg-xinit'])
+ install_session.add_additional_packages(['xorg-server', 'xorg-xinit'])
def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration):
profile = profile_config.profile
- if profile:
- profile.install(install_session)
+ if not profile:
+ return
- if profile and profile_config.gfx_driver:
- if profile.is_xorg_type_profile() or profile.is_desktop_type_profile():
- self.install_gfx_driver(install_session, profile_config.gfx_driver)
+ profile.install(install_session)
- if profile and profile_config.greeter:
+ if profile_config.gfx_driver and (profile.is_xorg_type_profile() or profile.is_desktop_type_profile()):
+ self.install_gfx_driver(install_session, profile_config.gfx_driver)
+
+ if profile_config.greeter:
self.install_greeter(install_session, profile_config.greeter)
def _import_profile_from_url(self, url: str):
@@ -312,8 +311,7 @@ class ProfileHandler:
debug(f'Importing profile: {file}')
try:
- spec = importlib.util.spec_from_file_location(name, file)
- if spec is not None:
+ if spec := importlib.util.spec_from_file_location(name, file):
imported = importlib.util.module_from_spec(spec)
if spec.loader is not None:
spec.loader.exec_module(imported)