index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/configuration.py | 101 | ||||
-rw-r--r-- | archinstall/lib/general.py | 184 | ||||
-rw-r--r-- | archinstall/lib/installer.py | 205 | ||||
-rw-r--r-- | archinstall/lib/locale.py | 23 | ||||
-rw-r--r-- | archinstall/lib/networking.py | 6 | ||||
-rw-r--r-- | archinstall/lib/packages/packages.py | 4 | ||||
-rw-r--r-- | archinstall/lib/pacman.py | 31 | ||||
-rw-r--r-- | archinstall/lib/pacman/__init__.py | 88 | ||||
-rw-r--r-- | archinstall/lib/pacman/config.py | 33 | ||||
-rw-r--r-- | archinstall/lib/pacman/repo.py | 6 | ||||
-rw-r--r-- | archinstall/lib/profile/profiles_handler.py | 66 |
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py index 8c7b11fa..aeeddbb8 100644 --- a/archinstall/lib/configuration.py +++ b/archinstall/lib/configuration.py @@ -26,7 +26,7 @@ class ConfigurationOutput: self._config = config self._user_credentials: Dict[str, Any] = {} self._user_config: Dict[str, Any] = {} - self._default_save_path = Path(storage.get('LOG_PATH', '.')) + self._default_save_path = storage.get('LOG_PATH', Path('.')) self._user_config_file = 'user_configuration.json' self._user_creds_file = "user_credentials.json" @@ -44,17 +44,17 @@ class ConfigurationOutput: return self._user_config_file def _process_config(self): - for key in self._config: + for key, value in self._config.items(): if key in self._sensitive: - self._user_credentials[key] = self._config[key] + self._user_credentials[key] = value elif key in self._ignore: pass else: - self._user_config[key] = self._config[key] + self._user_config[key] = value # special handling for encryption password - if key == 'disk_encryption' and self._config[key] is not None: - self._user_credentials['encryption_password'] = self._config[key].encryption_password + if key == 'disk_encryption' and value: + self._user_credentials['encryption_password'] = value.encryption_password def user_config_to_json(self) -> str: return json.dumps({ @@ -72,42 +72,33 @@ class ConfigurationOutput: print(_('\nThis is your chosen configuration:')) debug(" -- Chosen configuration --") - user_conig = self.user_config_to_json() - info(user_conig) - + info(self.user_config_to_json()) print() def _is_valid_path(self, dest_path: Path) -> bool: - if (not dest_path.exists()) or not (dest_path.is_dir()): + dest_path_ok = dest_path.exists() and dest_path.is_dir() + if not dest_path_ok: warn( f'Destination directory {dest_path.resolve()} does not exist or is not a directory\n.', 'Configuration files can not be saved' ) - return False - return True + return dest_path_ok def save_user_config(self, dest_path: Path): if self._is_valid_path(dest_path): target = dest_path / self._user_config_file - - with open(target, 'w') as config_file: - config_file.write(self.user_config_to_json()) - - os.chmod(str(dest_path / self._user_config_file), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) + target.write_text(self.user_config_to_json()) + os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) def save_user_creds(self, dest_path: Path): if self._is_valid_path(dest_path): if user_creds := self.user_credentials_to_json(): target = dest_path / self._user_creds_file - - with open(target, 'w') as config_file: - config_file.write(user_creds) - - os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) + target.write_text(user_creds) + os.chmod(target, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) def save(self, dest_path: Optional[Path] = None): - if not dest_path: - dest_path = self._default_save_path + dest_path = dest_path or self._default_save_path if self._is_valid_path(dest_path): self.save_user_config(dest_path) @@ -116,33 +107,33 @@ class ConfigurationOutput: def save_config(config: Dict): def preview(selection: str): - if options["user_config"] == selection: - serialized = config_output.user_config_to_json() - return f"{config_output.user_configuration_file}\n{serialized}" - elif options["user_creds"] == selection: - if maybe_serial := config_output.user_credentials_to_json(): - return f"{config_output.user_credentials_file}\n{maybe_serial}" - else: + match options[selection]: + case "user_config": + serialized = config_output.user_config_to_json() + return f"{config_output.user_configuration_file}\n{serialized}" + case "user_creds": + if maybe_serial := config_output.user_credentials_to_json(): + return f"{config_output.user_credentials_file}\n{maybe_serial}" return str(_("No configuration")) - elif options["all"] == selection: - output = f"{config_output.user_configuration_file}\n" - if config_output.user_credentials_to_json(): - output += f"{config_output.user_credentials_file}\n" - return output[:-1] + case "all": + output = [config_output.user_configuration_file] + if config_output.user_credentials_to_json(): + output.append(config_output.user_credentials_file) + return '\n'.join(output) return None try: config_output = ConfigurationOutput(config) options = { - "user_config": str(_("Save user configuration (including disk layout)")), - "user_creds": str(_("Save user credentials")), - "all": str(_("Save all")), + str(_("Save user configuration (including disk layout)")): "user_config", + str(_("Save user credentials")): "user_creds", + str(_("Save all")): "all", } save_choice = Menu( _("Choose which configuration to save"), - list(options.values()), + list(options), sort=False, skip=True, preview_size=0.75, @@ -170,27 +161,21 @@ def save_config(config: Dict): prompt = _( "Do you want to save {} configuration file(s) in the following location?\n\n{}" - ).format( - list(options.keys())[list(options.values()).index(str(save_choice.value))], - dest_path.absolute(), - ) + ).format(options[str(save_choice.value)], dest_path.absolute()) + save_confirmation = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run() if save_confirmation == Menu.no(): return - debug( - _("Saving {} configuration files to {}").format( - list(options.keys())[list(options.values()).index(str(save_choice.value))], - dest_path.absolute(), - ) - ) - - if options["user_config"] == save_choice.value: - config_output.save_user_config(dest_path) - elif options["user_creds"] == save_choice.value: - config_output.save_user_creds(dest_path) - elif options["all"] == save_choice.value: - config_output.save_user_config(dest_path) - config_output.save_user_creds(dest_path) + debug("Saving {} configuration files to {}".format(options[str(save_choice.value)], dest_path.absolute())) + + match options[str(save_choice.value)]: + case "user_config": + config_output.save_user_config(dest_path) + case "user_creds": + config_output.save_user_creds(dest_path) + case "all": + config_output.save(dest_path) + except KeyboardInterrupt: return diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index f43d4f57..c85208ec 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -11,13 +11,14 @@ import sys import time import re import urllib.parse -import urllib.request +from urllib.request import Request, urlopen import urllib.error import pathlib from datetime import datetime, date from enum import Enum from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING from select import epoll, EPOLLIN, EPOLLHUP +from shutil import which from .exceptions import RequirementError, SysCallError from .output import debug, error, info @@ -34,28 +35,17 @@ def generate_password(length :int = 64) -> str: def locate_binary(name :str) -> str: - for PATH in os.environ['PATH'].split(':'): - for root, folders, files in os.walk(PATH): - for file in files: - if file == name: - return os.path.join(root, file) - break # Don't recurse - + if path := which(name): + return path raise RequirementError(f"Binary {name} does not exist.") def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]: # https://stackoverflow.com/a/43627833/929999 - if type(data) == bytes: - byte_vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') - data = re.sub(byte_vt100_escape_regex, b'', data) - elif type(data) == str: - vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' - data = re.sub(vt100_escape_regex, '', data) - else: - raise ValueError(f'Unsupported data type: {type(data)}') - - return data + vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' + if isinstance(data, bytes): + return re.sub(vt100_escape_regex.encode(), b'', data) + return re.sub(vt100_escape_regex, '', data) def jsonify(obj: Any, safe: bool = True) -> Any: @@ -120,21 +110,15 @@ class SysCommandWorker: working_directory :Optional[str] = './', remove_vt100_escape_codes_from_lines :bool = True ): - if not callbacks: - callbacks = {} + callbacks = callbacks or {} + environment_vars = environment_vars or {} - if not environment_vars: - environment_vars = {} - - if type(cmd) is str: + if isinstance(cmd, str): cmd = shlex.split(cmd) - cmd = list(cmd) # This is to please mypy - if cmd[0][0] != '/' and cmd[0][:2] != './': - # "which" doesn't work as it's a builtin to bash. - # It used to work, but for whatever reason it doesn't anymore. - # We there for fall back on manual lookup in os.PATH - cmd[0] = locate_binary(cmd[0]) + if cmd: + if cmd[0][0] != '/' and cmd[0][:2] != './': # pathlib.Path does not work well + cmd[0] = locate_binary(cmd[0]) self.cmd = cmd self.callbacks = callbacks @@ -158,29 +142,36 @@ class SysCommandWorker: Contains will also move the current buffert position forward. This is to avoid re-checking the same data when looking for output. """ - assert type(key) == bytes + assert isinstance(key, bytes) - if (contains := key in self._trace_log[self._trace_log_pos:]): - self._trace_log_pos += self._trace_log[self._trace_log_pos:].find(key) + len(key) + index = self._trace_log.find(key, self._trace_log_pos) + if index >= 0: + self._trace_log_pos += index + len(key) + return True - return contains + return False def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: - for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): - if line: - escaped_line: bytes = line - - if self.remove_vt100_escape_codes_from_lines: - escaped_line = clear_vt100_escape_codes(line) # type: ignore + last_line = self._trace_log.rfind(b'\n') + lines = filter(None, self._trace_log[self._trace_log_pos:last_line].splitlines()) + for line in lines: + if self.remove_vt100_escape_codes_from_lines: + line = clear_vt100_escape_codes(line) # type: ignore - yield escaped_line + b'\n' + yield line + b'\n' - self._trace_log_pos = self._trace_log.rfind(b'\n') + self._trace_log_pos = last_line def __repr__(self) -> str: self.make_sure_we_are_executing() return str(self._trace_log) + def __str__(self) -> str: + try: + return self._trace_log.decode('utf-8') + except UnicodeDecodeError: + return str(self._trace_log) + def __enter__(self) -> 'SysCommandWorker': return self @@ -205,7 +196,7 @@ class SysCommandWorker: if self.exit_code != 0: raise SysCallError( - f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self._trace_log[-500:])}", + f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self)[-500:]}", self.exit_code, worker=self ) @@ -244,7 +235,7 @@ class SysCommandWorker: def peak(self, output: Union[str, bytes]) -> bool: if self.peek_output: - if type(output) == bytes: + if isinstance(output, bytes): try: output = output.decode('UTF-8') except UnicodeDecodeError: @@ -282,7 +273,7 @@ class SysCommandWorker: self.ended = time.time() break - if self.ended or (got_output is False and _pid_exists(self.pid) is False): + if self.ended or (not got_output and not _pid_exists(self.pid)): self.ended = time.time() try: wait_status = os.waitpid(self.pid, 0)[1] @@ -321,10 +312,8 @@ class SysCommandWorker: if change_perm: os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) - except PermissionError: - pass + except (PermissionError, FileNotFoundError): # If history_logfile does not exist, ignore the error - except FileNotFoundError: pass except Exception as e: exception_type = type(e).__name__ @@ -355,22 +344,18 @@ class SysCommandWorker: class SysCommand: def __init__(self, cmd :Union[str, List[str]], - callbacks :Optional[Dict[str, Callable[[Any], Any]]] = None, + callbacks :Dict[str, Callable[[Any], Any]] = {}, start_callback :Optional[Callable[[Any], Any]] = None, peek_output :Optional[bool] = False, environment_vars :Optional[Dict[str, Any]] = None, working_directory :Optional[str] = './', remove_vt100_escape_codes_from_lines :bool = True): - _callbacks = {} - if callbacks: - for hook, func in callbacks.items(): - _callbacks[hook] = func + self._callbacks = callbacks.copy() if start_callback: - _callbacks['on_start'] = start_callback + self._callbacks['on_start'] = start_callback self.cmd = cmd - self._callbacks = _callbacks self.peek_output = peek_output self.environment_vars = environment_vars self.working_directory = working_directory @@ -398,17 +383,15 @@ class SysCommand: if not self.session: raise KeyError(f"SysCommand() does not have an active session.") elif type(key) is slice: - start = key.start if key.start else 0 - end = key.stop if key.stop else len(self.session._trace_log) + start = key.start or 0 + end = key.stop or len(self.session._trace_log) return self.session._trace_log[start:end] else: raise ValueError("SysCommand() doesn't have key & value pairs, only slices, SysCommand('ls')[:10] as an example.") def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str: - if self.session: - return self.session._trace_log.decode('UTF-8', errors='backslashreplace') - return '' + return self.decode('UTF-8', errors='backslashreplace') or '' def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]: return { @@ -416,7 +399,7 @@ class SysCommand: 'callbacks': self._callbacks, 'peak': self.peek_output, 'environment_vars': self.environment_vars, - 'session': True if self.session else False + 'session': self.session is not None } def create_session(self) -> bool: @@ -436,10 +419,9 @@ class SysCommand: remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines, working_directory=self.working_directory) as session: - if not self.session: - self.session = session + self.session = session - while self.session.ended is None: + while not self.session.ended: self.session.poll() if self.peek_output: @@ -448,9 +430,9 @@ class SysCommand: return True - def decode(self, fmt :str = 'UTF-8') -> Optional[str]: + def decode(self, *args, **kwargs) -> Optional[str]: if self.session: - return self.session._trace_log.decode(fmt) + return self.session._trace_log.decode(*args, **kwargs) return None @property @@ -476,54 +458,52 @@ def _pid_exists(pid: int) -> bool: def run_custom_user_commands(commands :List[str], installation :Installer) -> None: for index, command in enumerate(commands): + script_path = f"/var/tmp/user-command.{index}.sh" + chroot_path = installation.target / script_path + info(f'Executing custom command "{command}" ...') - - with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: - temp_script.write(command) - - SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh") - - os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") + chroot_path.write_text(command) + SysCommand(f"arch-chroot {installation.target} bash {script_path}") + + os.unlink(chroot_path) def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool : """ - Function to load a stream (file (as name) or valid JSON string into an existing dictionary - Returns true if it could be done - Return false if operation could not be executed + Load a JSON encoded dictionary from a stream and merge it into an existing dictionary. + A stream can be a filepath, a URL or a raw JSON string. + Returns True if the operation succeeded, False otherwise. +configuration_identifier is just a parameter to get meaningful, but not so long messages """ - parsed_url = urllib.parse.urlparse(stream) - - if parsed_url.scheme: # The stream is in fact a URL that should be grabbed + raw: Optional[str] = None + # Try using the stream as a URL that should be grabbed + if urllib.parse.urlparse(stream).scheme: try: - with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: - target.update(json.loads(response.read())) + with urlopen(Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: + raw = response.read() except urllib.error.HTTPError as err: - error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}") + error(f"Could not fetch JSON from {stream} as {configuration_identifier}: {err}") return False - else: - if pathlib.Path(stream).exists(): - try: - with pathlib.Path(stream).open() as fh: - target.update(json.load(fh)) - except Exception as err: - error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}") - return False - else: - # NOTE: This is a rudimentary check if what we're trying parse is a dict structure. - # Which is the only structure we tolerate anyway. - if stream.strip().startswith('{') and stream.strip().endswith('}'): - try: - target.update(json.loads(stream)) - except Exception as e: - error(f"{configuration_identifier} Contains an invalid JSON format: {e}") - return False - else: - error(f"{configuration_identifier} is neither a file nor is a JSON string") - return False + # Try using the stream as a filepath that should be read + if raw is None and (path := pathlib.Path(stream)).exists(): + try: + raw = path.read_text() + except Exception as err: + error(f"Could not read file {stream} as {configuration_identifier}: {err}") + return False + + try: + # We use `or` to try the stream as raw JSON to be parsed + structure = json.loads(raw or stream) + except Exception as err: + error(f"{configuration_identifier} contains an invalid JSON format: {err}") + return False + if not isinstance(structure, dict): + error(f"{stream} passed as {configuration_identifier} is not a JSON encoded dictionary") + return False + target.update(structure) return True diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index f1a7f71a..ee546993 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -20,7 +20,8 @@ from .models.bootloader import Bootloader from .models.network_configuration import NetworkConfiguration from .models.users import User from .output import log, error, info, warn, debug -from .pacman import run_pacman +from . import pacman +from .pacman import Pacman from .plugins import plugins from .storage import storage @@ -52,27 +53,16 @@ class Installer: `Installer()` is the wrapper for most basic installation steps. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. """ - if not base_packages: - base_packages = __packages__[:3] - - if kernels is None: - self.kernels = ['linux'] - else: - self.kernels = kernels - + self.base_packages = base_packages or __packages__[:3] + self.kernels = kernels or ['linux'] self._disk_config = disk_config - if disk_encryption is None: - self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption) - else: - self._disk_encryption = disk_encryption - + self._disk_encryption = disk_encryption or disk.DiskEncryption(disk.EncryptionType.NoEncryption) self.target: Path = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} - self.base_packages = base_packages for kernel in self.kernels: self.base_packages.append(kernel) @@ -101,6 +91,7 @@ class Installer: self._fstab_entries: List[str] = [] self._zram_enabled = False + self.pacman = Pacman(self.target, storage['arguments'].get('silent', False)) def __enter__(self) -> 'Installer': return self @@ -189,35 +180,33 @@ class Installer: # partitions have to mounted in the right order on btrfs the mountpoint will # be empty as the actual subvolumes are getting mounted instead so we'll use # '/' just for sorting - sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint if x.mountpoint else Path('/')) + sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/')) + enc_partitions = [] if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption: - enc_partitions = list(filter(lambda x: x in self._disk_encryption.partitions, sorted_part_mods)) - else: - enc_partitions = [] + enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions)) # attempt to decrypt all luks partitions luks_handlers = self._prepare_luks_partitions(enc_partitions) for part_mod in sorted_part_mods: - if part_mod not in luks_handlers: # partition is not encrypted + if luks_handler := luks_handlers.get(part_mod): + # mount encrypted partition + self._mount_luks_partiton(part_mod, luks_handler) + else: + # partition is not encrypted self._mount_partition(part_mod) - else: # mount encrypted partition - self._mount_luks_partiton(part_mod, luks_handlers[part_mod]) def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]: - luks_handlers = {} - - for part_mod in partitions: - if part_mod.mapper_name and part_mod.dev_path: - luks_handler = disk.device_handler.unlock_luks2_dev( - part_mod.dev_path, - part_mod.mapper_name, - self._disk_encryption.encryption_password - ) - luks_handlers[part_mod] = luks_handler - - return luks_handlers + return { + part_mod: disk.device_handler.unlock_luks2_dev( + part_mod.dev_path, + part_mod.mapper_name, + self._disk_encryption.encryption_password + ) + for part_mod in partitions + if part_mod.mapper_name and part_mod.dev_path + } def _mount_partition(self, part_mod: disk.PartitionModification): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined @@ -302,93 +291,6 @@ class Installer: def post_install_check(self, *args :str, **kwargs :str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] - def enable_multilib_repository(self): - # Set up a regular expression pattern of a commented line containing 'multilib' within [] - pattern = re.compile(r"^#\s*\[multilib\]$") - - # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. - matched = False - - # Read in the lines from the original file - with open("/etc/pacman.conf", "r") as pacman_conf: - lines = pacman_conf.readlines() - - # Open the file again in write mode, to replace the contents - with open("/etc/pacman.conf", "w") as pacman_conf: - for line in lines: - if pattern.match(line): - # If this is the [] block containing 'multilib', uncomment it and set the matched tracking boolean. - pacman_conf.write(line.lstrip('#')) - matched = True - elif matched: - # The previous line was a match for [.*multilib.*]. - # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist' - pacman_conf.write(line.lstrip('#')) - matched = False # Reset the state of matched to False. - else: - pacman_conf.write(line) - - def enable_testing_repositories(self, enable_multilib_testing=False): - # Set up a regular expression pattern of a commented line containing 'testing' within [] - pattern = re.compile("^#\\[.*testing.*\\]$") - - # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. - matched = False - - # Read in the lines from the original file - with open("/etc/pacman.conf", "r") as pacman_conf: - lines = pacman_conf.readlines() - - # Open the file again in write mode, to replace the contents - with open("/etc/pacman.conf", "w") as pacman_conf: - for line in lines: - if pattern.match(line) and (enable_multilib_testing or 'multilib' not in line): - # If this is the [] block containing 'testing', uncomment it and set the matched tracking boolean. - pacman_conf.write(line.lstrip('#')) - matched = True - elif matched: - # The previous line was a match for [.*testing.*]. - # This means we're on a line that looks like '#Include = /etc/pacman.d/mirrorlist' - pacman_conf.write(line.lstrip('#')) - matched = False # Reset the state of matched to False. - else: - pacman_conf.write(line) - - def _pacstrap(self, packages: Union[str, List[str]]) -> bool: - if isinstance(packages, str): - packages = [packages] - - for plugin in plugins.values(): - if hasattr(plugin, 'on_pacstrap'): - if (result := plugin.on_pacstrap(packages)): - packages = result - - info(f'Installing packages: {packages}') - - # TODO: We technically only need to run the -Syy once. - try: - run_pacman('-Syy', default_cmd='/usr/bin/pacman') - except SysCallError as err: - error(f'Could not sync a new package database: {err}') - - if storage['arguments'].get('silent', False) is False: - if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self._pacstrap(packages) - - raise RequirementError(f'Could not sync mirrors: {err}') - - try: - SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) - return True - except SysCallError as err: - error(f'Could not strap in packages: {err}') - - if storage['arguments'].get('silent', False) is False: - if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self._pacstrap(packages) - - raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") - def set_mirrors(self, mirror_config: MirrorConfiguration): for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): @@ -402,7 +304,8 @@ class Installer: add_custom_mirrors(mirror_config.custom_mirrors) def genfstab(self, flags :str = '-pU'): - info(f"Updating {self.target}/etc/fstab") + fstab_path = self.target / "etc" / "fstab" + info(f"Updating {fstab_path}") try: gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() @@ -412,10 +315,10 @@ class Installer: if not gen_fstab: raise RequirementError(f'Genrating fstab returned empty value') - with open(f"{self.target}/etc/fstab", 'a') as fp: + with open(fstab_path, 'a') as fp: fp.write(gen_fstab) - if not os.path.isfile(f'{self.target}/etc/fstab'): + if not fstab_path.is_file(): raise RequirementError(f'Could not create fstab file') for plugin in plugins.values(): @@ -423,7 +326,7 @@ class Installer: if plugin.on_genfstab(self) is True: break - with open(f"{self.target}/etc/fstab", 'a') as fp: + with open(fstab_path, 'a') as fp: for entry in self._fstab_entries: fp.write(f'{entry}\n') @@ -432,9 +335,7 @@ class Installer: if part_mod.fs_type != disk.FilesystemType.Btrfs: continue - fstab_file = Path(f'{self.target}/etc/fstab') - - with fstab_file.open('r') as fp: + with fstab_path.open('r') as fp: fstab = fp.readlines() # Replace the {installation}/etc/fstab with entries @@ -456,7 +357,7 @@ class Installer: fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}') break - with fstab_file.open('w') as fp: + with fstab_path.open('w') as fp: fp.writelines(fstab) def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: @@ -486,8 +387,7 @@ class Installer: with open(f'{self.target}/etc/locale.gen', 'a') as fh: fh.write(f'{lang}.{encoding}{modifier} {encoding}\n') - with open(f'{self.target}/etc/locale.conf', 'w') as fh: - fh.write(f'LANG={lang}.{encoding}{modifier}\n') + (self.target / "etc" / "locale.conf").write_text(f'LANG={lang}.{encoding}{modifier}\n') try: SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen') @@ -561,16 +461,13 @@ class Installer: for plugin in plugins.values(): if hasattr(plugin, 'on_configure_nic'): - new_conf = plugin.on_configure_nic( + conf = plugin.on_configure_nic( network_config.iface, network_config.dhcp, network_config.ip, network_config.gateway, network_config.dns - ) - - if new_conf: - conf = new_conf + ) or conf with open(f"{self.target}/etc/systemd/network/10-{network_config.iface}.network", "a") as netconf: netconf.write(str(conf)) @@ -597,7 +494,7 @@ class Installer: # Otherwise, we can go ahead and add the required package # and enable it's service: else: - self._pacstrap('iwd') + self.pacman.strap('iwd') self.enable_service('iwd') for psk in psk_files: @@ -683,7 +580,7 @@ class Installer: if part in self._disk_encryption.partitions: if self._disk_encryption.hsm_device: # Required bby mkinitcpio to add support for fido2-device options - self._pacstrap('libfido2') + self.pacman.strap('libfido2') if 'sd-encrypt' not in self._hooks: self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') @@ -709,24 +606,27 @@ class Installer: # Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set. # This action takes place on the host system as pacstrap copies over package repository lists. + pacman_conf = pacman.Config(self.target) if multilib: info("The multilib flag is set. This system will be installed with the multilib repository enabled.") - self.enable_multilib_repository() + pacman_conf.enable(pacman.Repo.Multilib) else: info("The multilib flag is not set. This system will be installed without multilib repositories enabled.") if testing: info("The testing flag is set. This system will be installed with testing repositories enabled.") - self.enable_testing_repositories(multilib) + pacman_conf.enable(pacman.Repo.Testing) + if multilib: + pacman_conf.enable(pacman.Repo.MultilibTesting) else: info("The testing flag is not set. This system will be installed without testing repositories enabled.") - self._pacstrap(self.base_packages) + pacman_conf.apply() + + self.pacman.strap(self.base_packages) self.helper_flags['base-strapped'] = True - # This handles making sure that the repositories we enabled persist on the installed system - if multilib or testing: - shutil.copy2("/etc/pacman.conf", f"{self.target}/etc/pacman.conf") + pacman_conf.persist() # Periodic TRIM may improve the performance and longevity of SSDs whilst # having no adverse effect on other devices. Most distributions enable @@ -761,7 +661,7 @@ class Installer: def setup_swap(self, kind :str = 'zram'): if kind == 'zram': info(f"Setting up swap on zram") - self._pacstrap('zram-generator') + self.pacman.strap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' @@ -788,7 +688,7 @@ class Installer: return None def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): - self._pacstrap('efibootmgr') + self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): raise HardwareIncompatibilityError @@ -897,7 +797,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self._pacstrap('grub') # no need? + self.pacman.strap('grub') # no need? _file = "/etc/default/grub" @@ -916,7 +816,7 @@ class Installer: info(f"GRUB boot partition: {boot_partition.dev_path}") if SysInfo.has_uefi(): - self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) @@ -955,7 +855,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self._pacstrap('efibootmgr') + self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): raise HardwareIncompatibilityError @@ -1030,9 +930,6 @@ class Installer: if plugin.on_add_bootloader(self): return True - if type(self.target) == str: - self.target = Path(self.target) - boot_partition = self._get_boot_partition() root_partition = self._get_root_partition() @@ -1053,7 +950,7 @@ class Installer: self._add_efistub_bootloader(boot_partition, root_partition) def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: - return self._pacstrap(packages) + return self.pacman.strap(packages) def _enable_users(self, service: str, users: List[User]): for user in users: @@ -1214,7 +1111,7 @@ class Installer: return True - def _service_started(self, service_name: str) -> str | None: + def _service_started(self, service_name: str) -> Optional[str]: if os.path.splitext(service_name)[1] not in ('.service', '.target', '.timer'): service_name += '.service' # Just to be safe diff --git a/archinstall/lib/locale.py b/archinstall/lib/locale.py index 0a36c072..ab158984 100644 --- a/archinstall/lib/locale.py +++ b/archinstall/lib/locale.py @@ -1,3 +1,5 @@ +from itertools import takewhile +from pathlib import Path from typing import Iterator, List from .exceptions import ServiceException, SysCallError @@ -11,21 +13,12 @@ def list_keyboard_languages() -> Iterator[str]: def list_locales() -> List[str]: - with open('/etc/locale.gen', 'r') as fp: - locales = [] - # before the list of locales begins there's an empty line with a '#' in front - # so we'll collect the localels from bottom up and halt when we're donw - entries = fp.readlines() - entries.reverse() - - for entry in entries: - text = entry.replace('#', '').strip() - if text == '': - break - locales.append(text) - - locales.reverse() - return locales + entries = Path('/etc/locale.gen').read_text().splitlines() + # Before the list of locales begins there's an empty line with a '#' in front + # so we'll collect the locales from bottom up and halt when we're done. + locales = list(takewhile(bool, map(lambda entry: entry.strip('\n\t #'), reversed(entries)))) + locales.reverse() + return locales def list_x11_keyboard_languages() -> Iterator[str]: diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 6906c320..2a086f39 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -9,7 +9,7 @@ from urllib.request import urlopen from .exceptions import SysCallError from .output import error, info, debug -from .pacman import run_pacman +from .pacman import Pacman def get_hw_addr(ifname :str) -> str: @@ -35,7 +35,7 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: def check_mirror_reachable() -> bool: info("Testing connectivity to the Arch Linux mirrors...") try: - run_pacman("-Sy") + Pacman.run("-Sy") return True except SysCallError as err: if os.geteuid() != 0: @@ -48,7 +48,7 @@ def check_mirror_reachable() -> bool: def update_keyring() -> bool: info("Updating archlinux-keyring ...") try: - run_pacman("-Sy --noconfirm archlinux-keyring") + Pacman.run("-Sy --noconfirm archlinux-keyring") return True except SysCallError: if os.geteuid() != 0: diff --git a/archinstall/lib/packages/packages.py b/archinstall/lib/packages/packages.py index 71818ca5..b71b0ce8 100644 --- a/archinstall/lib/packages/packages.py +++ b/archinstall/lib/packages/packages.py @@ -8,7 +8,7 @@ from urllib.request import urlopen from ..exceptions import PackageError, SysCallError from ..models.gen import PackageSearch, PackageSearchResult, LocalPackage -from ..pacman import run_pacman +from ..pacman import Pacman BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/' # BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/' @@ -106,7 +106,7 @@ def validate_package_list(packages :list) -> Tuple[list, list]: def installed_package(package :str) -> LocalPackage: package_info = {} try: - for line in run_pacman(f"-Q --info {package}"): + for line in Pacman.run(f"-Q --info {package}"): if b':' in line: key, value = line.decode().split(':', 1) package_info[key.strip().lower().replace(' ', '_')] = value.strip() diff --git a/archinstall/lib/pacman.py b/archinstall/lib/pacman.py deleted file mode 100644 index f5514f05..00000000 --- a/archinstall/lib/pacman.py +++ /dev/null @@ -1,31 +0,0 @@ -import pathlib -import time -from typing import TYPE_CHECKING, Any - -from .general import SysCommand -from .output import warn, error - -if TYPE_CHECKING: - _: Any - - -def run_pacman(args :str, default_cmd :str = 'pacman') -> SysCommand: - """ - A centralized function to call `pacman` from. - It also protects us from colliding with other running pacman sessions (if used locally). - The grace period is set to 10 minutes before exiting hard if another pacman instance is running. - """ - pacman_db_lock = pathlib.Path('/var/lib/pacman/db.lck') - - if pacman_db_lock.exists(): - warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.')) - - started = time.time() - while pacman_db_lock.exists(): - time.sleep(0.25) - - if time.time() - started > (60 * 10): - error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.')) - exit(1) - - return SysCommand(f'{default_cmd} {args}') diff --git a/archinstall/lib/pacman/__init__.py b/archinstall/lib/pacman/__init__.py new file mode 100644 index 00000000..6478f0cc --- /dev/null +++ b/archinstall/lib/pacman/__init__.py @@ -0,0 +1,88 @@ +from pathlib import Path +import time +import re +from typing import TYPE_CHECKING, Any, List, Callable, Union +from shutil import copy2 + +from ..general import SysCommand +from ..output import warn, error, info +from .repo import Repo +from .config import Config +from ..exceptions import RequirementError +from ..plugins import plugins + +if TYPE_CHECKING: + _: Any + + +class Pacman: + + def __init__(self, target: Path, silent: bool = False): + self.synced = False + self.silent = silent + self.target = target + + @staticmethod + def run(args :str, default_cmd :str = 'pacman') -> SysCommand: + """ + A centralized function to call `pacman` from. + It also protects us from colliding with other running pacman sessions (if used locally). + The grace period is set to 10 minutes before exiting hard if another pacman instance is running. + """ + pacman_db_lock = Path('/var/lib/pacman/db.lck') + + if pacman_db_lock.exists(): + warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.')) + + started = time.time() + while pacman_db_lock.exists(): + time.sleep(0.25) + + if time.time() - started > (60 * 10): + error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.')) + exit(1) + + return SysCommand(f'{default_cmd} {args}') + + def ask(self, error_message: str, bail_message: str, func: Callable, *args, **kwargs): + while True: + try: + func(*args, **kwargs) + break + except Exception as err: + error(f'{error_message}: {err}') + if not self.silent and input('Would you like to re-try this download? (Y/n): ').lower().strip() in 'y': + continue + raise RequirementError(f'{bail_message}: {err}') + + def sync(self): + if self.synced: + return + self.ask( + 'Could not sync a new package database', + 'Could not sync mirrors', + self.run, + '-Syy', + default_cmd='/usr/bin/pacman' + ) + self.synced = True + + def strap(self, packages: Union[str, List[str]]): + self.sync() + if isinstance(packages, str): + packages = [packages] + + for plugin in plugins.values(): + if hasattr(plugin, 'on_pacstrap'): + if (result := plugin.on_pacstrap(packages)): + packages = result + + info(f'Installing packages: {packages}') + + self.ask( + 'Could not strap in packages', + 'Pacstrap failed. See /var/log/archinstall/install.log or above message for error details', + SysCommand, + f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', + peek_output=True + ) diff --git a/archinstall/lib/pacman/config.py b/archinstall/lib/pacman/config.py new file mode 100644 index 00000000..60d202bc --- /dev/null +++ b/archinstall/lib/pacman/config.py @@ -0,0 +1,33 @@ +import re +from pathlib import Path +from shutil import copy2 +from typing import List + +from .repo import Repo + + +class Config: + def __init__(self, target: Path): + self.path = Path("/etc") / "pacman.conf" + self.chroot_path = target / "etc" / "pacman.conf" + self.patterns: List[re.Pattern] = [] + + def enable(self, repo: Repo): + self.patterns.append(re.compile(r"^#\s*\[{}\]$".format(repo.value))) + + def apply(self): + if not self.patterns: + return + lines = iter(self.path.read_text().splitlines(keepends=True)) + with open(self.path, 'w') as f: + for line in lines: + if any(pattern.match(line) for pattern in self.patterns): + # Uncomment this line and the next. + f.write(line.lstrip('#')) + f.write(next(lines).lstrip('#')) + else: + f.write(line) + + def persist(self): + if self.patterns: + copy2(self.path, self.chroot_path) diff --git a/archinstall/lib/pacman/repo.py b/archinstall/lib/pacman/repo.py new file mode 100644 index 00000000..b4106f97 --- /dev/null +++ b/archinstall/lib/pacman/repo.py @@ -0,0 +1,6 @@ +from enum import Enum + +class Repo(Enum): + Multilib = "multilib" + Testing = "testing" + MultilibTesting = "multilib-testing" diff --git a/archinstall/lib/profile/profiles_handler.py b/archinstall/lib/profile/profiles_handler.py index 4e7c3d2b..74c21824 100644 --- a/archinstall/lib/profile/profiles_handler.py +++ b/archinstall/lib/profile/profiles_handler.py @@ -98,14 +98,19 @@ class ProfileHandler: profile = self.get_profile_by_name(main) if main else None valid: List[Profile] = [] + details: List[str] = profile_config.get('details', []) + if details: + valid = [] + invalid = [] - if details := profile_config.get('details', []): - resolved = {detail: self.get_profile_by_name(detail) for detail in details if detail} - valid = [p for p in resolved.values() if p is not None] - invalid = ', '.join([k for k, v in resolved.items() if v is None]) + for detail in filter(None, details): + if profile := self.get_profile_by_name(detail): + valid.append(profile) + else: + invalid.append(detail) if invalid: - info(f'No profile definition found: {invalid}') + info('No profile definition found: {}'.format(', '.join(invalid))) custom_settings = profile_config.get('custom_settings', {}) for profile in valid: @@ -123,14 +128,12 @@ class ProfileHandler: """ List of all available default_profiles """ - if self._profiles is None: - self._profiles = self._find_available_profiles() + self._profiles = self._profiles or self._find_available_profiles() return self._profiles @cached_property def _local_mac_addresses(self) -> List[str]: - ifaces = list_interfaces() - return list(ifaces.keys()) + return list(list_interfaces()) def add_custom_profiles(self, profiles: Union[TProfile, List[TProfile]]): if not isinstance(profiles, list): @@ -190,25 +193,20 @@ class ProfileHandler: def install_gfx_driver(self, install_session: 'Installer', driver: Optional[GfxDriver]): try: - driver_pkgs = driver.packages() if driver else [] - pkg_names = [p.value for p in driver_pkgs] - additional_pkg = ' '.join(['xorg-server', 'xorg-xinit'] + pkg_names) if driver is not None: - # Find the intersection between the set of known nvidia drivers - # and the selected driver packages. Since valid intesections can - # only have one element or none, we iterate and try to take the - # first element. - if driver_pkg := next(iter({GfxPackage.Nvidia, GfxPackage.NvidiaOpen} & set(driver_pkgs)), None): - if any(kernel in install_session.base_packages for kernel in ("linux-lts", "linux-zen")): - for kernel in install_session.kernels: - # Fixes https://github.com/archlinux/archinstall/issues/585 - install_session.add_additional_packages(f"{kernel}-headers") + driver_pkgs = driver.packages() + pkg_names = [p.value for p in driver_pkgs] + for driver_pkg in {GfxPackage.Nvidia, GfxPackage.NvidiaOpen} & set(driver_pkgs): + for kernel in {"linux-lts", "linux-zen"} & set(install_session.kernels): + # Fixes https://github.com/archlinux/archinstall/issues/585 + install_session.add_additional_packages(f"{kernel}-headers") # I've had kernel regen fail if it wasn't installed before nvidia-dkms - install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', f'{driver_pkg}-dkms']) - return - elif 'amdgpu' in driver_pkgs: + install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', f'{driver_pkg.value}-dkms']) + # Return after first driver match, since it is impossible to use both simultaneously. + return + if 'amdgpu' in driver_pkgs: # The order of these two are important if amdgpu is installed #808 if 'amdgpu' in install_session.modules: install_session.modules.remove('amdgpu') @@ -218,23 +216,24 @@ class ProfileHandler: install_session.modules.remove('radeon') install_session.modules.append('radeon') - install_session.add_additional_packages(additional_pkg) + install_session.add_additional_packages(pkg_names) except Exception as err: warn(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}") # Prep didn't run, so there's no driver to install - install_session.add_additional_packages(['xorg-server', 'xorg-xinit']) + install_session.add_additional_packages(['xorg-server', 'xorg-xinit']) def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration): profile = profile_config.profile - if profile: - profile.install(install_session) + if not profile: + return - if profile and profile_config.gfx_driver: - if profile.is_xorg_type_profile() or profile.is_desktop_type_profile(): - self.install_gfx_driver(install_session, profile_config.gfx_driver) + profile.install(install_session) - if profile and profile_config.greeter: + if profile_config.gfx_driver and (profile.is_xorg_type_profile() or profile.is_desktop_type_profile()): + self.install_gfx_driver(install_session, profile_config.gfx_driver) + + if profile_config.greeter: self.install_greeter(install_session, profile_config.greeter) def _import_profile_from_url(self, url: str): @@ -312,8 +311,7 @@ class ProfileHandler: debug(f'Importing profile: {file}') try: - spec = importlib.util.spec_from_file_location(name, file) - if spec is not None: + if spec := importlib.util.spec_from_file_location(name, file): imported = importlib.util.module_from_spec(spec) if spec.loader is not None: spec.loader.exec_module(imported) |