index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Daniel Girtler <blackrabbit256@gmail.com> | 2023-05-12 02:30:09 +1000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-11 18:30:09 +0200 |
commit | 89cefb9a1c7d4c4968e7d8645149078e601c9d1c (patch) | |
tree | 12c84bdcef1b0ef3f8a21977e25c7f0f89388138 /archinstall/lib | |
parent | 6e6b850a8f687b193172aaa321d49bd2956c1d4f (diff) |
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/boot.py index 6ccbc5f6..62c50df3 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/boot.py @@ -1,10 +1,9 @@ -import logging import time from typing import Iterator, Optional from .exceptions import SysCallError from .general import SysCommand, SysCommandWorker, locate_binary from .installer import Installer -from .output import log +from .output import error from .storage import storage @@ -48,16 +47,18 @@ class Boot: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: - log(args[1], level=logging.ERROR, fg='red') - log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red") + error( + args[1], + f"The error above occurred in a temporary boot-up of the installation {self.instance}" + ) shutdown = None shutdown_exit_code: Optional[int] = -1 try: shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now') - except SysCallError as error: - shutdown_exit_code = error.exit_code + except SysCallError as err: + shutdown_exit_code = err.exit_code if self.session: while self.session.is_alive(): diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py index 22c41c0d..c3af3a83 100644 --- a/archinstall/lib/configuration.py +++ b/archinstall/lib/configuration.py @@ -1,13 +1,13 @@ import os import json import stat -import logging from pathlib import Path from typing import Optional, Dict, Any, TYPE_CHECKING +from .menu import Menu, MenuSelectionType from .storage import storage -from .general import JSON, UNSAFE_JSON -from .output import log +from .general import JSON, UNSAFE_JSON, SysCommand +from .output import debug, info, warn if TYPE_CHECKING: _: Any @@ -69,18 +69,18 @@ class ConfigurationOutput: def show(self): print(_('\nThis is your chosen configuration:')) - log(" -- Chosen configuration --", level=logging.DEBUG) + debug(" -- Chosen configuration --") user_conig = self.user_config_to_json() - log(user_conig, level=logging.INFO) + info(user_conig) print() def _is_valid_path(self, dest_path: Path) -> bool: if (not dest_path.exists()) or not (dest_path.is_dir()): - log( - 'Destination directory {} does not exist or is not a directory,\n Configuration files can not be saved'.format(dest_path.resolve()), - fg="yellow" + warn( + f'Destination directory {dest_path.resolve()} does not exist or is not a directory\n.', + 'Configuration files can not be saved' ) return False return True @@ -111,3 +111,101 @@ class ConfigurationOutput: if self._is_valid_path(dest_path): self.save_user_config(dest_path) self.save_user_creds(dest_path) + + +def save_config(config: Dict): + def preview(selection: str): + if options['user_config'] == selection: + serialized = config_output.user_config_to_json() + return f'{config_output.user_configuration_file}\n{serialized}' + elif options['user_creds'] == selection: + if maybe_serial := config_output.user_credentials_to_json(): + return f'{config_output.user_credentials_file}\n{maybe_serial}' + else: + return str(_('No configuration')) + elif options['all'] == selection: + output = f'{config_output.user_configuration_file}\n' + if config_output.user_credentials_to_json(): + output += f'{config_output.user_credentials_file}\n' + return output[:-1] + return None + + config_output = ConfigurationOutput(config) + + options = { + 'user_config': str(_('Save user configuration')), + 'user_creds': str(_('Save user credentials')), + 'disk_layout': str(_('Save disk layout')), + 'all': str(_('Save all')) + } + + choice = Menu( + _('Choose which configuration to save'), + list(options.values()), + sort=False, + skip=True, + preview_size=0.75, + preview_command=preview + ).run() + + if choice.type_ == MenuSelectionType.Skip: + return + + save_config_value = choice.single_value + saving_key = [k for k, v in options.items() if v == save_config_value][0] + + dirs_to_exclude = [ + '/bin', + '/dev', + '/lib', + '/lib64', + '/lost+found', + '/opt', + '/proc', + '/run', + '/sbin', + '/srv', + '/sys', + '/usr', + '/var', + ] + + debug('Ignore configuration option folders: ' + ','.join(dirs_to_exclude)) + info(_('Finding possible directories to save configuration files ...')) + + find_exclude = '-path ' + ' -prune -o -path '.join(dirs_to_exclude) + ' -prune ' + file_picker_command = f'find / {find_exclude} -o -type d -print0' + + directories = SysCommand(file_picker_command).decode() + + if directories is None: + raise ValueError('Failed to retrieve possible configuration directories') + + possible_save_dirs = list(filter(None, directories.split('\x00'))) + + selection = Menu( + _('Select directory (or directories) for saving configuration files'), + possible_save_dirs, + multi=True, + skip=True, + allow_reset=False, + ).run() + + match selection.type_: + case MenuSelectionType.Skip: + return + + save_dirs = selection.multi_value + + debug(f'Saving {saving_key} configuration files to {save_dirs}') + + if save_dirs is not None: + for save_dir_str in save_dirs: + save_dir = Path(save_dir_str) + if options['user_config'] == save_config_value: + config_output.save_user_config(save_dir) + elif options['user_creds'] == save_config_value: + config_output.save_user_creds(save_dir) + elif options['all'] == save_config_value: + config_output.save_user_config(save_dir) + config_output.save_user_creds(save_dir) diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py index 13bde77a..4341c53c 100644 --- a/archinstall/lib/disk/device_handler.py +++ b/archinstall/lib/disk/device_handler.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import logging import os import time from pathlib import Path @@ -24,7 +23,7 @@ from .device_model import ( from ..exceptions import DiskError, UnknownFilesystemFormat from ..general import SysCommand, SysCallError, JSON from ..luks import Luks2 -from ..output import log +from ..output import debug, error, info, warn from ..utils.util import is_subpath if TYPE_CHECKING: @@ -48,11 +47,11 @@ class DeviceHandler(object): for device in getAllDevices(): try: disk = Disk(device) - except DiskLabelException as error: - if 'unrecognised disk label' in getattr(error, 'message', str(error)): + except DiskLabelException as err: + if 'unrecognised disk label' in getattr(error, 'message', str(err)): disk = freshDisk(device, PartitionTable.GPT.value) else: - log(f'Unable to get disk from device: {device}', level=logging.DEBUG) + debug(f'Unable to get disk from device: {device}') continue device_info = _DeviceInfo.from_disk(disk) @@ -93,7 +92,7 @@ class DeviceHandler(object): return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None return None except ValueError: - log(f'Could not determine the filesystem: {partition.fileSystem}', level=logging.DEBUG) + debug(f'Could not determine the filesystem: {partition.fileSystem}') return None @@ -137,7 +136,7 @@ class DeviceHandler(object): try: result = SysCommand(f'btrfs subvolume list {mountpoint}') except SysCallError as err: - log(f'Failed to read btrfs subvolume information: {err}', level=logging.DEBUG) + debug(f'Failed to read btrfs subvolume information: {err}') return subvol_infos try: @@ -150,7 +149,7 @@ class DeviceHandler(object): sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None) subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint)) except json.decoder.JSONDecodeError as err: - log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR) + error(f"Could not decode lsblk JSON: {result}") raise err if not lsblk_info.mountpoint: @@ -203,14 +202,14 @@ class DeviceHandler(object): options += additional_parted_options options_str = ' '.join(options) - log(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') + info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') try: SysCommand(f"/usr/bin/{command} {options_str} {path}") - except SysCallError as error: - msg = f'Could not format {path} with {fs_type.value}: {error.message}' - log(msg, fg='red') - raise DiskError(msg) from error + except SysCallError as err: + msg = f'Could not format {path} with {fs_type.value}: {err.message}' + error(msg) + raise DiskError(msg) from err def _perform_enc_formatting( self, @@ -227,16 +226,16 @@ class DeviceHandler(object): key_file = luks_handler.encrypt() - log(f'Unlocking luks2 device: {dev_path}', level=logging.DEBUG) + debug(f'Unlocking luks2 device: {dev_path}') luks_handler.unlock(key_file=key_file) if not luks_handler.mapper_dev: raise DiskError('Failed to unlock luks device') - log(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}', level=logging.INFO) + info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}') self._perform_formatting(fs_type, luks_handler.mapper_dev) - log(f'luks2 locking device: {dev_path}', level=logging.INFO) + info(f'luks2 locking device: {dev_path}') luks_handler.lock() def format( @@ -285,7 +284,7 @@ class DeviceHandler(object): # when we require a delete and the partition to be (re)created # already exists then we have to delete it first if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]: - log(f'Delete existing partition: {part_mod.safe_dev_path}', level=logging.INFO) + info(f'Delete existing partition: {part_mod.safe_dev_path}') part_info = self.find_partition(part_mod.safe_dev_path) if not part_info: @@ -325,9 +324,9 @@ class DeviceHandler(object): for flag in part_mod.flags: partition.setFlag(flag.value) - log(f'\tType: {part_mod.type.value}', level=logging.DEBUG) - log(f'\tFilesystem: {part_mod.fs_type.value}', level=logging.DEBUG) - log(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length', level=logging.DEBUG) + debug(f'\tType: {part_mod.type.value}') + debug(f'\tFilesystem: {part_mod.fs_type.value}') + debug(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length') try: disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint) @@ -339,41 +338,41 @@ class DeviceHandler(object): # the partition has a real path now as it was created part_mod.dev_path = Path(partition.path) - info = self._fetch_partuuid(part_mod.dev_path) + lsblk_info = self._fetch_partuuid(part_mod.dev_path) - part_mod.partuuid = info.partuuid - part_mod.uuid = info.uuid + part_mod.partuuid = lsblk_info.partuuid + part_mod.uuid = lsblk_info.uuid except PartitionException as ex: raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex def _fetch_partuuid(self, path: Path) -> LsblkInfo: attempts = 3 - info: Optional[LsblkInfo] = None + lsblk_info: Optional[LsblkInfo] = None self.partprobe(path) for attempt_nr in range(attempts): time.sleep(attempt_nr + 1) - info = get_lsblk_info(path) + lsblk_info = get_lsblk_info(path) - if info.partuuid: + if lsblk_info.partuuid: break self.partprobe(path) - if not info or not info.partuuid: - log(f'Unable to determine new partition uuid: {path}\n{info}', level=logging.DEBUG) + if not lsblk_info or not lsblk_info.partuuid: + debug(f'Unable to determine new partition uuid: {path}\n{lsblk_info}') raise DiskError(f'Unable to determine new partition uuid: {path}') - log(f'partuuid found: {info.json()}', level=logging.DEBUG) + debug(f'partuuid found: {lsblk_info.json()}') - return info + return lsblk_info def create_btrfs_volumes( self, part_mod: PartitionModification, enc_conf: Optional['DiskEncryption'] = None ): - log(f'Creating subvolumes: {part_mod.safe_dev_path}', level=logging.INFO) + info(f'Creating subvolumes: {part_mod.safe_dev_path}') luks_handler = None @@ -396,7 +395,7 @@ class DeviceHandler(object): self.mount(part_mod.safe_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) for sub_vol in part_mod.btrfs_subvols: - log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG) + debug(f'Creating subvolume: {sub_vol.name}') if luks_handler is not None: subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name @@ -408,14 +407,14 @@ class DeviceHandler(object): if sub_vol.nodatacow: try: SysCommand(f'chattr +C {subvol_path}') - except SysCallError as error: - raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {error}') + except SysCallError as err: + raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}') if sub_vol.compress: try: SysCommand(f'chattr +c {subvol_path}') - except SysCallError as error: - raise DiskError(f'Could not set compress attribute at {subvol_path}: {error}') + except SysCallError as err: + raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}') if luks_handler is not None and luks_handler.mapper_dev is not None: self.umount(luks_handler.mapper_dev) @@ -435,12 +434,12 @@ class DeviceHandler(object): return luks_handler def _umount_all_existing(self, modification: DeviceModification): - log(f'Unmounting all partitions: {modification.device_path}', level=logging.INFO) + info(f'Unmounting all partitions: {modification.device_path}') existing_partitions = self._devices[modification.device_path].partition_infos for partition in existing_partitions: - log(f'Unmounting: {partition.path}', level=logging.DEBUG) + debug(f'Unmounting: {partition.path}') # un-mount for existing encrypted partitions if partition.fs_type == FilesystemType.Crypto_luks: @@ -472,10 +471,10 @@ class DeviceHandler(object): part_table = partition_table.value if partition_table else None disk = freshDisk(modification.device.disk.device, part_table) else: - log(f'Use existing device: {modification.device_path}') + info(f'Use existing device: {modification.device_path}') disk = modification.device.disk - log(f'Creating partitions: {modification.device_path}') + info(f'Creating partitions: {modification.device_path}') # TODO sort by delete first @@ -507,7 +506,7 @@ class DeviceHandler(object): lsblk_info = get_lsblk_info(dev_path) if target_mountpoint in lsblk_info.mountpoints: - log(f'Device already mounted at {target_mountpoint}') + info(f'Device already mounted at {target_mountpoint}') return str_options = ','.join(options) @@ -517,7 +516,7 @@ class DeviceHandler(object): command = f'mount {mount_fs} {str_options} {dev_path} {target_mountpoint}' - log(f'Mounting {dev_path}: command', level=logging.DEBUG) + debug(f'Mounting {dev_path}: command') try: SysCommand(command) @@ -536,10 +535,10 @@ class DeviceHandler(object): raise ex if len(lsblk_info.mountpoints) > 0: - log(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}', level=logging.DEBUG) + debug(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}') for mountpoint in lsblk_info.mountpoints: - log(f'Unmounting mountpoint: {mountpoint}', level=logging.DEBUG) + debug(f'Unmounting mountpoint: {mountpoint}') command = 'umount' @@ -574,10 +573,10 @@ class DeviceHandler(object): command = 'partprobe' try: - log(f'Calling partprobe: {command}', level=logging.DEBUG) + debug(f'Calling partprobe: {command}') SysCommand(command) - except SysCallError as error: - log(f'"{command}" failed to run: {error}', level=logging.DEBUG) + except SysCallError as err: + error(f'"{command}" failed to run: {err}') def _wipe(self, dev_path: Path): """ @@ -594,7 +593,7 @@ class DeviceHandler(object): This is not intended to be secure, but rather to ensure that auto-discovery tools don't recognize anything here. """ - log(f'Wiping partitions and metadata: {block_device.device_info.path}') + info(f'Wiping partitions and metadata: {block_device.device_info.path}') for partition in block_device.partition_infos: self._wipe(partition.path) @@ -609,8 +608,8 @@ def disk_layouts() -> str: lsblk_info = get_all_lsblk_info() return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON) except SysCallError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") + warn(f"Could not return disk layouts: {err}") return '' except json.decoder.JSONDecodeError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") + warn(f"Could not return disk layouts: {err}") return '' diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py index d57347b7..36dd0c4f 100644 --- a/archinstall/lib/disk/device_model.py +++ b/archinstall/lib/disk/device_model.py @@ -2,7 +2,6 @@ from __future__ import annotations import dataclasses import json -import logging import math import time import uuid @@ -18,7 +17,7 @@ from parted import Disk, Geometry, Partition from ..exceptions import DiskError, SysCallError from ..general import SysCommand -from ..output import log +from ..output import debug, error from ..storage import storage if TYPE_CHECKING: @@ -282,7 +281,7 @@ class _PartitionInfo: btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list) def as_json(self) -> Dict[str, Any]: - info = { + part_info = { 'Name': self.name, 'Type': self.type.value, 'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')), @@ -293,9 +292,9 @@ class _PartitionInfo: } if self.btrfs_subvol_infos: - info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes' + part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes' - return info + return part_info @classmethod def from_partition( @@ -392,7 +391,7 @@ class SubvolumeModification: mods = [] for entry in subvol_args: if not entry.get('name', None) or not entry.get('mountpoint', None): - log(f'Subvolume arg is missing name: {entry}', level=logging.DEBUG) + debug(f'Subvolume arg is missing name: {entry}') continue mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None @@ -705,7 +704,7 @@ class PartitionModification: """ Called for displaying data in table format """ - info = { + part_mod = { 'Status': self.status.value, 'Device': str(self.dev_path) if self.dev_path else '', 'Type': self.type.value, @@ -718,9 +717,9 @@ class PartitionModification: } if self.btrfs_subvols: - info['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes' + part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes' - return info + return part_mod @dataclass @@ -916,36 +915,36 @@ class LsblkInfo: @classmethod def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo: - info = cls() + lsblk_info = cls() for f in cls.fields(): lsblk_field = _clean_field(f, CleanType.Blockdevice) data_field = _clean_field(f, CleanType.Dataclass) val: Any = None - if isinstance(getattr(info, data_field), Path): + if isinstance(getattr(lsblk_info, data_field), Path): val = Path(blockdevice[lsblk_field]) - elif isinstance(getattr(info, data_field), Size): + elif isinstance(getattr(lsblk_info, data_field), Size): val = Size(blockdevice[lsblk_field], Unit.B) else: val = blockdevice[lsblk_field] - setattr(info, data_field, val) + setattr(lsblk_info, data_field, val) - info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])] + lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])] # sometimes lsblk returns 'mountpoints': [null] - info.mountpoints = [Path(mnt) for mnt in info.mountpoints if mnt] + lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt] fs_roots = [] - for r in info.fsroots: + for r in lsblk_info.fsroots: if r: path = Path(r) # store the fsroot entries without the leading / fs_roots.append(path.relative_to(path.anchor)) - info.fsroots = fs_roots + lsblk_info.fsroots = fs_roots - return info + return lsblk_info class CleanType(Enum): @@ -978,16 +977,16 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int = try: result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}') break - except SysCallError as error: + except SysCallError as err: # Get the output minus the message/info from lsblk if it returns a non-zero exit code. - if error.worker: - err = error.worker.decode('UTF-8') - log(f'Error calling lsblk: {err}', level=logging.DEBUG) + if err.worker: + err_str = err.worker.decode('UTF-8') + debug(f'Error calling lsblk: {err_str}') else: - raise error + raise err if retry_attempt == retry - 1: - raise error + raise err time.sleep(1) @@ -997,11 +996,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int = blockdevices = block_devices['blockdevices'] return [LsblkInfo.from_json(device) for device in blockdevices] except json.decoder.JSONDecodeError as err: - log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR) + error(f"Could not decode lsblk JSON: {result}") raise err raise DiskError(f'Failed to read disk "{dev_path}" with lsblk') + def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo: if infos := _fetch_lsblk_info(dev_path): return infos[0] diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py index 285270fb..8c64e65e 100644 --- a/archinstall/lib/disk/encryption_menu.py +++ b/archinstall/lib/disk/encryption_menu.py @@ -13,7 +13,7 @@ from ..menu import ( MenuSelectionType, TableMenu ) -from ..user_interaction.utils import get_password +from ..interactions.utils import get_password from ..menu import Menu from ..general import secret from .fido import Fido2Device, Fido2 diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py index 2a53b551..97c38d84 100644 --- a/archinstall/lib/disk/fido.py +++ b/archinstall/lib/disk/fido.py @@ -1,13 +1,12 @@ from __future__ import annotations import getpass -import logging from pathlib import Path from typing import List, Optional from .device_model import PartitionModification, Fido2Device from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes -from ..output import log +from ..output import error, info class Fido2: @@ -39,7 +38,7 @@ class Fido2: if not cls._loaded or reload: ret: Optional[str] = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8') if not ret: - log('Unable to retrieve fido2 devices', level=logging.ERROR) + error('Unable to retrieve fido2 devices') return [] fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore @@ -88,8 +87,4 @@ class Fido2: worker.write(bytes(getpass.getpass(" "), 'UTF-8')) pin_inputted = True - log( - f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.", - level=logging.INFO, - fg="yellow" - ) + info('You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds') diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 6ea99340..dc99afce 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import signal import sys import time @@ -8,8 +7,8 @@ from typing import Any, Optional, TYPE_CHECKING from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption from .device_handler import device_handler -from ..hardware import has_uefi -from ..output import log +from ..hardware import SysInfo +from ..output import debug from ..menu import Menu if TYPE_CHECKING: @@ -27,13 +26,13 @@ class FilesystemHandler: def perform_filesystem_operations(self, show_countdown: bool = True): if self._disk_config.config_type == DiskLayoutType.Pre_mount: - log('Disk layout configuration is set to pre-mount, not performing any operations', level=logging.DEBUG) + debug('Disk layout configuration is set to pre-mount, not performing any operations') return device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications)) if not device_mods: - log('No modifications required', level=logging.DEBUG) + debug('No modifications required') return device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods]) @@ -48,7 +47,7 @@ class FilesystemHandler: # Setup the blockdevice, filesystem (and optionally encryption). # Once that's done, we'll hand over to perform_installation() partition_table = PartitionTable.GPT - if has_uefi() is False: + if SysInfo.has_uefi() is False: partition_table = PartitionTable.MBR for mod in device_mods: diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py index 686e8c29..89cf6293 100644 --- a/archinstall/lib/disk/partitioning_menu.py +++ b/archinstall/lib/disk/partitioning_menu.py @@ -1,13 +1,12 @@ from __future__ import annotations -import logging from pathlib import Path from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \ ModificationStatus from ..menu import Menu, ListManager, MenuSelection, TextInput -from ..output import FormattedOutput, log +from ..output import FormattedOutput, warn from .subvolume_menu import SubvolumeMenu if TYPE_CHECKING: @@ -229,7 +228,7 @@ class PartitioningList(ListManager): if not start_sector or self._validate_sector(start_sector): break - log(f'Invalid start sector entered: {start_sector}', fg='red', level=logging.INFO) + warn(f'Invalid start sector entered: {start_sector}') if not start_sector: start_sector = str(largest_free_area.start) @@ -245,7 +244,7 @@ class PartitioningList(ListManager): if not end_value or self._validate_sector(start_sector, end_value): break - log(f'Invalid end sector entered: {start_sector}', fg='red', level=logging.INFO) + warn(f'Invalid end sector entered: {start_sector}') # override the default value with the user value if end_value: @@ -300,7 +299,7 @@ class PartitioningList(ListManager): if choice.value == Menu.no(): return [] - from ..user_interaction.disk_conf import suggest_single_disk_layout + from ..interactions.disk_conf import suggest_single_disk_layout device_modification = suggest_single_disk_layout(self._device) return device_modification.partitions diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py index a66e4e04..53458d2c 100644 --- a/archinstall/lib/exceptions.py +++ b/archinstall/lib/exceptions.py @@ -3,6 +3,7 @@ from typing import Optional, TYPE_CHECKING if TYPE_CHECKING: from .general import SysCommandWorker + class RequirementError(BaseException): pass @@ -15,10 +16,6 @@ class UnknownFilesystemFormat(BaseException): pass -class ProfileError(BaseException): - pass - - class SysCallError(BaseException): def __init__(self, message :str, exit_code :Optional[int] = None, worker :Optional['SysCommandWorker'] = None) -> None: super(SysCallError, self).__init__(message) @@ -27,22 +24,10 @@ class SysCallError(BaseException): self.worker = worker -class PermissionError(BaseException): - pass - - -class ProfileNotFound(BaseException): - pass - - class HardwareIncompatibilityError(BaseException): pass -class UserError(BaseException): - pass - - class ServiceException(BaseException): pass @@ -51,9 +36,5 @@ class PackageError(BaseException): pass -class TranslationError(BaseException): - pass - - class Deprecated(BaseException): - pass
\ No newline at end of file + pass diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 997b7d67..777ee90e 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -1,8 +1,6 @@ from __future__ import annotations -import hashlib import json -import logging import os import secrets import shlex @@ -18,9 +16,10 @@ import urllib.error import pathlib from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING +from select import epoll, EPOLLIN, EPOLLHUP from .exceptions import RequirementError, SysCallError -from .output import log +from .output import debug, error, info from .storage import storage @@ -28,42 +27,6 @@ if TYPE_CHECKING: from .installer import Installer -if sys.platform == 'linux': - from select import epoll, EPOLLIN, EPOLLHUP -else: - import select - EPOLLIN = 0 - EPOLLHUP = 0 - - class epoll(): - """ #!if windows - Create a epoll() implementation that simulates the epoll() behavior. - This so that the rest of the code doesn't need to worry weither we're using select() or epoll(). - """ - def __init__(self) -> None: - self.sockets: Dict[str, Any] = {} - self.monitoring: Dict[int, Any] = {} - - def unregister(self, fileno :int, *args :List[Any], **kwargs :Dict[str, Any]) -> None: - try: - del(self.monitoring[fileno]) # noqa: E275 - except: - pass - - def register(self, fileno :int, *args :int, **kwargs :Dict[str, Any]) -> None: - self.monitoring[fileno] = True - - def poll(self, timeout: float = 0.05, *args :str, **kwargs :Dict[str, Any]) -> List[Any]: - try: - return [[fileno, 1] for fileno in select.select(list(self.monitoring.keys()), [], [], timeout)[0]] - except OSError: - return [] - - -def gen_uid(entropy_length :int = 256) -> str: - return hashlib.sha512(os.urandom(entropy_length)).hexdigest() - - def generate_password(length :int = 64) -> str: haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace return ''.join(secrets.choice(haystack) for i in range(length)) @@ -156,6 +119,7 @@ class JsonEncoder: else: return JsonEncoder._encode(obj) + class JSON(json.JSONEncoder, json.JSONDecoder): """ A safe JSON encoder that will omit private information in dicts (starting with !) @@ -166,6 +130,7 @@ class JSON(json.JSONEncoder, json.JSONDecoder): def encode(self, obj :Any) -> Any: return super(JSON, self).encode(self._encode(obj)) + class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder): """ UNSAFE_JSON will call/encode and keep private information in dicts (starting with !) @@ -269,7 +234,7 @@ class SysCommandWorker: sys.stdout.flush() if len(args) >= 2 and args[1]: - log(args[1], level=logging.DEBUG, fg='red') + debug(args[1]) if self.exit_code != 0: raise SysCallError( @@ -350,7 +315,7 @@ class SysCommandWorker: self.ended = time.time() break - if self.ended or (got_output is False and pid_exists(self.pid) is False): + if self.ended or (got_output is False and _pid_exists(self.pid) is False): self.ended = time.time() try: wait_status = os.waitpid(self.pid, 0)[1] @@ -396,15 +361,15 @@ class SysCommandWorker: pass except Exception as e: exception_type = type(e).__name__ - log(f"Unexpected {exception_type} occurred in {self.cmd}: {e}", level=logging.ERROR) + error(f"Unexpected {exception_type} occurred in {self.cmd}: {e}") raise e os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars}) if storage['arguments'].get('debug'): - log(f"Executing: {self.cmd}", level=logging.DEBUG) + debug(f"Executing: {self.cmd}") except FileNotFoundError: - log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red") + error(f"{self.cmd[0]} does not exist.") self.exit_code = 1 return False else: @@ -455,7 +420,7 @@ class SysCommand: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: - log(args[1], level=logging.ERROR, fg='red') + error(args[1]) def __iter__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> Iterator[bytes]: if self.session: @@ -535,22 +500,7 @@ class SysCommand: return None -def prerequisite_check() -> bool: - """ - This function is used as a safety check before - continuing with an installation. - - Could be anything from checking that /boot is big enough - to check if nvidia hardware exists when nvidia driver was chosen. - """ - - return True - -def reboot(): - SysCommand("/usr/bin/reboot") - - -def pid_exists(pid: int) -> bool: +def _pid_exists(pid: int) -> bool: try: return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip()) except subprocess.CalledProcessError: @@ -559,7 +509,7 @@ def pid_exists(pid: int) -> bool: def run_custom_user_commands(commands :List[str], installation :Installer) -> None: for index, command in enumerate(commands): - log(f'Executing custom command "{command}" ...', level=logging.INFO) + info(f'Executing custom command "{command}" ...') with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: temp_script.write(command) @@ -568,6 +518,7 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") + def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool : """ Function to load a stream (file (as name) or valid JSON string into an existing dictionary @@ -582,16 +533,16 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target try: with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: target.update(json.loads(response.read())) - except urllib.error.HTTPError as error: - log(f"Could not load {configuration_identifier} via {parsed_url} due to: {error}", level=logging.ERROR, fg="red") + except urllib.error.HTTPError as err: + error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}") return False else: if pathlib.Path(stream).exists(): try: with pathlib.Path(stream).open() as fh: target.update(json.load(fh)) - except Exception as error: - log(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {error}", level=logging.ERROR, fg="red") + except Exception as err: + error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}") return False else: # NOTE: This is a rudimentary check if what we're trying parse is a dict structure. @@ -600,14 +551,15 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target try: target.update(json.loads(stream)) except Exception as e: - log(f" {configuration_identifier} Contains an invalid JSON format : {e}",level=logging.ERROR, fg="red") + error(f"{configuration_identifier} Contains an invalid JSON format: {e}") return False else: - log(f" {configuration_identifier} is neither a file nor is a JSON string:",level=logging.ERROR, fg="red") + error(f"{configuration_identifier} is neither a file nor is a JSON string") return False return True + def secret(x :str): """ return * with len equal to to the input string """ return '*' * len(x) diff --git a/archinstall/lib/global_menu.py b/archinstall/lib/global_menu.py index a969d93f..13595132 100644 --- a/archinstall/lib/global_menu.py +++ b/archinstall/lib/global_menu.py @@ -11,24 +11,24 @@ from .models.users import User from .output import FormattedOutput from .profile.profile_menu import ProfileConfiguration from .storage import storage -from .user_interaction import add_number_of_parrallel_downloads -from .user_interaction import ask_additional_packages_to_install -from .user_interaction import ask_for_additional_users -from .user_interaction import ask_for_audio_selection -from .user_interaction import ask_for_bootloader -from .user_interaction import ask_for_swap -from .user_interaction import ask_hostname -from .user_interaction import ask_ntp -from .user_interaction import ask_to_configure_network -from .user_interaction import get_password, ask_for_a_timezone -from .user_interaction import select_additional_repositories -from .user_interaction import select_kernel -from .user_interaction import select_language -from .user_interaction import select_locale_enc -from .user_interaction import select_locale_lang -from .user_interaction import select_mirror_regions -from .user_interaction.disk_conf import select_disk_config -from .user_interaction.save_conf import save_config +from .configuration import save_config +from .interactions import add_number_of_parrallel_downloads +from .interactions import ask_additional_packages_to_install +from .interactions import ask_for_additional_users +from .interactions import ask_for_audio_selection +from .interactions import ask_for_bootloader +from .interactions import ask_for_swap +from .interactions import ask_hostname +from .interactions import ask_to_configure_network +from .interactions import get_password, ask_for_a_timezone +from .interactions import select_additional_repositories +from .interactions import select_kernel +from .interactions import select_language +from .interactions import select_locale_enc +from .interactions import select_locale_lang +from .interactions import select_mirror_regions +from .interactions import ask_ntp +from .interactions.disk_conf import select_disk_config if TYPE_CHECKING: _: Any diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index 3759725f..b95301f9 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -1,27 +1,12 @@ import os -import logging -from functools import partial +from functools import cached_property from pathlib import Path -from typing import Iterator, Optional, Dict +from typing import Optional, Dict from .general import SysCommand from .networking import list_interfaces, enrich_iface_types from .exceptions import SysCallError -from .output import log - -__packages__ = [ - "mesa", - "xf86-video-amdgpu", - "xf86-video-ati", - "xf86-video-nouveau", - "xf86-video-vmware", - "libva-mesa-driver", - "libva-intel-driver", - "intel-media-driver", - "vulkan-radeon", - "vulkan-intel", - "nvidia", -] +from .output import debug AVAILABLE_GFX_DRIVERS = { # Sub-dicts are layer-2 options to be selected @@ -62,136 +47,125 @@ AVAILABLE_GFX_DRIVERS = { } -def cpuinfo() -> Iterator[dict[str, str]]: - """ - Yields information about the CPUs of the system - """ - cpu_info_path = Path("/proc/cpuinfo") - cpu: Dict[str, str] = {} +class _SysInfo: + def __init__(self): + pass - with cpu_info_path.open() as file: - for line in file: - if not (line := line.strip()): - yield cpu - cpu = {} - continue - - key, value = line.split(":", maxsplit=1) - cpu[key.strip()] = value.strip() - - -def all_meminfo() -> Dict[str, int]: - """ - Returns a dict with memory info if called with no args - or the value of the given key of said dict. - """ - mem_info_path = Path("/proc/meminfo") - mem_info: Dict[str, int] = {} - - with mem_info_path.open() as file: - for line in file: - key, value = line.strip().split(':') - num = value.split()[0] - mem_info[key] = int(num) - - return mem_info - - -def meminfo_for_key(key: str) -> int: - info = all_meminfo() - return info[key] - - -def has_wifi() -> bool: - ifaces = list(list_interfaces().values()) - return 'WIRELESS' in enrich_iface_types(ifaces).values() - - -def has_cpu_vendor(vendor_id: str) -> bool: - return any(cpu.get("vendor_id") == vendor_id for cpu in cpuinfo()) - - -has_amd_cpu = partial(has_cpu_vendor, "AuthenticAMD") - - -has_intel_cpu = partial(has_cpu_vendor, "GenuineIntel") - - -def has_uefi() -> bool: - return os.path.isdir('/sys/firmware/efi') - - -def graphics_devices() -> dict: - cards = {} - for line in SysCommand("lspci"): - if b' VGA ' in line or b' 3D ' in line: - _, identifier = line.split(b': ', 1) - cards[identifier.strip().decode('UTF-8')] = line - return cards - - -def has_nvidia_graphics() -> bool: - return any('nvidia' in x.lower() for x in graphics_devices()) - - -def has_amd_graphics() -> bool: - return any('amd' in x.lower() for x in graphics_devices()) - - -def has_intel_graphics() -> bool: - return any('intel' in x.lower() for x in graphics_devices()) - - -def cpu_vendor() -> Optional[str]: - for cpu in cpuinfo(): - return cpu.get("vendor_id") - - return None - - -def cpu_model() -> Optional[str]: - for cpu in cpuinfo(): - return cpu.get("model name") - - return None - - -def sys_vendor() -> Optional[str]: - with open(f"/sys/devices/virtual/dmi/id/sys_vendor") as vendor: - return vendor.read().strip() - - -def product_name() -> Optional[str]: - with open(f"/sys/devices/virtual/dmi/id/product_name") as product: - return product.read().strip() - - -def mem_available() -> Optional[int]: - return meminfo_for_key('MemAvailable') - - -def mem_free() -> Optional[int]: - return meminfo_for_key('MemFree') - - -def mem_total() -> Optional[int]: - return meminfo_for_key('MemTotal') - - -def virtualization() -> Optional[str]: - try: - return str(SysCommand("systemd-detect-virt")).strip('\r\n') - except SysCallError as error: - log(f"Could not detect virtual system: {error}", level=logging.DEBUG) - - return None - - -def is_vm() -> bool: - try: - result = SysCommand("systemd-detect-virt") - return b"none" not in b"".join(result).lower() - except SysCallError as error: - log(f"System is not running in a VM: {error}", level=logging.DEBUG) - - return False + @cached_property + def cpu_info(self) -> Dict[str, str]: + """ + Returns system cpu information + """ + cpu_info_path = Path("/proc/cpuinfo") + cpu: Dict[str, str] = {} + + with cpu_info_path.open() as file: + for line in file: + if line := line.strip(): + key, value = line.split(":", maxsplit=1) + cpu[key.strip()] = value.strip() + + return cpu + + @cached_property + def mem_info(self) -> Dict[str, int]: + """ + Returns system memory information + """ + mem_info_path = Path("/proc/meminfo") + mem_info: Dict[str, int] = {} + + with mem_info_path.open() as file: + for line in file: + key, value = line.strip().split(':') + num = value.split()[0] + mem_info[key] = int(num) + + return mem_info + + def mem_info_by_key(self, key: str) -> int: + return self.mem_info[key] + + +_sys_info = _SysInfo() + + +class SysInfo: + @staticmethod + def has_wifi() -> bool: + ifaces = list(list_interfaces().values()) + return 'WIRELESS' in enrich_iface_types(ifaces).values() + + @staticmethod + def has_uefi() -> bool: + return os.path.isdir('/sys/firmware/efi') + + @staticmethod + def _graphics_devices() -> Dict[str, str]: + cards: Dict[str, str] = {} + for line in SysCommand("lspci"): + if b' VGA ' in line or b' 3D ' in line: + _, identifier = line.split(b': ', 1) + cards[identifier.strip().decode('UTF-8')] = str(line) + return cards + + @staticmethod + def has_nvidia_graphics() -> bool: + return any('nvidia' in x.lower() for x in SysInfo._graphics_devices()) + + @staticmethod + def has_amd_graphics() -> bool: + return any('amd' in x.lower() for x in SysInfo._graphics_devices()) + + @staticmethod + def has_intel_graphics() -> bool: + return any('intel' in x.lower() for x in SysInfo._graphics_devices()) + + @staticmethod + def cpu_vendor() -> Optional[str]: + return _sys_info.cpu_info.get('vendor_id', None) + + @staticmethod + def cpu_model() -> Optional[str]: + return _sys_info.cpu_info.get('model name', None) + + @staticmethod + def sys_vendor() -> str: + with open(f"/sys/devices/virtual/dmi/id/sys_vendor") as vendor: + return vendor.read().strip() + + @staticmethod + def product_name() -> str: + with open(f"/sys/devices/virtual/dmi/id/product_name") as product: + return product.read().strip() + + @staticmethod + def mem_available() -> int: + return _sys_info.mem_info_by_key('MemAvailable') + + @staticmethod + def mem_free() -> int: + return _sys_info.mem_info_by_key('MemFree') + + @staticmethod + def mem_total() -> int: + return _sys_info.mem_info_by_key('MemTotal') + + @staticmethod + def virtualization() -> Optional[str]: + try: + return str(SysCommand("systemd-detect-virt")).strip('\r\n') + except SysCallError as err: + debug(f"Could not detect virtual system: {err}") + + return None + + @staticmethod + def is_vm() -> bool: + try: + result = SysCommand("systemd-detect-virt") + return b"none" not in b"".join(result).lower() + except SysCallError as err: + debug(f"System is not running in a VM: {err}") + + return False diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 726ff3d0..3c427ab2 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,5 +1,4 @@ import glob -import logging import os import re import shlex @@ -12,17 +11,16 @@ from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, It from . import disk from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError from .general import SysCommand -from .hardware import has_uefi, is_vm, cpu_vendor -from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout +from .hardware import SysInfo +from .locale import verify_keyboard_layout, verify_x11_keyboard_layout from .luks import Luks2 from .mirrors import use_mirrors from .models.bootloader import Bootloader from .models.network_configuration import NetworkConfiguration from .models.users import User -from .output import log +from .output import log, error, info, warn, debug from .pacman import run_pacman from .plugins import plugins -from .services import service_state from .storage import storage if TYPE_CHECKING: @@ -41,28 +39,6 @@ def accessibility_tools_in_use() -> bool: class Installer: - """ - `Installer()` is the wrapper for most basic installation steps. - It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. - - :param partition: Requires a partition as the first argument, this is - so that the installer can mount to `mountpoint` and strap packages there. - :type partition: class:`archinstall.Partition` - - :param boot_partition: There's two reasons for needing a boot partition argument, - The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place - during the `pacstrap` or `linux` and the base packages for a minimal installation. - The second being when :py:func:`~archinstall.Installer.add_bootloader` is called, - A `boot_partition` must be known to the installer before this is called. - :type boot_partition: class:`archinstall.Partition` - - :param profile: A profile to install, this is optional and can be called later manually. - This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on. - :type profile: str, optional - - :param hostname: The given /etc/hostname for the machine. - :type hostname: str, optional - """ def __init__( self, target: Path, @@ -71,6 +47,10 @@ class Installer: base_packages: List[str] = [], kernels: Optional[List[str]] = None ): + """ + `Installer()` is the wrapper for most basic installation steps. + It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. + """ if not base_packages: base_packages = __packages__[:3] @@ -126,7 +106,7 @@ class Installer: def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: - log(exc_val, fg='red', level=logging.ERROR) + error(exc_val) self.sync_log_to_install_medium() @@ -137,48 +117,41 @@ class Installer: raise exc_val if not (missing_steps := self.post_install_check()): - self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) + log('Installation completed without any errors. You may now reboot.', fg='green') self.sync_log_to_install_medium() return True else: - self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING) + warn('Some required steps were not successfully installed/configured before leaving the installer:') for step in missing_steps: - self.log(f' - {step}', fg='red', level=logging.WARNING) + warn(f' - {step}') - self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING) - self.log("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING) + warn(f"Detailed error logs can be found at: {storage['LOG_PATH']}") + warn("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues") self.sync_log_to_install_medium() return False - def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): - """ - installer.log() wraps output.log() mainly to set a default log-level for this install session. - Any manual override can be done per log() call. - """ - log(*args, level=level, **kwargs) - def _verify_service_stop(self): """ Certain services might be running that affects the system during installation. One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist We need to wait for it before we continue since we opted in to use a custom mirror/region. """ - log('Waiting for time sync (systemd-timesyncd.service) to complete.', level=logging.INFO) + info('Waiting for time sync (systemd-timesyncd.service) to complete.') while SysCommand('timedatectl show --property=NTPSynchronized --value').decode().rstrip() != 'yes': time.sleep(1) - log('Waiting for automatic mirror selection (reflector) to complete.', level=logging.INFO) - while service_state('reflector') not in ('dead', 'failed', 'exited'): + info('Waiting for automatic mirror selection (reflector) to complete.') + while self._service_state('reflector') not in ('dead', 'failed', 'exited'): time.sleep(1) - log('Waiting pacman-init.service to complete.', level=logging.INFO) - while service_state('pacman-init') not in ('dead', 'failed', 'exited'): + info('Waiting pacman-init.service to complete.') + while self._service_state('pacman-init') not in ('dead', 'failed', 'exited'): time.sleep(1) - log('Waiting Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.', level=logging.INFO) - while service_state('archlinux-keyring-wkd-sync') not in ('dead', 'failed', 'exited'): + info('Waiting Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.') + while self._service_state('archlinux-keyring-wkd-sync') not in ('dead', 'failed', 'exited'): time.sleep(1) def _verify_boot_part(self): @@ -204,7 +177,7 @@ class Installer: self._verify_service_stop() def mount_ordered_layout(self): - log('Mounting partitions in order', level=logging.INFO) + info('Mounting partitions in order') for mod in self._disk_config.device_modifications: # partitions have to mounted in the right order on btrfs the mountpoint will @@ -275,7 +248,7 @@ class Installer: ) if gen_enc_file and not part_mod.is_root(): - log(f'Creating key-file: {part_mod.dev_path}', level=logging.INFO) + info(f'Creating key-file: {part_mod.dev_path}') luks_handler.create_keyfile(self.target) if part_mod.is_root() and not gen_enc_file: @@ -384,25 +357,25 @@ class Installer: if (result := plugin.on_pacstrap(packages)): packages = result - self.log(f'Installing packages: {packages}', level=logging.INFO) + info(f'Installing packages: {packages}') # TODO: We technically only need to run the -Syy once. try: run_pacman('-Syy', default_cmd='/usr/bin/pacman') - except SysCallError as error: - self.log(f'Could not sync a new package database: {error}', level=logging.ERROR, fg="red") + except SysCallError as err: + error(f'Could not sync a new package database: {err}') if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): return self._pacstrap(packages) - raise RequirementError(f'Could not sync mirrors: {error}') + raise RequirementError(f'Could not sync mirrors: {err}') try: SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) return True - except SysCallError as error: - self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red") + except SysCallError as err: + error(f'Could not strap in packages: {err}') if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): @@ -420,12 +393,12 @@ class Installer: use_mirrors(mirrors, destination=destination) def genfstab(self, flags :str = '-pU'): - self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) + info(f"Updating {self.target}/etc/fstab") try: gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() - except SysCallError as error: - raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}') + except SysCallError as err: + raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {err}') if not gen_fstab: raise RequirementError(f'Genrating fstab returned empty value') @@ -530,24 +503,20 @@ class Installer: return True else: - self.log( - f"Time zone {zone} does not exist, continuing with system default.", - level=logging.WARNING, - fg='red' - ) + warn(f'Time zone {zone} does not exist, continuing with system default') return False def activate_time_syncronization(self) -> None: - self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO) + info('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers') self.enable_service('systemd-timesyncd') def enable_espeakup(self) -> None: - self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO) + info('Enabling espeakup.service for speech synthesis (accessibility)') self.enable_service('espeakup') def enable_periodic_trim(self) -> None: - self.log("Enabling periodic TRIM") + info("Enabling periodic TRIM") # fstrim is owned by util-linux, a dependency of both base and systemd. self.enable_service("fstrim.timer") @@ -556,12 +525,12 @@ class Installer: services = [services] for service in services: - self.log(f'Enabling service {service}', level=logging.INFO) + info(f'Enabling service {service}') try: self.arch_chroot(f'systemctl enable {service}') - except SysCallError as error: - raise ServiceException(f"Unable to start service {service}: {error}") + except SysCallError as err: + raise ServiceException(f"Unable to start service {service}: {err}") for plugin in plugins.values(): if hasattr(plugin, 'on_service'): @@ -713,11 +682,11 @@ class Installer: if 'encrypt' not in self._hooks: self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') - if not has_uefi(): + if not SysInfo.has_uefi(): self.base_packages.append('grub') - if not is_vm(): - vendor = cpu_vendor() + if not SysInfo.is_vm(): + vendor = SysInfo.cpu_vendor() if vendor == "AuthenticAMD": self.base_packages.append("amd-ucode") if (ucode := Path(f"{self.target}/boot/amd-ucode.img")).exists(): @@ -727,21 +696,21 @@ class Installer: if (ucode := Path(f"{self.target}/boot/intel-ucode.img")).exists(): ucode.unlink() else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG) + debug(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode") # Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set. # This action takes place on the host system as pacstrap copies over package repository lists. if multilib: - self.log("The multilib flag is set. This system will be installed with the multilib repository enabled.") + info("The multilib flag is set. This system will be installed with the multilib repository enabled.") self.enable_multilib_repository() else: - self.log("The multilib flag is not set. This system will be installed without multilib repositories enabled.") + info("The multilib flag is not set. This system will be installed without multilib repositories enabled.") if testing: - self.log("The testing flag is set. This system will be installed with testing repositories enabled.") + info("The testing flag is set. This system will be installed with testing repositories enabled.") self.enable_testing_repositories(multilib) else: - self.log("The testing flag is not set. This system will be installed without testing repositories enabled.") + info("The testing flag is not set. This system will be installed without testing repositories enabled.") self._pacstrap(self.base_packages) self.helper_flags['base-strapped'] = True @@ -773,7 +742,7 @@ class Installer: # Run registered post-install hooks for function in self.post_base_install: - self.log(f"Running post-installation hook: {function}", level=logging.INFO) + info(f"Running post-installation hook: {function}") function(self) for plugin in plugins.values(): @@ -782,7 +751,7 @@ class Installer: def setup_swap(self, kind :str = 'zram'): if kind == 'zram': - self.log(f"Setting up swap on zram") + info(f"Setting up swap on zram") self._pacstrap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 @@ -812,7 +781,7 @@ class Installer: def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): self._pacstrap('efibootmgr') - if not has_uefi(): + if not SysInfo.has_uefi(): raise HardwareIncompatibilityError # TODO: Ideally we would want to check if another config @@ -862,16 +831,18 @@ class Installer: entry.write(f'# Created on: {self.init_time}\n') entry.write(f'title Arch Linux ({kernel}{variant})\n') entry.write(f"linux /vmlinuz-{kernel}\n") - if not is_vm(): - vendor = cpu_vendor() + if not SysInfo.is_vm(): + vendor = SysInfo.cpu_vendor() if vendor == "AuthenticAMD": entry.write("initrd /amd-ucode.img\n") elif vendor == "GenuineIntel": entry.write("initrd /intel-ucode.img\n") else: - self.log( - f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", - level=logging.DEBUG) + debug( + f"Unknown CPU vendor '{vendor}' detected.", + "Archinstall won't add any ucode to systemd-boot config.", + ) + entry.write(f"initrd /initramfs-{kernel}{variant}.img\n") # blkid doesn't trigger on loopback devices really well, # so we'll use the old manual method until we get that sorted out. @@ -890,7 +861,7 @@ class Installer: if root_partition.fs_type.is_crypto(): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log('Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + debug('Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}') kernel_options = f"options" @@ -905,7 +876,7 @@ class Installer: entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}') else: - log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}') entry.write(f'options root=PARTUUID={root_partition.partuuid} {options_entry}') self.helper_flags['bootloader'] = 'systemd' @@ -920,7 +891,7 @@ class Installer: _file = "/etc/default/grub" if root_partition.fs_type.is_crypto(): - log(f"Using UUID {root_partition.uuid} as encrypted root identifier", level=logging.DEBUG) + debug(f"Using UUID {root_partition.uuid} as encrypted root identifier") cmd_line_linux = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_partition.uuid}:cryptlvm rootfstype={root_partition.fs_type.value}\"/'" enable_cryptdisk = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'" @@ -931,9 +902,9 @@ class Installer: SysCommand(f"/usr/bin/arch-chroot {self.target} {cmd_line_linux} {_file}") - log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO) + info(f"GRUB boot partition: {boot_partition.dev_path}") - if has_uefi(): + if SysInfo.has_uefi(): self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: @@ -941,8 +912,8 @@ class Installer: except SysCallError: try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) - except SysCallError as error: - raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") + except SysCallError as err: + raise DiskError(f"Could not install GRUB to {self.target}/boot: {err}") else: device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path) @@ -958,13 +929,13 @@ class Installer: f' --recheck {device.device_info.path}' SysCommand(cmd, peek_output=True) - except SysCallError as error: - raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}") + except SysCallError as err: + raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {err}") try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg') - except SysCallError as error: - raise DiskError(f"Could not configure GRUB: {error}") + except SysCallError as err: + raise DiskError(f"Could not configure GRUB: {err}") self.helper_flags['bootloader'] = "grub" @@ -975,7 +946,7 @@ class Installer: ): self._pacstrap('efibootmgr') - if not has_uefi(): + if not SysInfo.has_uefi(): raise HardwareIncompatibilityError # TODO: Ideally we would want to check if another config @@ -989,14 +960,14 @@ class Installer: kernel_parameters = [] - if not is_vm(): - vendor = cpu_vendor() + if not SysInfo.is_vm(): + vendor = SysInfo.cpu_vendor() if vendor == "AuthenticAMD": kernel_parameters.append("initrd=\\amd-ucode.img") elif vendor == "GenuineIntel": kernel_parameters.append("initrd=\\intel-ucode.img") else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.", level=logging.DEBUG) + debug(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.") kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img") @@ -1006,10 +977,10 @@ class Installer: if root_partition.fs_type.is_crypto(): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}') kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}') else: - log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + debug(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}') kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}') device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path) @@ -1060,7 +1031,7 @@ class Installer: if root_partition is None: raise ValueError(f'Could not detect root at mountpoint {self.target}') - self.log(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}', level=logging.INFO) + info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}') match bootloader: case Bootloader.Systemd: @@ -1078,7 +1049,7 @@ class Installer: self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username) def enable_sudo(self, entity: str, group :bool = False): - self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) + info(f'Enabling sudo permissions for {entity}') sudoers_dir = f"{self.target}/etc/sudoers.d" @@ -1127,11 +1098,11 @@ class Installer: handled_by_plugin = result if not handled_by_plugin: - self.log(f'Creating user {user}', level=logging.INFO) + info(f'Creating user {user}') try: SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}') - except SysCallError as error: - raise SystemError(f"Could not create user inside installation: {error}") + except SysCallError as err: + raise SystemError(f"Could not create user inside installation: {err}") for plugin in plugins.values(): if hasattr(plugin, 'on_user_created'): @@ -1149,7 +1120,7 @@ class Installer: self.helper_flags['user'] = True def user_set_pw(self, user :str, password :str) -> bool: - self.log(f'Setting password for {user}', level=logging.INFO) + info(f'Setting password for {user}') if user == 'root': # This means the root account isn't locked/disabled with * in /etc/passwd @@ -1166,7 +1137,7 @@ class Installer: return False def user_set_shell(self, user :str, shell :str) -> bool: - self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) + info(f'Setting shell for {user} to {shell}') try: SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"") @@ -1183,49 +1154,59 @@ class Installer: return False def set_keyboard_language(self, language: str) -> bool: - log(f"Setting keyboard language to {language}", level=logging.INFO) + info(f"Setting keyboard language to {language}") + if len(language.strip()): if not verify_keyboard_layout(language): - self.log(f"Invalid keyboard language specified: {language}", fg="red", level=logging.ERROR) + error(f"Invalid keyboard language specified: {language}") return False # In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968 # Setting an empty keymap first, allows the subsequent call to set layout for both console and x11. - from .systemd import Boot + from .boot import Boot with Boot(self) as session: os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""') try: session.SysCommand(["localectl", "set-keymap", language]) - except SysCallError as error: - raise ServiceException(f"Unable to set locale '{language}' for console: {error}") + except SysCallError as err: + raise ServiceException(f"Unable to set locale '{language}' for console: {err}") - self.log(f"Keyboard language for this installation is now set to: {language}") + info(f"Keyboard language for this installation is now set to: {language}") else: - self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO) + info('Keyboard language was not changed from default (no language specified)') return True def set_x11_keyboard_language(self, language: str) -> bool: - log(f"Setting x11 keyboard language to {language}", level=logging.INFO) """ A fallback function to set x11 layout specifically and separately from console layout. This isn't strictly necessary since .set_keyboard_language() does this as well. """ + info(f"Setting x11 keyboard language to {language}") + if len(language.strip()): if not verify_x11_keyboard_layout(language): - self.log(f"Invalid x11-keyboard language specified: {language}", fg="red", level=logging.ERROR) + error(f"Invalid x11-keyboard language specified: {language}") return False - from .systemd import Boot + from .boot import Boot with Boot(self) as session: session.SysCommand(["localectl", "set-x11-keymap", '""']) try: session.SysCommand(["localectl", "set-x11-keymap", language]) - except SysCallError as error: - raise ServiceException(f"Unable to set locale '{language}' for X11: {error}") + except SysCallError as err: + raise ServiceException(f"Unable to set locale '{language}' for X11: {err}") else: - self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO) + info(f'X11-Keyboard language was not changed from default (no language specified)') return True + + def _service_state(self, service_name: str) -> str: + if os.path.splitext(service_name)[1] != '.service': + service_name += '.service' # Just to be safe + + state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'})) + + return state.strip().decode('UTF-8') diff --git a/archinstall/lib/interactions/__init__.py b/archinstall/lib/interactions/__init__.py new file mode 100644 index 00000000..b5691a10 --- /dev/null +++ b/archinstall/lib/interactions/__init__.py @@ -0,0 +1,20 @@ +from .locale_conf import select_locale_lang, select_locale_enc +from .manage_users_conf import UserList, ask_for_additional_users +from .network_conf import ManualNetworkConfig, ask_to_configure_network +from .utils import get_password + +from .disk_conf import ( + select_devices, select_disk_config, get_default_partition_layout, + select_main_filesystem_format, suggest_single_disk_layout, + suggest_multi_disk_layout +) + +from .general_conf import ( + ask_ntp, ask_hostname, ask_for_a_timezone, ask_for_audio_selection, select_language, + select_mirror_regions, select_archinstall_language, ask_additional_packages_to_install, + add_number_of_parrallel_downloads, select_additional_repositories +) + +from .system_conf import ( + select_kernel, ask_for_bootloader, select_driver, ask_for_swap +) diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/interactions/disk_conf.py index a77e950a..78e4cff4 100644 --- a/archinstall/lib/user_interaction/disk_conf.py +++ b/archinstall/lib/interactions/disk_conf.py @@ -1,14 +1,15 @@ from __future__ import annotations -import logging from pathlib import Path -from typing import Any, TYPE_CHECKING, Optional, List, Tuple +from typing import Any, TYPE_CHECKING +from typing import Optional, List, Tuple from .. import disk -from ..hardware import has_uefi -from ..menu import Menu, MenuSelectionType, TableMenu -from ..output import FormattedOutput -from ..output import log +from ..hardware import SysInfo +from ..menu import Menu +from ..menu import TableMenu +from ..menu.menu import MenuSelectionType +from ..output import FormattedOutput, debug from ..utils.util import prompt_dir if TYPE_CHECKING: @@ -170,7 +171,7 @@ def select_disk_config( def _boot_partition() -> disk.PartitionModification: - if has_uefi(): + if SysInfo.has_uefi(): start = disk.Size(1, disk.Unit.MiB) size = disk.Size(512, disk.Unit.MiB) else: @@ -189,7 +190,7 @@ def _boot_partition() -> disk.PartitionModification: ) -def ask_for_main_filesystem_format(advanced_options=False) -> disk.FilesystemType: +def select_main_filesystem_format(advanced_options=False) -> disk.FilesystemType: options = { 'btrfs': disk.FilesystemType.Btrfs, 'ext4': disk.FilesystemType.Ext4, @@ -212,7 +213,7 @@ def suggest_single_disk_layout( separate_home: Optional[bool] = None ) -> disk.DeviceModification: if not filesystem_type: - filesystem_type = ask_for_main_filesystem_format(advanced_options) + filesystem_type = select_main_filesystem_format(advanced_options) min_size_to_allow_home_part = disk.Size(40, disk.Unit.GiB) root_partition_size = disk.Size(20, disk.Unit.GiB) @@ -258,7 +259,7 @@ def suggest_single_disk_layout( using_home_partition = False # root partition - start = disk.Size(513, disk.Unit.MiB) if has_uefi() else disk.Size(206, disk.Unit.MiB) + start = disk.Size(513, disk.Unit.MiB) if SysInfo.has_uefi() else disk.Size(206, disk.Unit.MiB) # Set a size for / (/root) if using_subvolumes or device_size_gib < min_size_to_allow_home_part or not using_home_partition: @@ -324,7 +325,7 @@ def suggest_multi_disk_layout( compression = False if not filesystem_type: - filesystem_type = ask_for_main_filesystem_format(advanced_options) + filesystem_type = select_main_filesystem_format(advanced_options) # find proper disk for /home possible_devices = list(filter(lambda x: x.device_info.total_size >= min_home_partition_size, devices)) @@ -353,9 +354,10 @@ def suggest_multi_disk_layout( compression = choice.value == Menu.yes() device_paths = ', '.join([str(d.device_info.path) for d in devices]) - log(f"Suggesting multi-disk-layout for devices: {device_paths}", level=logging.DEBUG) - log(f"/root: {root_device.device_info.path}", level=logging.DEBUG) - log(f"/home: {home_device.device_info.path}", level=logging.DEBUG) + + debug(f"Suggesting multi-disk-layout for devices: {device_paths}") + debug(f"/root: {root_device.device_info.path}") + debug(f"/home: {home_device.device_info.path}") root_device_modification = disk.DeviceModification(root_device, wipe=True) home_device_modification = disk.DeviceModification(home_device, wipe=True) @@ -368,7 +370,7 @@ def suggest_multi_disk_layout( root_partition = disk.PartitionModification( status=disk.ModificationStatus.Create, type=disk.PartitionType.Primary, - start=disk.Size(513, disk.Unit.MiB) if has_uefi() else disk.Size(206, disk.Unit.MiB), + start=disk.Size(513, disk.Unit.MiB) if SysInfo.has_uefi() else disk.Size(206, disk.Unit.MiB), length=disk.Size(100, disk.Unit.Percent, total_size=root_device.device_info.total_size), mountpoint=Path('/'), mount_options=['compress=zstd'] if compression else [], diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/interactions/general_conf.py index 9722dc4d..5fcfa633 100644 --- a/archinstall/lib/user_interaction/general_conf.py +++ b/archinstall/lib/interactions/general_conf.py @@ -1,13 +1,12 @@ from __future__ import annotations -import logging import pathlib from typing import List, Any, Optional, Dict, TYPE_CHECKING -from ..locale_helpers import list_keyboard_languages, list_timezones +from ..locale import list_keyboard_languages, list_timezones from ..menu import MenuSelectionType, Menu, TextInput from ..mirrors import list_mirrors -from ..output import log +from ..output import warn from ..packages.packages import validate_package_list from ..storage import storage from ..translationhandler import Language @@ -176,7 +175,7 @@ def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List valid, invalid = validate_package_list(packages) if invalid: - log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red') + warn(f"Some packages could not be found in the repository: {invalid}") packages = read_packages(valid) continue break diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/interactions/locale_conf.py index cdc3423a..de115202 100644 --- a/archinstall/lib/user_interaction/locale_conf.py +++ b/archinstall/lib/interactions/locale_conf.py @@ -1,8 +1,6 @@ -from __future__ import annotations - from typing import Any, TYPE_CHECKING, Optional -from ..locale_helpers import list_locales +from ..locale import list_locales from ..menu import Menu, MenuSelectionType if TYPE_CHECKING: diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/interactions/manage_users_conf.py index 879578da..879578da 100644 --- a/archinstall/lib/user_interaction/manage_users_conf.py +++ b/archinstall/lib/interactions/manage_users_conf.py diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/interactions/network_conf.py index b682c1d2..18a834a1 100644 --- a/archinstall/lib/user_interaction/network_conf.py +++ b/archinstall/lib/interactions/network_conf.py @@ -1,14 +1,13 @@ from __future__ import annotations import ipaddress -import logging from typing import Any, Optional, TYPE_CHECKING, List, Union, Dict from ..menu import MenuSelectionType, TextInput from ..models.network_configuration import NetworkConfiguration, NicType from ..networking import list_interfaces -from ..output import log, FormattedOutput +from ..output import FormattedOutput, warn from ..menu import ListManager, Menu if TYPE_CHECKING: @@ -91,7 +90,7 @@ class ManualNetworkConfig(ListManager): ipaddress.ip_interface(ip) break except ValueError: - log("You need to enter a valid IP in IP-config mode.", level=logging.WARNING, fg='red') + warn("You need to enter a valid IP in IP-config mode") # Implemented new check for correct gateway IP address gateway = None @@ -106,7 +105,7 @@ class ManualNetworkConfig(ListManager): ipaddress.ip_address(gateway) break except ValueError: - log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red') + warn("You need to enter a valid gateway (router) IP address") if edit_iface.dns: display_dns = ' '.join(edit_iface.dns) diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/interactions/system_conf.py index 3f57d0e7..bbcb5b23 100644 --- a/archinstall/lib/user_interaction/system_conf.py +++ b/archinstall/lib/interactions/system_conf.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import List, Any, Dict, TYPE_CHECKING, Optional -from ..hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics +from ..hardware import AVAILABLE_GFX_DRIVERS, SysInfo from ..menu import MenuSelectionType, Menu from ..models.bootloader import Bootloader @@ -41,7 +41,7 @@ def select_kernel(preset: List[str] = []) -> List[str]: def ask_for_bootloader(preset: Bootloader) -> Bootloader: # when the system only supports grub - if not has_uefi(): + if not SysInfo.has_uefi(): options = [Bootloader.Grub.value] default = Bootloader.Grub.value else: @@ -81,11 +81,11 @@ def select_driver(options: Dict[str, Any] = {}, current_value: Optional[str] = N if drivers: title = '' - if has_amd_graphics(): + if SysInfo.has_amd_graphics(): title += str(_('For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.')) + '\n' - if has_intel_graphics(): + if SysInfo.has_intel_graphics(): title += str(_('For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n')) - if has_nvidia_graphics(): + if SysInfo.has_nvidia_graphics(): title += str(_('For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n')) title += str(_('\nSelect a graphics driver or leave blank to install all open-source drivers')) diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/interactions/utils.py index 918945c0..f6b5b2d3 100644 --- a/archinstall/lib/user_interaction/utils.py +++ b/archinstall/lib/interactions/utils.py @@ -4,7 +4,7 @@ import getpass from typing import Any, Optional, TYPE_CHECKING from ..models import PasswordStrength -from ..output import log +from ..output import log, error if TYPE_CHECKING: _: Any @@ -26,7 +26,7 @@ def get_password(prompt: str = '') -> Optional[str]: passwd_verification = getpass.getpass(prompt=_('And one more time for verification: ')) if password != passwd_verification: - log(' * Passwords did not match * ', fg='red') + error(' * Passwords did not match * ') continue return password diff --git a/archinstall/lib/locale.py b/archinstall/lib/locale.py new file mode 100644 index 00000000..0a36c072 --- /dev/null +++ b/archinstall/lib/locale.py @@ -0,0 +1,68 @@ +from typing import Iterator, List + +from .exceptions import ServiceException, SysCallError +from .general import SysCommand +from .output import error + + +def list_keyboard_languages() -> Iterator[str]: + for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}): + yield line.decode('UTF-8').strip() + + +def list_locales() -> List[str]: + with open('/etc/locale.gen', 'r') as fp: + locales = [] + # before the list of locales begins there's an empty line with a '#' in front + # so we'll collect the localels from bottom up and halt when we're donw + entries = fp.readlines() + entries.reverse() + + for entry in entries: + text = entry.replace('#', '').strip() + if text == '': + break + locales.append(text) + + locales.reverse() + return locales + + +def list_x11_keyboard_languages() -> Iterator[str]: + for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}): + yield line.decode('UTF-8').strip() + + +def verify_keyboard_layout(layout :str) -> bool: + for language in list_keyboard_languages(): + if layout.lower() == language.lower(): + return True + return False + + +def verify_x11_keyboard_layout(layout :str) -> bool: + for language in list_x11_keyboard_languages(): + if layout.lower() == language.lower(): + return True + return False + + +def set_keyboard_language(locale :str) -> bool: + if len(locale.strip()): + if not verify_keyboard_layout(locale): + error(f"Invalid keyboard locale specified: {locale}") + return False + + try: + SysCommand(f'localectl set-keymap {locale}') + except SysCallError as err: + raise ServiceException(f"Unable to set locale '{locale}' for console: {err}") + + return True + + return False + + +def list_timezones() -> Iterator[str]: + for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}): + yield line.decode('UTF-8').strip() diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py deleted file mode 100644 index efb0365f..00000000 --- a/archinstall/lib/locale_helpers.py +++ /dev/null @@ -1,176 +0,0 @@ -import logging -from typing import Iterator, List, Callable, Optional - -from .exceptions import ServiceException, SysCallError -from .general import SysCommand -from .output import log -from .storage import storage - - -def list_keyboard_languages() -> Iterator[str]: - for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}): - yield line.decode('UTF-8').strip() - - -def list_locales() -> List[str]: - with open('/etc/locale.gen', 'r') as fp: - locales = [] - # before the list of locales begins there's an empty line with a '#' in front - # so we'll collect the localels from bottom up and halt when we're donw - entries = fp.readlines() - entries.reverse() - - for entry in entries: - text = entry.replace('#', '').strip() - if text == '': - break - locales.append(text) - - locales.reverse() - return locales - -def get_locale_mode_text(mode): - if mode == 'LC_ALL': - mode_text = "general (LC_ALL)" - elif mode == "LC_CTYPE": - mode_text = "Character set" - elif mode == "LC_NUMERIC": - mode_text = "Numeric values" - elif mode == "LC_TIME": - mode_text = "Time Values" - elif mode == "LC_COLLATE": - mode_text = "sort order" - elif mode == "LC_MESSAGES": - mode_text = "text messages" - else: - mode_text = "Unassigned" - return mode_text - - -def reset_cmd_locale(): - """ sets the cmd_locale to its saved default """ - storage['CMD_LOCALE'] = storage.get('CMD_LOCALE_DEFAULT',{}) - - -def unset_cmd_locale(): - """ archinstall will use the execution environment default """ - storage['CMD_LOCALE'] = {} - - -def set_cmd_locale( - general: Optional[str] = None, - charset :str = 'C', - numbers :str = 'C', - time :str = 'C', - collate :str = 'C', - messages :str = 'C' -): - """ - Set the cmd locale. - If the parameter general is specified, it takes precedence over the rest (might as well not exist) - The rest define some specific settings above the installed default language. If anyone of this parameters is none means the installation default - """ - installed_locales = list_installed_locales() - result = {} - if general: - if general in installed_locales: - storage['CMD_LOCALE'] = {'LC_ALL':general} - else: - log(f"{get_locale_mode_text('LC_ALL')} {general} is not installed. Defaulting to C",fg="yellow",level=logging.WARNING) - return - - if numbers: - if numbers in installed_locales: - result["LC_NUMERIC"] = numbers - else: - log(f"{get_locale_mode_text('LC_NUMERIC')} {numbers} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING) - if charset: - if charset in installed_locales: - result["LC_CTYPE"] = charset - else: - log(f"{get_locale_mode_text('LC_CTYPE')} {charset} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING) - if time: - if time in installed_locales: - result["LC_TIME"] = time - else: - log(f"{get_locale_mode_text('LC_TIME')} {time} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING) - if collate: - if collate in installed_locales: - result["LC_COLLATE"] = collate - else: - log(f"{get_locale_mode_text('LC_COLLATE')} {collate} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING) - if messages: - if messages in installed_locales: - result["LC_MESSAGES"] = messages - else: - log(f"{get_locale_mode_text('LC_MESSAGES')} {messages} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING) - storage['CMD_LOCALE'] = result - -def host_locale_environ(func :Callable): - """ decorator when we want a function executing in the host's locale environment """ - def wrapper(*args, **kwargs): - unset_cmd_locale() - result = func(*args,**kwargs) - reset_cmd_locale() - return result - return wrapper - -def c_locale_environ(func :Callable): - """ decorator when we want a function executing in the C locale environment """ - def wrapper(*args, **kwargs): - set_cmd_locale(general='C') - result = func(*args,**kwargs) - reset_cmd_locale() - return result - return wrapper - -def list_installed_locales() -> List[str]: - lista = [] - for line in SysCommand('locale -a'): - lista.append(line.decode('UTF-8').strip()) - return lista - -def list_x11_keyboard_languages() -> Iterator[str]: - for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}): - yield line.decode('UTF-8').strip() - - -def verify_keyboard_layout(layout :str) -> bool: - for language in list_keyboard_languages(): - if layout.lower() == language.lower(): - return True - return False - - -def verify_x11_keyboard_layout(layout :str) -> bool: - for language in list_x11_keyboard_languages(): - if layout.lower() == language.lower(): - return True - return False - - -def search_keyboard_layout(layout :str) -> Iterator[str]: - for language in list_keyboard_languages(): - if layout.lower() in language.lower(): - yield language - - -def set_keyboard_language(locale :str) -> bool: - if len(locale.strip()): - if not verify_keyboard_layout(locale): - log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR) - return False - - try: - SysCommand(f'localectl set-keymap {locale}') - except SysCallError as error: - raise ServiceException(f"Unable to set locale '{locale}' for console: {error}") - - return True - - return False - - -def list_timezones() -> Iterator[str]: - for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}): - yield line.decode('UTF-8').strip() diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 53a5e8d2..f9b09b53 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import shlex import time from dataclasses import dataclass @@ -9,7 +8,7 @@ from typing import Optional, List from . import disk from .general import SysCommand, generate_password, SysCommandWorker -from .output import log +from .output import info, debug from .exceptions import SysCallError, DiskError from .storage import storage @@ -61,7 +60,7 @@ class Luks2: iter_time: int = 10000, key_file: Optional[Path] = None ) -> Path: - log(f'Luks2 encrypting: {self.luks_dev_path}', level=logging.INFO) + info(f'Luks2 encrypting: {self.luks_dev_path}') byte_password = self._password_bytes() @@ -95,21 +94,21 @@ class Luks2: try: SysCommand(cryptsetup_args) break - except SysCallError as error: + except SysCallError as err: time.sleep(storage['DISK_TIMEOUTS']) if retry_attempt != storage['DISK_RETRY_ATTEMPTS'] - 1: continue - if error.exit_code == 1: - log(f'luks2 partition currently in use: {self.luks_dev_path}') - log('Attempting to unmount, crypt-close and trying encryption again') + if err.exit_code == 1: + info(f'luks2 partition currently in use: {self.luks_dev_path}') + info('Attempting to unmount, crypt-close and trying encryption again') self.lock() # Then try again to set up the crypt-device SysCommand(cryptsetup_args) else: - raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {error}') + raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}') return key_file @@ -119,7 +118,7 @@ class Luks2: try: return SysCommand(command).decode().strip() # type: ignore except SysCallError as err: - log(f'Unable to get UUID for Luks device: {self.luks_dev_path}', level=logging.INFO) + info(f'Unable to get UUID for Luks device: {self.luks_dev_path}') raise err def is_unlocked(self) -> bool: @@ -133,7 +132,7 @@ class Luks2: :param key_file: An alternative key file :type key_file: Path """ - log(f'Unlocking luks2 device: {self.luks_dev_path}', level=logging.DEBUG) + debug(f'Unlocking luks2 device: {self.luks_dev_path}') if not self.mapper_name: raise ValueError('mapper name missing') @@ -170,11 +169,11 @@ class Luks2: for child in lsblk_info.children: # Unmount the child location for mountpoint in child.mountpoints: - log(f'Unmounting {mountpoint}', level=logging.DEBUG) + debug(f'Unmounting {mountpoint}') disk.device_handler.umount(mountpoint, recursive=True) # And close it if possible. - log(f"Closing crypt device {child.name}", level=logging.DEBUG) + debug(f"Closing crypt device {child.name}") SysCommand(f"cryptsetup close {child.name}") self._mapper_dev = None @@ -194,10 +193,10 @@ class Luks2: if key_file.exists(): if not override: - log(f'Key file {key_file} already exists, keeping existing') + info(f'Key file {key_file} already exists, keeping existing') return else: - log(f'Key file {key_file} already exists, overriding') + info(f'Key file {key_file} already exists, overriding') key_file_path.mkdir(parents=True, exist_ok=True) @@ -210,7 +209,7 @@ class Luks2: self._crypttab(crypttab_path, key_file, options=["luks", "key-slot=1"]) def _add_key(self, key_file: Path): - log(f'Adding additional key-file {key_file}', level=logging.INFO) + info(f'Adding additional key-file {key_file}') command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}' worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'}) @@ -230,7 +229,7 @@ class Luks2: key_file: Path, options: List[str] ) -> None: - log(f'Adding crypttab entry for key {key_file}', level=logging.INFO) + info(f'Adding crypttab entry for key {key_file}') with open(crypttab_path, 'a') as crypttab: opt = ','.join(options) diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py index e44d65a4..2bd56374 100644 --- a/archinstall/lib/menu/abstract_menu.py +++ b/archinstall/lib/menu/abstract_menu.py @@ -1,11 +1,10 @@ from __future__ import annotations -import logging from typing import Callable, Any, List, Iterator, Tuple, Optional, Dict, TYPE_CHECKING from .menu import Menu, MenuSelectionType -from ..locale_helpers import set_keyboard_language -from ..output import log +from ..locale import set_keyboard_language +from ..output import error from ..translationhandler import TranslationHandler, Language if TYPE_CHECKING: @@ -211,7 +210,7 @@ class AbstractMenu: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager # TODO: skip processing when it comes from a planified exit if len(args) >= 2 and args[1]: - log(args[1], level=logging.ERROR, fg='red') + error(args[1]) print(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues") raise args[1] @@ -483,7 +482,7 @@ class AbstractMenu: yield item def _select_archinstall_language(self, preset: Language) -> Language: - from ..user_interaction.general_conf import select_archinstall_language + from ..interactions.general_conf import select_archinstall_language language = select_archinstall_language(self.translation_handler.translated_languages, preset) self._translation_handler.activate(language) return language diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py index f3fdb85f..768dfe55 100644 --- a/archinstall/lib/menu/menu.py +++ b/archinstall/lib/menu/menu.py @@ -6,11 +6,8 @@ from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable from simple_term_menu import TerminalMenu # type: ignore from ..exceptions import RequirementError -from ..output import log +from ..output import debug -from collections.abc import Iterable -import sys -import logging if TYPE_CHECKING: _: Any @@ -127,33 +124,15 @@ class Menu(TerminalMenu): :param extra_bottom_space: Add an extra empty line at the end of the menu :type extra_bottom_space: bool """ - # we guarantee the inmutability of the options outside the class. - # an unknown number of iterables (.keys(),.values(),generator,...) can't be directly copied, in this case - # we recourse to make them lists before, but thru an exceptions - # this is the old code, which is not maintenable with more types - # options = copy(list(p_options) if isinstance(p_options,(type({}.keys()),type({}.values()))) else p_options) - # We check that the options are iterable. If not we abort. Else we copy them to lists - # it options is a dictionary we use the values as entries of the list - # if options is a string object, each character becomes an entry - # if options is a list, we implictily build a copy to maintain immutability - if not isinstance(p_options,Iterable): - log(f"Objects of type {type(p_options)} is not iterable, and are not supported at Menu",fg="red") - log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) - raise RequirementError("Menu() requires an iterable as option.") - - if isinstance(p_options,dict): + if isinstance(p_options, Dict): options = list(p_options.keys()) else: options = list(p_options) if not options: - log(" * Menu didn't find any options to choose from * ", fg='red') - log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) raise RequirementError('Menu.__init__() requires at least one option to proceed.') if any([o for o in options if not isinstance(o, str)]): - log(" * Menu options must be of type string * ", fg='red') - log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) raise RequirementError('Menu.__init__() requires the options to be of type string') if sort: @@ -343,7 +322,7 @@ class Menu(TerminalMenu): idx = self._menu_options.index(self._default_menu_value) indexes.append(idx) except (IndexError, ValueError): - log(f'Error finding index of {p}: {self._menu_options}', level=logging.DEBUG) + debug(f'Error finding index of {p}: {self._menu_options}') if len(indexes) == 0: indexes.append(0) diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index c6c5c8e4..62a0b081 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,4 +1,3 @@ -import logging import pathlib import urllib.error import urllib.request @@ -6,7 +5,7 @@ from typing import Union, Iterable, Dict, Any, List from dataclasses import dataclass from .general import SysCommand -from .output import log +from .output import info, warn from .exceptions import SysCallError from .storage import storage @@ -136,7 +135,7 @@ def use_mirrors( regions: Dict[str, Iterable[str]], destination: str = '/etc/pacman.d/mirrorlist' ): - log(f'A new package mirror-list has been created: {destination}', level=logging.INFO) + info(f'A new package mirror-list has been created: {destination}') with open(destination, 'w') as mirrorlist: for region, mirrors in regions.items(): for mirror in mirrors: @@ -170,7 +169,7 @@ def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: try: response = urllib.request.urlopen(url) except urllib.error.URLError as err: - log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange") + warn(f'Could not fetch an active mirror-list: {err}') return regions mirrorlist = response.read() diff --git a/archinstall/lib/models/bootloader.py b/archinstall/lib/models/bootloader.py index 38254c99..e21cda33 100644 --- a/archinstall/lib/models/bootloader.py +++ b/archinstall/lib/models/bootloader.py @@ -1,12 +1,11 @@ from __future__ import annotations -import logging import sys from enum import Enum from typing import List -from ..hardware import has_uefi -from ..output import log +from ..hardware import SysInfo +from ..output import warn class Bootloader(Enum): @@ -23,7 +22,7 @@ class Bootloader(Enum): @classmethod def get_default(cls) -> Bootloader: - if has_uefi(): + if SysInfo.has_uefi(): return Bootloader.Systemd else: return Bootloader.Grub @@ -35,6 +34,6 @@ class Bootloader(Enum): if bootloader not in cls.values(): values = ', '.join(cls.values()) - log(f'Invalid bootloader value "{bootloader}". Allowed values: {values}', level=logging.WARN) + warn(f'Invalid bootloader value "{bootloader}". Allowed values: {values}') sys.exit(1) return Bootloader(bootloader) diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py index a8795fc1..93dd1c44 100644 --- a/archinstall/lib/models/network_configuration.py +++ b/archinstall/lib/models/network_configuration.py @@ -1,11 +1,10 @@ from __future__ import annotations -import logging from dataclasses import dataclass, field from enum import Enum from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING, Tuple -from ..output import log +from ..output import debug from ..profile import ProfileConfiguration if TYPE_CHECKING: @@ -138,8 +137,7 @@ class NetworkConfigurationHandler: iface = manual_config.get('iface', None) if iface is None: - log(_('No iface specified for manual configuration')) - exit(1) + raise ValueError('No iface specified for manual configuration') if manual_config.get('dhcp', False) or not any([manual_config.get(v, '') for v in ['ip', 'gateway', 'dns']]): configurations.append( @@ -148,8 +146,7 @@ class NetworkConfigurationHandler: else: ip = manual_config.get('ip', '') if not ip: - log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red') - exit(1) + raise ValueError('Manual nic configuration with no auto DHCP requires an IP address') dns = manual_config.get('dns', []) if not isinstance(dns, list): @@ -173,8 +170,7 @@ class NetworkConfigurationHandler: return NicType(nic_type) except ValueError: options = [e.value for e in NicType] - log(_('Unknown nic type: {}. Possible values are {}').format(nic_type, options), fg='red') - exit(1) + raise ValueError(f'Unknown nic type: {nic_type}. Possible values are {options}') def parse_arguments(self, config: Any): if isinstance(config, list): # new data format @@ -187,4 +183,4 @@ class NetworkConfigurationHandler: else: # manual configuration settings self._configuration = self._parse_manual_config([config]) else: - log(f'Unable to parse network configuration: {config}', level=logging.DEBUG) + debug(f'Unable to parse network configuration: {config}') diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index b858daaf..6906c320 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -1,4 +1,3 @@ -import logging import os import socket import ssl @@ -8,18 +7,16 @@ from urllib.error import URLError from urllib.parse import urlencode from urllib.request import urlopen -from .exceptions import HardwareIncompatibilityError, SysCallError -from .general import SysCommand -from .output import log +from .exceptions import SysCallError +from .output import error, info, debug from .pacman import run_pacman -from .storage import storage def get_hw_addr(ifname :str) -> str: import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) - return ':'.join('%02x' % b for b in info[18:24]) + ret = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) + return ':'.join('%02x' % b for b in ret[18:24]) def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: @@ -36,26 +33,26 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: def check_mirror_reachable() -> bool: - log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO) + info("Testing connectivity to the Arch Linux mirrors...") try: run_pacman("-Sy") return True except SysCallError as err: if os.geteuid() != 0: - log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") - log(f'exit_code: {err.exit_code}, Error: {err.message}', level=logging.DEBUG) + error("check_mirror_reachable() uses 'pacman -Sy' which requires root.") + debug(f'exit_code: {err.exit_code}, Error: {err.message}') return False def update_keyring() -> bool: - log("Updating archlinux-keyring ...", level=logging.INFO) + info("Updating archlinux-keyring ...") try: run_pacman("-Sy --noconfirm archlinux-keyring") return True except SysCallError: if os.geteuid() != 0: - log("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.", level=logging.ERROR, fg="red") + error("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.") return False @@ -80,38 +77,6 @@ def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str return result -def wireless_scan(interface :str) -> None: - interfaces = enrich_iface_types(list(list_interfaces().values())) - if interfaces[interface] != 'WIRELESS': - raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}") - - try: - SysCommand(f"iwctl station {interface} scan") - except SysCallError as error: - raise SystemError(f"Could not scan for wireless networks: {error}") - - if '_WIFI' not in storage: - storage['_WIFI'] = {} - if interface not in storage['_WIFI']: - storage['_WIFI'][interface] = {} - - storage['_WIFI'][interface]['scanning'] = True - - -# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25 -def get_wireless_networks(interface :str) -> None: - # TODO: Make this oneliner pritter to check if the interface is scanning or not. - # TODO: Rename this to list_wireless_networks() as it doesn't return anything - if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: - import time - - wireless_scan(interface) - time.sleep(5) - - for line in SysCommand(f"iwctl station {interface} get-networks"): - print(line) - - def fetch_data_from_url(url: str, params: Optional[Dict] = None) -> str: ssl_context = ssl.create_default_context() ssl_context.check_hostname = False diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index d65f835f..bd31b5b3 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -1,15 +1,16 @@ import logging import os import sys +from enum import Enum + from pathlib import Path from typing import Dict, Union, List, Any, Callable, Optional +from dataclasses import asdict, is_dataclass from .storage import storage -from dataclasses import asdict, is_dataclass class FormattedOutput: - @classmethod def values( cls, @@ -118,7 +119,7 @@ class FormattedOutput: class Journald: @staticmethod - def log(message :str, level :int = logging.DEBUG) -> None: + def log(message: str, level: int = logging.DEBUG) -> None: try: import systemd.journal # type: ignore except ModuleNotFoundError: @@ -134,16 +135,37 @@ class Journald: log_adapter.log(level, message) -# TODO: Replace log() for session based logging. -class SessionLogging: - def __init__(self): - pass +def check_log_permissions(): + filename = storage.get('LOG_FILE', None) + + if not filename: + return + + log_dir = storage.get('LOG_PATH', Path('./')) + absolute_logfile = log_dir / filename + + try: + log_dir.mkdir(exist_ok=True, parents=True) + with absolute_logfile.open('a') as fp: + fp.write('') + except PermissionError: + # Fallback to creating the log file in the current folder + fallback_log_file = Path('./').absolute() / filename + absolute_logfile = fallback_log_file + absolute_logfile.mkdir(exist_ok=True, parents=True) + storage['LOG_PATH'] = Path('./').absolute() + err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {fallback_log_file} instead." + warn(err_string) -# Found first reference here: https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python -# And re-used this: https://github.com/django/django/blob/master/django/core/management/color.py#L12 -def supports_color() -> bool: + +def _supports_color() -> bool: """ + Found first reference here: + https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python + And re-used this: + https://github.com/django/django/blob/master/django/core/management/color.py#L12 + Return True if the running system's terminal supports color, and False otherwise. """ @@ -154,13 +176,30 @@ def supports_color() -> bool: return supported_platform and is_a_tty -# Heavily influenced by: https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13 -# Color options here: https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i -def stylize_output(text: str, *opts :str, **kwargs) -> str: +class Font(Enum): + bold = '1' + italic = '3' + underscore = '4' + blink = '5' + reverse = '7' + conceal = '8' + + +def _stylize_output( + text: str, + fg: str, + bg: Optional[str], + reset: bool, + font: List[Font] = [], +) -> str: """ + Heavily influenced by: + https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13 + Color options here: + https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i + Adds styling to a text given a set of color arguments. """ - opt_dict = {'bold': '1', 'italic': '3', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'} colors = { 'black' : '0', 'red' : '1', @@ -178,65 +217,72 @@ def stylize_output(text: str, *opts :str, **kwargs) -> str: 'darkgray' : '8;5;240', 'lightgray' : '8;5;256' } + foreground = {key: f'3{colors[key]}' for key in colors} background = {key: f'4{colors[key]}' for key in colors} - reset = '0' - code_list = [] - if text == '' and len(opts) == 1 and opts[0] == 'reset': - return '\x1b[%sm' % reset - for k, v in kwargs.items(): - if k == 'fg': - code_list.append(foreground[str(v)]) - elif k == 'bg': - code_list.append(background[str(v)]) + if text == '' and reset: + return '\x1b[%sm' % '0' + + code_list.append(foreground[str(fg)]) + + if bg: + code_list.append(background[str(bg)]) + + for o in font: + code_list.append(o.value) + + ansi = ';'.join(code_list) + + return f'\033[{ansi}m{text}\033[0m' + - for o in opts: - if o in opt_dict: - code_list.append(opt_dict[o]) +def info(*msgs: str): + log(*msgs, level=logging.INFO) - if 'noreset' not in opts: - text = '%s\x1b[%sm' % (text or '', reset) - return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '') +def debug(*msgs: str): + log(*msgs, level=logging.DEBUG) -def log(*args :str, **kwargs :Union[str, int, Dict[str, Union[str, int]]]) -> None: - string = orig_string = ' '.join([str(x) for x in args]) +def error(*msgs: str): + log(*msgs, level=logging.ERROR, fg='red') + + +def warn(*msgs: str): + log(*msgs, level=logging.WARNING, fg='yellow') + + +def log( + *msgs: str, + level: int = logging.INFO, + fg: str = 'white', + bg: Optional[str] = None, + reset: bool = False, + font: List[Font] = [] +): + text = orig_string = ' '.join([str(x) for x in msgs]) # Attempt to colorize the output if supported # Insert default colors and override with **kwargs - if supports_color(): - kwargs = {'fg': 'white', **kwargs} - string = stylize_output(string, **kwargs) + if _supports_color(): + text = _stylize_output(text, fg, bg, reset, font) # If a logfile is defined in storage, # we use that one to output everything if filename := storage.get('LOG_FILE', None): - absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) + log_dir = storage.get('LOG_PATH', Path('./')) + absolute_logfile = log_dir / filename - try: - Path(absolute_logfile).parents[0].mkdir(exist_ok=True, parents=True) - with open(absolute_logfile, 'a') as log_file: - log_file.write("") - except PermissionError: - # Fallback to creating the log file in the current folder - err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {Path('./').absolute() / filename} instead." - absolute_logfile = Path('./').absolute() / filename - absolute_logfile.parents[0].mkdir(exist_ok=True) - absolute_logfile = str(absolute_logfile) - storage['LOG_PATH'] = './' - log(err_string, fg="red") - - with open(absolute_logfile, 'a') as log_file: - log_file.write(f"{orig_string}\n") - - Journald.log(string, level=int(str(kwargs.get('level', logging.INFO)))) + with open(absolute_logfile, 'a') as fp: + fp.write(f"{orig_string}\n") + + Journald.log(text, level=level) # Finally, print the log unless we skipped it based on level. # We use sys.stdout.write()+flush() instead of print() to try and # fix issue #94 - if kwargs.get('level', logging.INFO) != logging.DEBUG or storage.get('arguments', {}).get('verbose', False): - sys.stdout.write(f"{string}\n") + if level != logging.DEBUG or storage.get('arguments', {}).get('verbose', False): + sys.stdout.write(f"{text}\n") sys.stdout.flush() diff --git a/archinstall/lib/pacman.py b/archinstall/lib/pacman.py index 0dfd5afa..f5514f05 100644 --- a/archinstall/lib/pacman.py +++ b/archinstall/lib/pacman.py @@ -1,10 +1,9 @@ -import logging import pathlib import time from typing import TYPE_CHECKING, Any from .general import SysCommand -from .output import log +from .output import warn, error if TYPE_CHECKING: _: Any @@ -19,14 +18,14 @@ def run_pacman(args :str, default_cmd :str = 'pacman') -> SysCommand: pacman_db_lock = pathlib.Path('/var/lib/pacman/db.lck') if pacman_db_lock.exists(): - log(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'), level=logging.WARNING, fg="red") + warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.')) started = time.time() while pacman_db_lock.exists(): time.sleep(0.25) if time.time() - started > (60 * 10): - log(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'), level=logging.WARNING, fg="red") + error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.')) exit(1) return SysCommand(f'{default_cmd} {args}') diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index b1ece04f..4ccb0666 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -1,6 +1,5 @@ import hashlib import importlib -import logging import os import sys import urllib.parse @@ -9,7 +8,7 @@ from importlib import metadata from pathlib import Path from typing import Optional, List -from .output import log +from .output import error, info, warn from .storage import storage plugins = {} @@ -24,11 +23,13 @@ for plugin_definition in metadata.entry_points().select(group='archinstall.plugi try: plugins[plugin_definition.name] = plugin_entrypoint() except Exception as err: - log(f'Error: {err}', level=logging.ERROR) - log(f"The above error was detected when loading the plugin: {plugin_definition}", fg="red", level=logging.ERROR) + error( + f'Error: {err}', + f"The above error was detected when loading the plugin: {plugin_definition}" + ) -def localize_path(path: Path) -> Path: +def _localize_path(path: Path) -> Path: """ Support structures for load_plugin() """ @@ -45,7 +46,7 @@ def localize_path(path: Path) -> Path: return path -def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str]: +def _import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str]: if not namespace: namespace = os.path.basename(path) @@ -61,8 +62,10 @@ def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str return namespace except Exception as err: - log(f'Error: {err}', level=logging.ERROR) - log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR) + error( + f'Error: {err}', + f"The above error was detected when loading the plugin: {path}" + ) try: del sys.modules[namespace] @@ -72,7 +75,7 @@ def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str return namespace -def find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]: +def _find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]: indices = [idx for idx, elem in enumerate(haystack) if elem == needle] if n <= len(indices): return indices[n - 1] @@ -82,34 +85,36 @@ def find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]: def load_plugin(path: Path): namespace: Optional[str] = None parsed_url = urllib.parse.urlparse(str(path)) - log(f"Loading plugin from url {parsed_url}.", level=logging.INFO) + info(f"Loading plugin from url {parsed_url}") # The Profile was not a direct match on a remote URL if not parsed_url.scheme: # Path was not found in any known examples, check if it's an absolute path if os.path.isfile(path): - namespace = import_via_path(path) + namespace = _import_via_path(path) elif parsed_url.scheme in ('https', 'http'): - localized = localize_path(path) - namespace = import_via_path(localized) + localized = _localize_path(path) + namespace = _import_via_path(localized) if namespace and namespace in sys.modules: # Version dependency via __archinstall__version__ variable (if present) in the plugin # Any errors in version inconsistency will be handled through normal error handling if not defined. if hasattr(sys.modules[namespace], '__archinstall__version__'): - archinstall_major_and_minor_version = float(storage['__version__'][:find_nth(storage['__version__'], '.', 2)]) + archinstall_major_and_minor_version = float(storage['__version__'][:_find_nth(storage['__version__'], '.', 2)]) if sys.modules[namespace].__archinstall__version__ < archinstall_major_and_minor_version: - log(f"Plugin {sys.modules[namespace]} does not support the current Archinstall version.", fg="red", level=logging.ERROR) + error(f"Plugin {sys.modules[namespace]} does not support the current Archinstall version.") # Locate the plugin entry-point called Plugin() # This in accordance with the entry_points() from setup.cfg above if hasattr(sys.modules[namespace], 'Plugin'): try: plugins[namespace] = sys.modules[namespace].Plugin() - log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO) + info(f"Plugin {plugins[namespace]} has been loaded.") except Exception as err: - log(f'Error: {err}', level=logging.ERROR) - log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR) + error( + f'Error: {err}', + f"The above error was detected when initiating the plugin: {path}" + ) else: - log(f"Plugin '{path}' is missing a valid entry-point or is corrupt.", fg="yellow", level=logging.WARNING) + warn(f"Plugin '{path}' is missing a valid entry-point or is corrupt.") diff --git a/archinstall/lib/profile/profile_menu.py b/archinstall/lib/profile/profile_menu.py index 6462685a..213466a6 100644 --- a/archinstall/lib/profile/profile_menu.py +++ b/archinstall/lib/profile/profile_menu.py @@ -6,7 +6,7 @@ from archinstall.default_profiles.profile import Profile, GreeterType from .profile_model import ProfileConfiguration from ..hardware import AVAILABLE_GFX_DRIVERS from ..menu import Menu, MenuSelectionType, AbstractSubMenu, Selector -from ..user_interaction.system_conf import select_driver +from ..interactions.system_conf import select_driver if TYPE_CHECKING: _: Any diff --git a/archinstall/lib/profile/profiles_handler.py b/archinstall/lib/profile/profiles_handler.py index 6ed95f8e..16fef251 100644 --- a/archinstall/lib/profile/profiles_handler.py +++ b/archinstall/lib/profile/profiles_handler.py @@ -1,7 +1,6 @@ from __future__ import annotations import importlib.util -import logging import sys from collections import Counter from functools import cached_property @@ -15,7 +14,7 @@ from .profile_model import ProfileConfiguration from ..hardware import AVAILABLE_GFX_DRIVERS from ..menu import MenuSelectionType, Menu, MenuSelection from ..networking import list_interfaces, fetch_data_from_url -from ..output import log +from ..output import error, debug, info, warn from ..storage import storage if TYPE_CHECKING: @@ -106,7 +105,7 @@ class ProfileHandler: invalid = ', '.join([k for k, v in resolved.items() if v is None]) if invalid: - log(f'No profile definition found: {invalid}') + info(f'No profile definition found: {invalid}') custom_settings = profile_config.get('custom_settings', {}) for profile in valid: @@ -216,7 +215,7 @@ class ProfileHandler: install_session.add_additional_packages(additional_pkg) except Exception as err: - log(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}", level=logging.WARNING, fg="yellow") + warn(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}") # Prep didn't run, so there's no driver to install install_session.add_additional_packages(['xorg-server', 'xorg-xinit']) @@ -250,7 +249,7 @@ class ProfileHandler: self.add_custom_profiles(profiles) except ValueError: err = str(_('Unable to fetch profile from specified url: {}')).format(url) - log(err, level=logging.ERROR, fg="red") + error(err) def _load_profile_class(self, module: ModuleType) -> List[Profile]: """ @@ -264,7 +263,7 @@ class ProfileHandler: if isinstance(cls_, Profile): profiles.append(cls_) except Exception: - log(f'Cannot import {module}, it does not appear to be a Profile class', level=logging.DEBUG) + debug(f'Cannot import {module}, it does not appear to be a Profile class') return profiles @@ -278,7 +277,7 @@ class ProfileHandler: if len(duplicates) > 0: err = str(_('Profiles must have unique name, but profile definitions with duplicate name found: {}')).format(duplicates[0][0]) - log(err, level=logging.ERROR, fg="red") + error(err) sys.exit(1) def _is_legacy(self, file: Path) -> bool: @@ -297,15 +296,15 @@ class ProfileHandler: Process a file for profile definitions """ if self._is_legacy(file): - log(f'Cannot import {file} because it is no longer supported, please use the new profile format') + info(f'Cannot import {file} because it is no longer supported, please use the new profile format') return [] if not file.is_file(): - log(f'Cannot find profile file {file}') + info(f'Cannot find profile file {file}') return [] name = file.name.removesuffix(file.suffix) - log(f'Importing profile: {file}', level=logging.DEBUG) + debug(f'Importing profile: {file}') try: spec = importlib.util.spec_from_file_location(name, file) @@ -315,7 +314,7 @@ class ProfileHandler: spec.loader.exec_module(imported) return self._load_profile_class(imported) except Exception as e: - log(f'Unable to parse file {file}: {e}', level=logging.ERROR) + error(f'Unable to parse file {file}: {e}') return [] diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py deleted file mode 100644 index b177052b..00000000 --- a/archinstall/lib/services.py +++ /dev/null @@ -1,11 +0,0 @@ -import os -from .general import SysCommand - - -def service_state(service_name: str) -> str: - if os.path.splitext(service_name)[1] != '.service': - service_name += '.service' # Just to be safe - - state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'})) - - return state.strip().decode('UTF-8') diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py index 5a54d816..2f256e5d 100644 --- a/archinstall/lib/storage.py +++ b/archinstall/lib/storage.py @@ -11,8 +11,8 @@ from pathlib import Path storage: Dict[str, Any] = { 'PROFILE': Path(__file__).parent.parent.joinpath('default_profiles'), - 'LOG_PATH': '/var/log/archinstall', - 'LOG_FILE': 'install.log', + 'LOG_PATH': Path('/var/log/archinstall'), + 'LOG_FILE': Path('install.log'), 'MOUNT_POINT': Path('/mnt/archinstall'), 'ENC_IDENTIFIER': 'ainst', 'DISK_TIMEOUTS' : 1, # seconds diff --git a/archinstall/lib/translationhandler.py b/archinstall/lib/translationhandler.py index 0d74f974..5f0f0695 100644 --- a/archinstall/lib/translationhandler.py +++ b/archinstall/lib/translationhandler.py @@ -1,14 +1,14 @@ from __future__ import annotations import json -import logging import os import gettext from dataclasses import dataclass from pathlib import Path from typing import List, Dict, Any, TYPE_CHECKING, Optional -from .exceptions import TranslationError + +from .output import error, debug if TYPE_CHECKING: _: Any @@ -80,8 +80,8 @@ class TranslationHandler: language = Language(abbr, lang, translation, percent, translated_lang) languages.append(language) - except FileNotFoundError as error: - raise TranslationError(f"Could not locate language file for '{lang}': {error}") + except FileNotFoundError as err: + raise FileNotFoundError(f"Could not locate language file for '{lang}': {err}") return languages @@ -89,12 +89,12 @@ class TranslationHandler: """ Set the provided font as the new terminal font """ - from .general import SysCommand, log + from .general import SysCommand try: - log(f'Setting font: {font}', level=logging.DEBUG) + debug(f'Setting font: {font}') SysCommand(f'setfont {font}') except Exception: - log(f'Unable to set font {font}', level=logging.ERROR) + error(f'Unable to set font {font}') def _load_language_mappings(self) -> List[Dict[str, Any]]: """ diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py deleted file mode 100644 index 5ee89de0..00000000 --- a/archinstall/lib/user_interaction/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from .manage_users_conf import ask_for_additional_users -from .locale_conf import select_locale_lang, select_locale_enc -from .system_conf import select_kernel, select_driver, ask_for_bootloader, ask_for_swap -from .network_conf import ask_to_configure_network -from .general_conf import ( - ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions, - select_archinstall_language, ask_additional_packages_to_install, - select_additional_repositories, ask_hostname, add_number_of_parrallel_downloads -) -from .utils import get_password diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py deleted file mode 100644 index e05b9afe..00000000 --- a/archinstall/lib/user_interaction/save_conf.py +++ /dev/null @@ -1,113 +0,0 @@ -from __future__ import annotations - -import logging - -from pathlib import Path -from typing import Any, Dict, TYPE_CHECKING - -from ..general import SysCommand -from ..menu import Menu -from ..menu.menu import MenuSelectionType -from ..output import log -from ..configuration import ConfigurationOutput - -if TYPE_CHECKING: - _: Any - - -def save_config(config: Dict): - def preview(selection: str): - if options['user_config'] == selection: - serialized = config_output.user_config_to_json() - return f'{config_output.user_configuration_file}\n{serialized}' - elif options['user_creds'] == selection: - if maybe_serial := config_output.user_credentials_to_json(): - return f'{config_output.user_credentials_file}\n{maybe_serial}' - else: - return str(_('No configuration')) - elif options['all'] == selection: - output = f'{config_output.user_configuration_file}\n' - if config_output.user_credentials_to_json(): - output += f'{config_output.user_credentials_file}\n' - return output[:-1] - return None - - config_output = ConfigurationOutput(config) - - options = { - 'user_config': str(_('Save user configuration')), - 'user_creds': str(_('Save user credentials')), - 'disk_layout': str(_('Save disk layout')), - 'all': str(_('Save all')) - } - - choice = Menu( - _('Choose which configuration to save'), - list(options.values()), - sort=False, - skip=True, - preview_size=0.75, - preview_command=preview - ).run() - - if choice.type_ == MenuSelectionType.Skip: - return - - save_config_value = choice.single_value - saving_key = [k for k, v in options.items() if v == save_config_value][0] - - dirs_to_exclude = [ - '/bin', - '/dev', - '/lib', - '/lib64', - '/lost+found', - '/opt', - '/proc', - '/run', - '/sbin', - '/srv', - '/sys', - '/usr', - '/var', - ] - - log('Ignore configuration option folders: ' + ','.join(dirs_to_exclude), level=logging.DEBUG) - log(_('Finding possible directories to save configuration files ...'), level=logging.INFO) - - find_exclude = '-path ' + ' -prune -o -path '.join(dirs_to_exclude) + ' -prune ' - file_picker_command = f'find / {find_exclude} -o -type d -print0' - - directories = SysCommand(file_picker_command).decode() - - if directories is None: - raise ValueError('Failed to retrieve possible configuration directories') - - possible_save_dirs = list(filter(None, directories.split('\x00'))) - - selection = Menu( - _('Select directory (or directories) for saving configuration files'), - possible_save_dirs, - multi=True, - skip=True, - allow_reset=False, - ).run() - - match selection.type_: - case MenuSelectionType.Skip: - return - - save_dirs = selection.multi_value - - log(f'Saving {saving_key} configuration files to {save_dirs}', level=logging.DEBUG) - - if save_dirs is not None: - for save_dir_str in save_dirs: - save_dir = Path(save_dir_str) - if options['user_config'] == save_config_value: - config_output.save_user_config(save_dir) - elif options['user_creds'] == save_config_value: - config_output.save_user_creds(save_dir) - elif options['all'] == save_config_value: - config_output.save_user_config(save_dir) - config_output.save_user_creds(save_dir) diff --git a/archinstall/lib/utils/util.py b/archinstall/lib/utils/util.py index ded480ae..34716f4a 100644 --- a/archinstall/lib/utils/util.py +++ b/archinstall/lib/utils/util.py @@ -1,7 +1,7 @@ from pathlib import Path from typing import Any, TYPE_CHECKING, Optional -from ..output import log +from ..output import info if TYPE_CHECKING: _: Any @@ -16,7 +16,7 @@ def prompt_dir(text: str, header: Optional[str] = None) -> Path: dest_path = Path(path) if dest_path.exists() and dest_path.is_dir(): return dest_path - log(_('Not a valid directory: {}').format(dest_path), fg='red') + info(_('Not a valid directory: {}').format(dest_path)) def is_subpath(first: Path, second: Path): |