Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorDaniel Girtler <blackrabbit256@gmail.com>2023-05-12 02:30:09 +1000
committerGitHub <noreply@github.com>2023-05-11 18:30:09 +0200
commit89cefb9a1c7d4c4968e7d8645149078e601c9d1c (patch)
tree12c84bdcef1b0ef3f8a21977e25c7f0f89388138 /archinstall/lib
parent6e6b850a8f687b193172aaa321d49bd2956c1d4f (diff)
Cleanup imports and unused code (#1801)
* Cleanup imports and unused code * Update build check * Keep deprecation exception * Simplify logging --------- Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/boot.py (renamed from archinstall/lib/systemd.py)13
-rw-r--r--archinstall/lib/configuration.py114
-rw-r--r--archinstall/lib/disk/device_handler.py99
-rw-r--r--archinstall/lib/disk/device_model.py50
-rw-r--r--archinstall/lib/disk/encryption_menu.py2
-rw-r--r--archinstall/lib/disk/fido.py11
-rw-r--r--archinstall/lib/disk/filesystem.py11
-rw-r--r--archinstall/lib/disk/partitioning_menu.py9
-rw-r--r--archinstall/lib/exceptions.py23
-rw-r--r--archinstall/lib/general.py88
-rw-r--r--archinstall/lib/global_menu.py36
-rw-r--r--archinstall/lib/hardware.py274
-rw-r--r--archinstall/lib/installer.py225
-rw-r--r--archinstall/lib/interactions/__init__.py20
-rw-r--r--archinstall/lib/interactions/disk_conf.py (renamed from archinstall/lib/user_interaction/disk_conf.py)32
-rw-r--r--archinstall/lib/interactions/general_conf.py (renamed from archinstall/lib/user_interaction/general_conf.py)7
-rw-r--r--archinstall/lib/interactions/locale_conf.py (renamed from archinstall/lib/user_interaction/locale_conf.py)4
-rw-r--r--archinstall/lib/interactions/manage_users_conf.py (renamed from archinstall/lib/user_interaction/manage_users_conf.py)0
-rw-r--r--archinstall/lib/interactions/network_conf.py (renamed from archinstall/lib/user_interaction/network_conf.py)7
-rw-r--r--archinstall/lib/interactions/system_conf.py (renamed from archinstall/lib/user_interaction/system_conf.py)10
-rw-r--r--archinstall/lib/interactions/utils.py (renamed from archinstall/lib/user_interaction/utils.py)4
-rw-r--r--archinstall/lib/locale.py68
-rw-r--r--archinstall/lib/locale_helpers.py176
-rw-r--r--archinstall/lib/luks.py31
-rw-r--r--archinstall/lib/menu/abstract_menu.py9
-rw-r--r--archinstall/lib/menu/menu.py27
-rw-r--r--archinstall/lib/mirrors.py7
-rw-r--r--archinstall/lib/models/bootloader.py9
-rw-r--r--archinstall/lib/models/network_configuration.py14
-rw-r--r--archinstall/lib/networking.py53
-rw-r--r--archinstall/lib/output.py154
-rw-r--r--archinstall/lib/pacman.py7
-rw-r--r--archinstall/lib/plugins.py43
-rw-r--r--archinstall/lib/profile/profile_menu.py2
-rw-r--r--archinstall/lib/profile/profiles_handler.py21
-rw-r--r--archinstall/lib/services.py11
-rw-r--r--archinstall/lib/storage.py4
-rw-r--r--archinstall/lib/translationhandler.py14
-rw-r--r--archinstall/lib/user_interaction/__init__.py10
-rw-r--r--archinstall/lib/user_interaction/save_conf.py113
-rw-r--r--archinstall/lib/utils/util.py4
41 files changed, 778 insertions, 1038 deletions
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/boot.py
index 6ccbc5f6..62c50df3 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/boot.py
@@ -1,10 +1,9 @@
-import logging
import time
from typing import Iterator, Optional
from .exceptions import SysCallError
from .general import SysCommand, SysCommandWorker, locate_binary
from .installer import Installer
-from .output import log
+from .output import error
from .storage import storage
@@ -48,16 +47,18 @@ class Boot:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
- log(args[1], level=logging.ERROR, fg='red')
- log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
+ error(
+ args[1],
+ f"The error above occurred in a temporary boot-up of the installation {self.instance}"
+ )
shutdown = None
shutdown_exit_code: Optional[int] = -1
try:
shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now')
- except SysCallError as error:
- shutdown_exit_code = error.exit_code
+ except SysCallError as err:
+ shutdown_exit_code = err.exit_code
if self.session:
while self.session.is_alive():
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py
index 22c41c0d..c3af3a83 100644
--- a/archinstall/lib/configuration.py
+++ b/archinstall/lib/configuration.py
@@ -1,13 +1,13 @@
import os
import json
import stat
-import logging
from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING
+from .menu import Menu, MenuSelectionType
from .storage import storage
-from .general import JSON, UNSAFE_JSON
-from .output import log
+from .general import JSON, UNSAFE_JSON, SysCommand
+from .output import debug, info, warn
if TYPE_CHECKING:
_: Any
@@ -69,18 +69,18 @@ class ConfigurationOutput:
def show(self):
print(_('\nThis is your chosen configuration:'))
- log(" -- Chosen configuration --", level=logging.DEBUG)
+ debug(" -- Chosen configuration --")
user_conig = self.user_config_to_json()
- log(user_conig, level=logging.INFO)
+ info(user_conig)
print()
def _is_valid_path(self, dest_path: Path) -> bool:
if (not dest_path.exists()) or not (dest_path.is_dir()):
- log(
- 'Destination directory {} does not exist or is not a directory,\n Configuration files can not be saved'.format(dest_path.resolve()),
- fg="yellow"
+ warn(
+ f'Destination directory {dest_path.resolve()} does not exist or is not a directory\n.',
+ 'Configuration files can not be saved'
)
return False
return True
@@ -111,3 +111,101 @@ class ConfigurationOutput:
if self._is_valid_path(dest_path):
self.save_user_config(dest_path)
self.save_user_creds(dest_path)
+
+
+def save_config(config: Dict):
+ def preview(selection: str):
+ if options['user_config'] == selection:
+ serialized = config_output.user_config_to_json()
+ return f'{config_output.user_configuration_file}\n{serialized}'
+ elif options['user_creds'] == selection:
+ if maybe_serial := config_output.user_credentials_to_json():
+ return f'{config_output.user_credentials_file}\n{maybe_serial}'
+ else:
+ return str(_('No configuration'))
+ elif options['all'] == selection:
+ output = f'{config_output.user_configuration_file}\n'
+ if config_output.user_credentials_to_json():
+ output += f'{config_output.user_credentials_file}\n'
+ return output[:-1]
+ return None
+
+ config_output = ConfigurationOutput(config)
+
+ options = {
+ 'user_config': str(_('Save user configuration')),
+ 'user_creds': str(_('Save user credentials')),
+ 'disk_layout': str(_('Save disk layout')),
+ 'all': str(_('Save all'))
+ }
+
+ choice = Menu(
+ _('Choose which configuration to save'),
+ list(options.values()),
+ sort=False,
+ skip=True,
+ preview_size=0.75,
+ preview_command=preview
+ ).run()
+
+ if choice.type_ == MenuSelectionType.Skip:
+ return
+
+ save_config_value = choice.single_value
+ saving_key = [k for k, v in options.items() if v == save_config_value][0]
+
+ dirs_to_exclude = [
+ '/bin',
+ '/dev',
+ '/lib',
+ '/lib64',
+ '/lost+found',
+ '/opt',
+ '/proc',
+ '/run',
+ '/sbin',
+ '/srv',
+ '/sys',
+ '/usr',
+ '/var',
+ ]
+
+ debug('Ignore configuration option folders: ' + ','.join(dirs_to_exclude))
+ info(_('Finding possible directories to save configuration files ...'))
+
+ find_exclude = '-path ' + ' -prune -o -path '.join(dirs_to_exclude) + ' -prune '
+ file_picker_command = f'find / {find_exclude} -o -type d -print0'
+
+ directories = SysCommand(file_picker_command).decode()
+
+ if directories is None:
+ raise ValueError('Failed to retrieve possible configuration directories')
+
+ possible_save_dirs = list(filter(None, directories.split('\x00')))
+
+ selection = Menu(
+ _('Select directory (or directories) for saving configuration files'),
+ possible_save_dirs,
+ multi=True,
+ skip=True,
+ allow_reset=False,
+ ).run()
+
+ match selection.type_:
+ case MenuSelectionType.Skip:
+ return
+
+ save_dirs = selection.multi_value
+
+ debug(f'Saving {saving_key} configuration files to {save_dirs}')
+
+ if save_dirs is not None:
+ for save_dir_str in save_dirs:
+ save_dir = Path(save_dir_str)
+ if options['user_config'] == save_config_value:
+ config_output.save_user_config(save_dir)
+ elif options['user_creds'] == save_config_value:
+ config_output.save_user_creds(save_dir)
+ elif options['all'] == save_config_value:
+ config_output.save_user_config(save_dir)
+ config_output.save_user_creds(save_dir)
diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py
index 13bde77a..4341c53c 100644
--- a/archinstall/lib/disk/device_handler.py
+++ b/archinstall/lib/disk/device_handler.py
@@ -1,7 +1,6 @@
from __future__ import annotations
import json
-import logging
import os
import time
from pathlib import Path
@@ -24,7 +23,7 @@ from .device_model import (
from ..exceptions import DiskError, UnknownFilesystemFormat
from ..general import SysCommand, SysCallError, JSON
from ..luks import Luks2
-from ..output import log
+from ..output import debug, error, info, warn
from ..utils.util import is_subpath
if TYPE_CHECKING:
@@ -48,11 +47,11 @@ class DeviceHandler(object):
for device in getAllDevices():
try:
disk = Disk(device)
- except DiskLabelException as error:
- if 'unrecognised disk label' in getattr(error, 'message', str(error)):
+ except DiskLabelException as err:
+ if 'unrecognised disk label' in getattr(error, 'message', str(err)):
disk = freshDisk(device, PartitionTable.GPT.value)
else:
- log(f'Unable to get disk from device: {device}', level=logging.DEBUG)
+ debug(f'Unable to get disk from device: {device}')
continue
device_info = _DeviceInfo.from_disk(disk)
@@ -93,7 +92,7 @@ class DeviceHandler(object):
return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None
return None
except ValueError:
- log(f'Could not determine the filesystem: {partition.fileSystem}', level=logging.DEBUG)
+ debug(f'Could not determine the filesystem: {partition.fileSystem}')
return None
@@ -137,7 +136,7 @@ class DeviceHandler(object):
try:
result = SysCommand(f'btrfs subvolume list {mountpoint}')
except SysCallError as err:
- log(f'Failed to read btrfs subvolume information: {err}', level=logging.DEBUG)
+ debug(f'Failed to read btrfs subvolume information: {err}')
return subvol_infos
try:
@@ -150,7 +149,7 @@ class DeviceHandler(object):
sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
except json.decoder.JSONDecodeError as err:
- log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
+ error(f"Could not decode lsblk JSON: {result}")
raise err
if not lsblk_info.mountpoint:
@@ -203,14 +202,14 @@ class DeviceHandler(object):
options += additional_parted_options
options_str = ' '.join(options)
- log(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
+ info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
try:
SysCommand(f"/usr/bin/{command} {options_str} {path}")
- except SysCallError as error:
- msg = f'Could not format {path} with {fs_type.value}: {error.message}'
- log(msg, fg='red')
- raise DiskError(msg) from error
+ except SysCallError as err:
+ msg = f'Could not format {path} with {fs_type.value}: {err.message}'
+ error(msg)
+ raise DiskError(msg) from err
def _perform_enc_formatting(
self,
@@ -227,16 +226,16 @@ class DeviceHandler(object):
key_file = luks_handler.encrypt()
- log(f'Unlocking luks2 device: {dev_path}', level=logging.DEBUG)
+ debug(f'Unlocking luks2 device: {dev_path}')
luks_handler.unlock(key_file=key_file)
if not luks_handler.mapper_dev:
raise DiskError('Failed to unlock luks device')
- log(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}', level=logging.INFO)
+ info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
self._perform_formatting(fs_type, luks_handler.mapper_dev)
- log(f'luks2 locking device: {dev_path}', level=logging.INFO)
+ info(f'luks2 locking device: {dev_path}')
luks_handler.lock()
def format(
@@ -285,7 +284,7 @@ class DeviceHandler(object):
# when we require a delete and the partition to be (re)created
# already exists then we have to delete it first
if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]:
- log(f'Delete existing partition: {part_mod.safe_dev_path}', level=logging.INFO)
+ info(f'Delete existing partition: {part_mod.safe_dev_path}')
part_info = self.find_partition(part_mod.safe_dev_path)
if not part_info:
@@ -325,9 +324,9 @@ class DeviceHandler(object):
for flag in part_mod.flags:
partition.setFlag(flag.value)
- log(f'\tType: {part_mod.type.value}', level=logging.DEBUG)
- log(f'\tFilesystem: {part_mod.fs_type.value}', level=logging.DEBUG)
- log(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length', level=logging.DEBUG)
+ debug(f'\tType: {part_mod.type.value}')
+ debug(f'\tFilesystem: {part_mod.fs_type.value}')
+ debug(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length')
try:
disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint)
@@ -339,41 +338,41 @@ class DeviceHandler(object):
# the partition has a real path now as it was created
part_mod.dev_path = Path(partition.path)
- info = self._fetch_partuuid(part_mod.dev_path)
+ lsblk_info = self._fetch_partuuid(part_mod.dev_path)
- part_mod.partuuid = info.partuuid
- part_mod.uuid = info.uuid
+ part_mod.partuuid = lsblk_info.partuuid
+ part_mod.uuid = lsblk_info.uuid
except PartitionException as ex:
raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex
def _fetch_partuuid(self, path: Path) -> LsblkInfo:
attempts = 3
- info: Optional[LsblkInfo] = None
+ lsblk_info: Optional[LsblkInfo] = None
self.partprobe(path)
for attempt_nr in range(attempts):
time.sleep(attempt_nr + 1)
- info = get_lsblk_info(path)
+ lsblk_info = get_lsblk_info(path)
- if info.partuuid:
+ if lsblk_info.partuuid:
break
self.partprobe(path)
- if not info or not info.partuuid:
- log(f'Unable to determine new partition uuid: {path}\n{info}', level=logging.DEBUG)
+ if not lsblk_info or not lsblk_info.partuuid:
+ debug(f'Unable to determine new partition uuid: {path}\n{lsblk_info}')
raise DiskError(f'Unable to determine new partition uuid: {path}')
- log(f'partuuid found: {info.json()}', level=logging.DEBUG)
+ debug(f'partuuid found: {lsblk_info.json()}')
- return info
+ return lsblk_info
def create_btrfs_volumes(
self,
part_mod: PartitionModification,
enc_conf: Optional['DiskEncryption'] = None
):
- log(f'Creating subvolumes: {part_mod.safe_dev_path}', level=logging.INFO)
+ info(f'Creating subvolumes: {part_mod.safe_dev_path}')
luks_handler = None
@@ -396,7 +395,7 @@ class DeviceHandler(object):
self.mount(part_mod.safe_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
for sub_vol in part_mod.btrfs_subvols:
- log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG)
+ debug(f'Creating subvolume: {sub_vol.name}')
if luks_handler is not None:
subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
@@ -408,14 +407,14 @@ class DeviceHandler(object):
if sub_vol.nodatacow:
try:
SysCommand(f'chattr +C {subvol_path}')
- except SysCallError as error:
- raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {error}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}')
if sub_vol.compress:
try:
SysCommand(f'chattr +c {subvol_path}')
- except SysCallError as error:
- raise DiskError(f'Could not set compress attribute at {subvol_path}: {error}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}')
if luks_handler is not None and luks_handler.mapper_dev is not None:
self.umount(luks_handler.mapper_dev)
@@ -435,12 +434,12 @@ class DeviceHandler(object):
return luks_handler
def _umount_all_existing(self, modification: DeviceModification):
- log(f'Unmounting all partitions: {modification.device_path}', level=logging.INFO)
+ info(f'Unmounting all partitions: {modification.device_path}')
existing_partitions = self._devices[modification.device_path].partition_infos
for partition in existing_partitions:
- log(f'Unmounting: {partition.path}', level=logging.DEBUG)
+ debug(f'Unmounting: {partition.path}')
# un-mount for existing encrypted partitions
if partition.fs_type == FilesystemType.Crypto_luks:
@@ -472,10 +471,10 @@ class DeviceHandler(object):
part_table = partition_table.value if partition_table else None
disk = freshDisk(modification.device.disk.device, part_table)
else:
- log(f'Use existing device: {modification.device_path}')
+ info(f'Use existing device: {modification.device_path}')
disk = modification.device.disk
- log(f'Creating partitions: {modification.device_path}')
+ info(f'Creating partitions: {modification.device_path}')
# TODO sort by delete first
@@ -507,7 +506,7 @@ class DeviceHandler(object):
lsblk_info = get_lsblk_info(dev_path)
if target_mountpoint in lsblk_info.mountpoints:
- log(f'Device already mounted at {target_mountpoint}')
+ info(f'Device already mounted at {target_mountpoint}')
return
str_options = ','.join(options)
@@ -517,7 +516,7 @@ class DeviceHandler(object):
command = f'mount {mount_fs} {str_options} {dev_path} {target_mountpoint}'
- log(f'Mounting {dev_path}: command', level=logging.DEBUG)
+ debug(f'Mounting {dev_path}: command')
try:
SysCommand(command)
@@ -536,10 +535,10 @@ class DeviceHandler(object):
raise ex
if len(lsblk_info.mountpoints) > 0:
- log(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}', level=logging.DEBUG)
+ debug(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}')
for mountpoint in lsblk_info.mountpoints:
- log(f'Unmounting mountpoint: {mountpoint}', level=logging.DEBUG)
+ debug(f'Unmounting mountpoint: {mountpoint}')
command = 'umount'
@@ -574,10 +573,10 @@ class DeviceHandler(object):
command = 'partprobe'
try:
- log(f'Calling partprobe: {command}', level=logging.DEBUG)
+ debug(f'Calling partprobe: {command}')
SysCommand(command)
- except SysCallError as error:
- log(f'"{command}" failed to run: {error}', level=logging.DEBUG)
+ except SysCallError as err:
+ error(f'"{command}" failed to run: {err}')
def _wipe(self, dev_path: Path):
"""
@@ -594,7 +593,7 @@ class DeviceHandler(object):
This is not intended to be secure, but rather to ensure that
auto-discovery tools don't recognize anything here.
"""
- log(f'Wiping partitions and metadata: {block_device.device_info.path}')
+ info(f'Wiping partitions and metadata: {block_device.device_info.path}')
for partition in block_device.partition_infos:
self._wipe(partition.path)
@@ -609,8 +608,8 @@ def disk_layouts() -> str:
lsblk_info = get_all_lsblk_info()
return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON)
except SysCallError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
+ warn(f"Could not return disk layouts: {err}")
return ''
except json.decoder.JSONDecodeError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
+ warn(f"Could not return disk layouts: {err}")
return ''
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
index d57347b7..36dd0c4f 100644
--- a/archinstall/lib/disk/device_model.py
+++ b/archinstall/lib/disk/device_model.py
@@ -2,7 +2,6 @@ from __future__ import annotations
import dataclasses
import json
-import logging
import math
import time
import uuid
@@ -18,7 +17,7 @@ from parted import Disk, Geometry, Partition
from ..exceptions import DiskError, SysCallError
from ..general import SysCommand
-from ..output import log
+from ..output import debug, error
from ..storage import storage
if TYPE_CHECKING:
@@ -282,7 +281,7 @@ class _PartitionInfo:
btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list)
def as_json(self) -> Dict[str, Any]:
- info = {
+ part_info = {
'Name': self.name,
'Type': self.type.value,
'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')),
@@ -293,9 +292,9 @@ class _PartitionInfo:
}
if self.btrfs_subvol_infos:
- info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
+ part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
- return info
+ return part_info
@classmethod
def from_partition(
@@ -392,7 +391,7 @@ class SubvolumeModification:
mods = []
for entry in subvol_args:
if not entry.get('name', None) or not entry.get('mountpoint', None):
- log(f'Subvolume arg is missing name: {entry}', level=logging.DEBUG)
+ debug(f'Subvolume arg is missing name: {entry}')
continue
mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None
@@ -705,7 +704,7 @@ class PartitionModification:
"""
Called for displaying data in table format
"""
- info = {
+ part_mod = {
'Status': self.status.value,
'Device': str(self.dev_path) if self.dev_path else '',
'Type': self.type.value,
@@ -718,9 +717,9 @@ class PartitionModification:
}
if self.btrfs_subvols:
- info['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
+ part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
- return info
+ return part_mod
@dataclass
@@ -916,36 +915,36 @@ class LsblkInfo:
@classmethod
def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo:
- info = cls()
+ lsblk_info = cls()
for f in cls.fields():
lsblk_field = _clean_field(f, CleanType.Blockdevice)
data_field = _clean_field(f, CleanType.Dataclass)
val: Any = None
- if isinstance(getattr(info, data_field), Path):
+ if isinstance(getattr(lsblk_info, data_field), Path):
val = Path(blockdevice[lsblk_field])
- elif isinstance(getattr(info, data_field), Size):
+ elif isinstance(getattr(lsblk_info, data_field), Size):
val = Size(blockdevice[lsblk_field], Unit.B)
else:
val = blockdevice[lsblk_field]
- setattr(info, data_field, val)
+ setattr(lsblk_info, data_field, val)
- info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
+ lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
# sometimes lsblk returns 'mountpoints': [null]
- info.mountpoints = [Path(mnt) for mnt in info.mountpoints if mnt]
+ lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt]
fs_roots = []
- for r in info.fsroots:
+ for r in lsblk_info.fsroots:
if r:
path = Path(r)
# store the fsroot entries without the leading /
fs_roots.append(path.relative_to(path.anchor))
- info.fsroots = fs_roots
+ lsblk_info.fsroots = fs_roots
- return info
+ return lsblk_info
class CleanType(Enum):
@@ -978,16 +977,16 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
try:
result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}')
break
- except SysCallError as error:
+ except SysCallError as err:
# Get the output minus the message/info from lsblk if it returns a non-zero exit code.
- if error.worker:
- err = error.worker.decode('UTF-8')
- log(f'Error calling lsblk: {err}', level=logging.DEBUG)
+ if err.worker:
+ err_str = err.worker.decode('UTF-8')
+ debug(f'Error calling lsblk: {err_str}')
else:
- raise error
+ raise err
if retry_attempt == retry - 1:
- raise error
+ raise err
time.sleep(1)
@@ -997,11 +996,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
blockdevices = block_devices['blockdevices']
return [LsblkInfo.from_json(device) for device in blockdevices]
except json.decoder.JSONDecodeError as err:
- log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
+ error(f"Could not decode lsblk JSON: {result}")
raise err
raise DiskError(f'Failed to read disk "{dev_path}" with lsblk')
+
def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
if infos := _fetch_lsblk_info(dev_path):
return infos[0]
diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py
index 285270fb..8c64e65e 100644
--- a/archinstall/lib/disk/encryption_menu.py
+++ b/archinstall/lib/disk/encryption_menu.py
@@ -13,7 +13,7 @@ from ..menu import (
MenuSelectionType,
TableMenu
)
-from ..user_interaction.utils import get_password
+from ..interactions.utils import get_password
from ..menu import Menu
from ..general import secret
from .fido import Fido2Device, Fido2
diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py
index 2a53b551..97c38d84 100644
--- a/archinstall/lib/disk/fido.py
+++ b/archinstall/lib/disk/fido.py
@@ -1,13 +1,12 @@
from __future__ import annotations
import getpass
-import logging
from pathlib import Path
from typing import List, Optional
from .device_model import PartitionModification, Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
-from ..output import log
+from ..output import error, info
class Fido2:
@@ -39,7 +38,7 @@ class Fido2:
if not cls._loaded or reload:
ret: Optional[str] = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8')
if not ret:
- log('Unable to retrieve fido2 devices', level=logging.ERROR)
+ error('Unable to retrieve fido2 devices')
return []
fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore
@@ -88,8 +87,4 @@ class Fido2:
worker.write(bytes(getpass.getpass(" "), 'UTF-8'))
pin_inputted = True
- log(
- f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.",
- level=logging.INFO,
- fg="yellow"
- )
+ info('You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds')
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 6ea99340..dc99afce 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import logging
import signal
import sys
import time
@@ -8,8 +7,8 @@ from typing import Any, Optional, TYPE_CHECKING
from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption
from .device_handler import device_handler
-from ..hardware import has_uefi
-from ..output import log
+from ..hardware import SysInfo
+from ..output import debug
from ..menu import Menu
if TYPE_CHECKING:
@@ -27,13 +26,13 @@ class FilesystemHandler:
def perform_filesystem_operations(self, show_countdown: bool = True):
if self._disk_config.config_type == DiskLayoutType.Pre_mount:
- log('Disk layout configuration is set to pre-mount, not performing any operations', level=logging.DEBUG)
+ debug('Disk layout configuration is set to pre-mount, not performing any operations')
return
device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications))
if not device_mods:
- log('No modifications required', level=logging.DEBUG)
+ debug('No modifications required')
return
device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods])
@@ -48,7 +47,7 @@ class FilesystemHandler:
# Setup the blockdevice, filesystem (and optionally encryption).
# Once that's done, we'll hand over to perform_installation()
partition_table = PartitionTable.GPT
- if has_uefi() is False:
+ if SysInfo.has_uefi() is False:
partition_table = PartitionTable.MBR
for mod in device_mods:
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py
index 686e8c29..89cf6293 100644
--- a/archinstall/lib/disk/partitioning_menu.py
+++ b/archinstall/lib/disk/partitioning_menu.py
@@ -1,13 +1,12 @@
from __future__ import annotations
-import logging
from pathlib import Path
from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple
from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \
ModificationStatus
from ..menu import Menu, ListManager, MenuSelection, TextInput
-from ..output import FormattedOutput, log
+from ..output import FormattedOutput, warn
from .subvolume_menu import SubvolumeMenu
if TYPE_CHECKING:
@@ -229,7 +228,7 @@ class PartitioningList(ListManager):
if not start_sector or self._validate_sector(start_sector):
break
- log(f'Invalid start sector entered: {start_sector}', fg='red', level=logging.INFO)
+ warn(f'Invalid start sector entered: {start_sector}')
if not start_sector:
start_sector = str(largest_free_area.start)
@@ -245,7 +244,7 @@ class PartitioningList(ListManager):
if not end_value or self._validate_sector(start_sector, end_value):
break
- log(f'Invalid end sector entered: {start_sector}', fg='red', level=logging.INFO)
+ warn(f'Invalid end sector entered: {start_sector}')
# override the default value with the user value
if end_value:
@@ -300,7 +299,7 @@ class PartitioningList(ListManager):
if choice.value == Menu.no():
return []
- from ..user_interaction.disk_conf import suggest_single_disk_layout
+ from ..interactions.disk_conf import suggest_single_disk_layout
device_modification = suggest_single_disk_layout(self._device)
return device_modification.partitions
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index a66e4e04..53458d2c 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -3,6 +3,7 @@ from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .general import SysCommandWorker
+
class RequirementError(BaseException):
pass
@@ -15,10 +16,6 @@ class UnknownFilesystemFormat(BaseException):
pass
-class ProfileError(BaseException):
- pass
-
-
class SysCallError(BaseException):
def __init__(self, message :str, exit_code :Optional[int] = None, worker :Optional['SysCommandWorker'] = None) -> None:
super(SysCallError, self).__init__(message)
@@ -27,22 +24,10 @@ class SysCallError(BaseException):
self.worker = worker
-class PermissionError(BaseException):
- pass
-
-
-class ProfileNotFound(BaseException):
- pass
-
-
class HardwareIncompatibilityError(BaseException):
pass
-class UserError(BaseException):
- pass
-
-
class ServiceException(BaseException):
pass
@@ -51,9 +36,5 @@ class PackageError(BaseException):
pass
-class TranslationError(BaseException):
- pass
-
-
class Deprecated(BaseException):
- pass \ No newline at end of file
+ pass
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 997b7d67..777ee90e 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,8 +1,6 @@
from __future__ import annotations
-import hashlib
import json
-import logging
import os
import secrets
import shlex
@@ -18,9 +16,10 @@ import urllib.error
import pathlib
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
+from select import epoll, EPOLLIN, EPOLLHUP
from .exceptions import RequirementError, SysCallError
-from .output import log
+from .output import debug, error, info
from .storage import storage
@@ -28,42 +27,6 @@ if TYPE_CHECKING:
from .installer import Installer
-if sys.platform == 'linux':
- from select import epoll, EPOLLIN, EPOLLHUP
-else:
- import select
- EPOLLIN = 0
- EPOLLHUP = 0
-
- class epoll():
- """ #!if windows
- Create a epoll() implementation that simulates the epoll() behavior.
- This so that the rest of the code doesn't need to worry weither we're using select() or epoll().
- """
- def __init__(self) -> None:
- self.sockets: Dict[str, Any] = {}
- self.monitoring: Dict[int, Any] = {}
-
- def unregister(self, fileno :int, *args :List[Any], **kwargs :Dict[str, Any]) -> None:
- try:
- del(self.monitoring[fileno]) # noqa: E275
- except:
- pass
-
- def register(self, fileno :int, *args :int, **kwargs :Dict[str, Any]) -> None:
- self.monitoring[fileno] = True
-
- def poll(self, timeout: float = 0.05, *args :str, **kwargs :Dict[str, Any]) -> List[Any]:
- try:
- return [[fileno, 1] for fileno in select.select(list(self.monitoring.keys()), [], [], timeout)[0]]
- except OSError:
- return []
-
-
-def gen_uid(entropy_length :int = 256) -> str:
- return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
-
-
def generate_password(length :int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length))
@@ -156,6 +119,7 @@ class JsonEncoder:
else:
return JsonEncoder._encode(obj)
+
class JSON(json.JSONEncoder, json.JSONDecoder):
"""
A safe JSON encoder that will omit private information in dicts (starting with !)
@@ -166,6 +130,7 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
def encode(self, obj :Any) -> Any:
return super(JSON, self).encode(self._encode(obj))
+
class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder):
"""
UNSAFE_JSON will call/encode and keep private information in dicts (starting with !)
@@ -269,7 +234,7 @@ class SysCommandWorker:
sys.stdout.flush()
if len(args) >= 2 and args[1]:
- log(args[1], level=logging.DEBUG, fg='red')
+ debug(args[1])
if self.exit_code != 0:
raise SysCallError(
@@ -350,7 +315,7 @@ class SysCommandWorker:
self.ended = time.time()
break
- if self.ended or (got_output is False and pid_exists(self.pid) is False):
+ if self.ended or (got_output is False and _pid_exists(self.pid) is False):
self.ended = time.time()
try:
wait_status = os.waitpid(self.pid, 0)[1]
@@ -396,15 +361,15 @@ class SysCommandWorker:
pass
except Exception as e:
exception_type = type(e).__name__
- log(f"Unexpected {exception_type} occurred in {self.cmd}: {e}", level=logging.ERROR)
+ error(f"Unexpected {exception_type} occurred in {self.cmd}: {e}")
raise e
os.execve(self.cmd[0], list(self.cmd), {**os.environ, **self.environment_vars})
if storage['arguments'].get('debug'):
- log(f"Executing: {self.cmd}", level=logging.DEBUG)
+ debug(f"Executing: {self.cmd}")
except FileNotFoundError:
- log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red")
+ error(f"{self.cmd[0]} does not exist.")
self.exit_code = 1
return False
else:
@@ -455,7 +420,7 @@ class SysCommand:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
- log(args[1], level=logging.ERROR, fg='red')
+ error(args[1])
def __iter__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> Iterator[bytes]:
if self.session:
@@ -535,22 +500,7 @@ class SysCommand:
return None
-def prerequisite_check() -> bool:
- """
- This function is used as a safety check before
- continuing with an installation.
-
- Could be anything from checking that /boot is big enough
- to check if nvidia hardware exists when nvidia driver was chosen.
- """
-
- return True
-
-def reboot():
- SysCommand("/usr/bin/reboot")
-
-
-def pid_exists(pid: int) -> bool:
+def _pid_exists(pid: int) -> bool:
try:
return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip())
except subprocess.CalledProcessError:
@@ -559,7 +509,7 @@ def pid_exists(pid: int) -> bool:
def run_custom_user_commands(commands :List[str], installation :Installer) -> None:
for index, command in enumerate(commands):
- log(f'Executing custom command "{command}" ...', level=logging.INFO)
+ info(f'Executing custom command "{command}" ...')
with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script:
temp_script.write(command)
@@ -568,6 +518,7 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No
os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
+
def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool :
"""
Function to load a stream (file (as name) or valid JSON string into an existing dictionary
@@ -582,16 +533,16 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
try:
with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response:
target.update(json.loads(response.read()))
- except urllib.error.HTTPError as error:
- log(f"Could not load {configuration_identifier} via {parsed_url} due to: {error}", level=logging.ERROR, fg="red")
+ except urllib.error.HTTPError as err:
+ error(f"Could not load {configuration_identifier} via {parsed_url} due to: {err}")
return False
else:
if pathlib.Path(stream).exists():
try:
with pathlib.Path(stream).open() as fh:
target.update(json.load(fh))
- except Exception as error:
- log(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {error}", level=logging.ERROR, fg="red")
+ except Exception as err:
+ error(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {err}")
return False
else:
# NOTE: This is a rudimentary check if what we're trying parse is a dict structure.
@@ -600,14 +551,15 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target
try:
target.update(json.loads(stream))
except Exception as e:
- log(f" {configuration_identifier} Contains an invalid JSON format : {e}",level=logging.ERROR, fg="red")
+ error(f"{configuration_identifier} Contains an invalid JSON format: {e}")
return False
else:
- log(f" {configuration_identifier} is neither a file nor is a JSON string:",level=logging.ERROR, fg="red")
+ error(f"{configuration_identifier} is neither a file nor is a JSON string")
return False
return True
+
def secret(x :str):
""" return * with len equal to to the input string """
return '*' * len(x)
diff --git a/archinstall/lib/global_menu.py b/archinstall/lib/global_menu.py
index a969d93f..13595132 100644
--- a/archinstall/lib/global_menu.py
+++ b/archinstall/lib/global_menu.py
@@ -11,24 +11,24 @@ from .models.users import User
from .output import FormattedOutput
from .profile.profile_menu import ProfileConfiguration
from .storage import storage
-from .user_interaction import add_number_of_parrallel_downloads
-from .user_interaction import ask_additional_packages_to_install
-from .user_interaction import ask_for_additional_users
-from .user_interaction import ask_for_audio_selection
-from .user_interaction import ask_for_bootloader
-from .user_interaction import ask_for_swap
-from .user_interaction import ask_hostname
-from .user_interaction import ask_ntp
-from .user_interaction import ask_to_configure_network
-from .user_interaction import get_password, ask_for_a_timezone
-from .user_interaction import select_additional_repositories
-from .user_interaction import select_kernel
-from .user_interaction import select_language
-from .user_interaction import select_locale_enc
-from .user_interaction import select_locale_lang
-from .user_interaction import select_mirror_regions
-from .user_interaction.disk_conf import select_disk_config
-from .user_interaction.save_conf import save_config
+from .configuration import save_config
+from .interactions import add_number_of_parrallel_downloads
+from .interactions import ask_additional_packages_to_install
+from .interactions import ask_for_additional_users
+from .interactions import ask_for_audio_selection
+from .interactions import ask_for_bootloader
+from .interactions import ask_for_swap
+from .interactions import ask_hostname
+from .interactions import ask_to_configure_network
+from .interactions import get_password, ask_for_a_timezone
+from .interactions import select_additional_repositories
+from .interactions import select_kernel
+from .interactions import select_language
+from .interactions import select_locale_enc
+from .interactions import select_locale_lang
+from .interactions import select_mirror_regions
+from .interactions import ask_ntp
+from .interactions.disk_conf import select_disk_config
if TYPE_CHECKING:
_: Any
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index 3759725f..b95301f9 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -1,27 +1,12 @@
import os
-import logging
-from functools import partial
+from functools import cached_property
from pathlib import Path
-from typing import Iterator, Optional, Dict
+from typing import Optional, Dict
from .general import SysCommand
from .networking import list_interfaces, enrich_iface_types
from .exceptions import SysCallError
-from .output import log
-
-__packages__ = [
- "mesa",
- "xf86-video-amdgpu",
- "xf86-video-ati",
- "xf86-video-nouveau",
- "xf86-video-vmware",
- "libva-mesa-driver",
- "libva-intel-driver",
- "intel-media-driver",
- "vulkan-radeon",
- "vulkan-intel",
- "nvidia",
-]
+from .output import debug
AVAILABLE_GFX_DRIVERS = {
# Sub-dicts are layer-2 options to be selected
@@ -62,136 +47,125 @@ AVAILABLE_GFX_DRIVERS = {
}
-def cpuinfo() -> Iterator[dict[str, str]]:
- """
- Yields information about the CPUs of the system
- """
- cpu_info_path = Path("/proc/cpuinfo")
- cpu: Dict[str, str] = {}
+class _SysInfo:
+ def __init__(self):
+ pass
- with cpu_info_path.open() as file:
- for line in file:
- if not (line := line.strip()):
- yield cpu
- cpu = {}
- continue
-
- key, value = line.split(":", maxsplit=1)
- cpu[key.strip()] = value.strip()
-
-
-def all_meminfo() -> Dict[str, int]:
- """
- Returns a dict with memory info if called with no args
- or the value of the given key of said dict.
- """
- mem_info_path = Path("/proc/meminfo")
- mem_info: Dict[str, int] = {}
-
- with mem_info_path.open() as file:
- for line in file:
- key, value = line.strip().split(':')
- num = value.split()[0]
- mem_info[key] = int(num)
-
- return mem_info
-
-
-def meminfo_for_key(key: str) -> int:
- info = all_meminfo()
- return info[key]
-
-
-def has_wifi() -> bool:
- ifaces = list(list_interfaces().values())
- return 'WIRELESS' in enrich_iface_types(ifaces).values()
-
-
-def has_cpu_vendor(vendor_id: str) -> bool:
- return any(cpu.get("vendor_id") == vendor_id for cpu in cpuinfo())
-
-
-has_amd_cpu = partial(has_cpu_vendor, "AuthenticAMD")
-
-
-has_intel_cpu = partial(has_cpu_vendor, "GenuineIntel")
-
-
-def has_uefi() -> bool:
- return os.path.isdir('/sys/firmware/efi')
-
-
-def graphics_devices() -> dict:
- cards = {}
- for line in SysCommand("lspci"):
- if b' VGA ' in line or b' 3D ' in line:
- _, identifier = line.split(b': ', 1)
- cards[identifier.strip().decode('UTF-8')] = line
- return cards
-
-
-def has_nvidia_graphics() -> bool:
- return any('nvidia' in x.lower() for x in graphics_devices())
-
-
-def has_amd_graphics() -> bool:
- return any('amd' in x.lower() for x in graphics_devices())
-
-
-def has_intel_graphics() -> bool:
- return any('intel' in x.lower() for x in graphics_devices())
-
-
-def cpu_vendor() -> Optional[str]:
- for cpu in cpuinfo():
- return cpu.get("vendor_id")
-
- return None
-
-
-def cpu_model() -> Optional[str]:
- for cpu in cpuinfo():
- return cpu.get("model name")
-
- return None
-
-
-def sys_vendor() -> Optional[str]:
- with open(f"/sys/devices/virtual/dmi/id/sys_vendor") as vendor:
- return vendor.read().strip()
-
-
-def product_name() -> Optional[str]:
- with open(f"/sys/devices/virtual/dmi/id/product_name") as product:
- return product.read().strip()
-
-
-def mem_available() -> Optional[int]:
- return meminfo_for_key('MemAvailable')
-
-
-def mem_free() -> Optional[int]:
- return meminfo_for_key('MemFree')
-
-
-def mem_total() -> Optional[int]:
- return meminfo_for_key('MemTotal')
-
-
-def virtualization() -> Optional[str]:
- try:
- return str(SysCommand("systemd-detect-virt")).strip('\r\n')
- except SysCallError as error:
- log(f"Could not detect virtual system: {error}", level=logging.DEBUG)
-
- return None
-
-
-def is_vm() -> bool:
- try:
- result = SysCommand("systemd-detect-virt")
- return b"none" not in b"".join(result).lower()
- except SysCallError as error:
- log(f"System is not running in a VM: {error}", level=logging.DEBUG)
-
- return False
+ @cached_property
+ def cpu_info(self) -> Dict[str, str]:
+ """
+ Returns system cpu information
+ """
+ cpu_info_path = Path("/proc/cpuinfo")
+ cpu: Dict[str, str] = {}
+
+ with cpu_info_path.open() as file:
+ for line in file:
+ if line := line.strip():
+ key, value = line.split(":", maxsplit=1)
+ cpu[key.strip()] = value.strip()
+
+ return cpu
+
+ @cached_property
+ def mem_info(self) -> Dict[str, int]:
+ """
+ Returns system memory information
+ """
+ mem_info_path = Path("/proc/meminfo")
+ mem_info: Dict[str, int] = {}
+
+ with mem_info_path.open() as file:
+ for line in file:
+ key, value = line.strip().split(':')
+ num = value.split()[0]
+ mem_info[key] = int(num)
+
+ return mem_info
+
+ def mem_info_by_key(self, key: str) -> int:
+ return self.mem_info[key]
+
+
+_sys_info = _SysInfo()
+
+
+class SysInfo:
+ @staticmethod
+ def has_wifi() -> bool:
+ ifaces = list(list_interfaces().values())
+ return 'WIRELESS' in enrich_iface_types(ifaces).values()
+
+ @staticmethod
+ def has_uefi() -> bool:
+ return os.path.isdir('/sys/firmware/efi')
+
+ @staticmethod
+ def _graphics_devices() -> Dict[str, str]:
+ cards: Dict[str, str] = {}
+ for line in SysCommand("lspci"):
+ if b' VGA ' in line or b' 3D ' in line:
+ _, identifier = line.split(b': ', 1)
+ cards[identifier.strip().decode('UTF-8')] = str(line)
+ return cards
+
+ @staticmethod
+ def has_nvidia_graphics() -> bool:
+ return any('nvidia' in x.lower() for x in SysInfo._graphics_devices())
+
+ @staticmethod
+ def has_amd_graphics() -> bool:
+ return any('amd' in x.lower() for x in SysInfo._graphics_devices())
+
+ @staticmethod
+ def has_intel_graphics() -> bool:
+ return any('intel' in x.lower() for x in SysInfo._graphics_devices())
+
+ @staticmethod
+ def cpu_vendor() -> Optional[str]:
+ return _sys_info.cpu_info.get('vendor_id', None)
+
+ @staticmethod
+ def cpu_model() -> Optional[str]:
+ return _sys_info.cpu_info.get('model name', None)
+
+ @staticmethod
+ def sys_vendor() -> str:
+ with open(f"/sys/devices/virtual/dmi/id/sys_vendor") as vendor:
+ return vendor.read().strip()
+
+ @staticmethod
+ def product_name() -> str:
+ with open(f"/sys/devices/virtual/dmi/id/product_name") as product:
+ return product.read().strip()
+
+ @staticmethod
+ def mem_available() -> int:
+ return _sys_info.mem_info_by_key('MemAvailable')
+
+ @staticmethod
+ def mem_free() -> int:
+ return _sys_info.mem_info_by_key('MemFree')
+
+ @staticmethod
+ def mem_total() -> int:
+ return _sys_info.mem_info_by_key('MemTotal')
+
+ @staticmethod
+ def virtualization() -> Optional[str]:
+ try:
+ return str(SysCommand("systemd-detect-virt")).strip('\r\n')
+ except SysCallError as err:
+ debug(f"Could not detect virtual system: {err}")
+
+ return None
+
+ @staticmethod
+ def is_vm() -> bool:
+ try:
+ result = SysCommand("systemd-detect-virt")
+ return b"none" not in b"".join(result).lower()
+ except SysCallError as err:
+ debug(f"System is not running in a VM: {err}")
+
+ return False
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 726ff3d0..3c427ab2 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,5 +1,4 @@
import glob
-import logging
import os
import re
import shlex
@@ -12,17 +11,16 @@ from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, It
from . import disk
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
from .general import SysCommand
-from .hardware import has_uefi, is_vm, cpu_vendor
-from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
+from .hardware import SysInfo
+from .locale import verify_keyboard_layout, verify_x11_keyboard_layout
from .luks import Luks2
from .mirrors import use_mirrors
from .models.bootloader import Bootloader
from .models.network_configuration import NetworkConfiguration
from .models.users import User
-from .output import log
+from .output import log, error, info, warn, debug
from .pacman import run_pacman
from .plugins import plugins
-from .services import service_state
from .storage import storage
if TYPE_CHECKING:
@@ -41,28 +39,6 @@ def accessibility_tools_in_use() -> bool:
class Installer:
- """
- `Installer()` is the wrapper for most basic installation steps.
- It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
-
- :param partition: Requires a partition as the first argument, this is
- so that the installer can mount to `mountpoint` and strap packages there.
- :type partition: class:`archinstall.Partition`
-
- :param boot_partition: There's two reasons for needing a boot partition argument,
- The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place
- during the `pacstrap` or `linux` and the base packages for a minimal installation.
- The second being when :py:func:`~archinstall.Installer.add_bootloader` is called,
- A `boot_partition` must be known to the installer before this is called.
- :type boot_partition: class:`archinstall.Partition`
-
- :param profile: A profile to install, this is optional and can be called later manually.
- This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on.
- :type profile: str, optional
-
- :param hostname: The given /etc/hostname for the machine.
- :type hostname: str, optional
- """
def __init__(
self,
target: Path,
@@ -71,6 +47,10 @@ class Installer:
base_packages: List[str] = [],
kernels: Optional[List[str]] = None
):
+ """
+ `Installer()` is the wrapper for most basic installation steps.
+ It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
+ """
if not base_packages:
base_packages = __packages__[:3]
@@ -126,7 +106,7 @@ class Installer:
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
- log(exc_val, fg='red', level=logging.ERROR)
+ error(exc_val)
self.sync_log_to_install_medium()
@@ -137,48 +117,41 @@ class Installer:
raise exc_val
if not (missing_steps := self.post_install_check()):
- self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
+ log('Installation completed without any errors. You may now reboot.', fg='green')
self.sync_log_to_install_medium()
return True
else:
- self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
+ warn('Some required steps were not successfully installed/configured before leaving the installer:')
for step in missing_steps:
- self.log(f' - {step}', fg='red', level=logging.WARNING)
+ warn(f' - {step}')
- self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
- self.log("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
+ warn(f"Detailed error logs can be found at: {storage['LOG_PATH']}")
+ warn("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues")
self.sync_log_to_install_medium()
return False
- def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
- """
- installer.log() wraps output.log() mainly to set a default log-level for this install session.
- Any manual override can be done per log() call.
- """
- log(*args, level=level, **kwargs)
-
def _verify_service_stop(self):
"""
Certain services might be running that affects the system during installation.
One such service is "reflector.service" which updates /etc/pacman.d/mirrorlist
We need to wait for it before we continue since we opted in to use a custom mirror/region.
"""
- log('Waiting for time sync (systemd-timesyncd.service) to complete.', level=logging.INFO)
+ info('Waiting for time sync (systemd-timesyncd.service) to complete.')
while SysCommand('timedatectl show --property=NTPSynchronized --value').decode().rstrip() != 'yes':
time.sleep(1)
- log('Waiting for automatic mirror selection (reflector) to complete.', level=logging.INFO)
- while service_state('reflector') not in ('dead', 'failed', 'exited'):
+ info('Waiting for automatic mirror selection (reflector) to complete.')
+ while self._service_state('reflector') not in ('dead', 'failed', 'exited'):
time.sleep(1)
- log('Waiting pacman-init.service to complete.', level=logging.INFO)
- while service_state('pacman-init') not in ('dead', 'failed', 'exited'):
+ info('Waiting pacman-init.service to complete.')
+ while self._service_state('pacman-init') not in ('dead', 'failed', 'exited'):
time.sleep(1)
- log('Waiting Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.', level=logging.INFO)
- while service_state('archlinux-keyring-wkd-sync') not in ('dead', 'failed', 'exited'):
+ info('Waiting Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.')
+ while self._service_state('archlinux-keyring-wkd-sync') not in ('dead', 'failed', 'exited'):
time.sleep(1)
def _verify_boot_part(self):
@@ -204,7 +177,7 @@ class Installer:
self._verify_service_stop()
def mount_ordered_layout(self):
- log('Mounting partitions in order', level=logging.INFO)
+ info('Mounting partitions in order')
for mod in self._disk_config.device_modifications:
# partitions have to mounted in the right order on btrfs the mountpoint will
@@ -275,7 +248,7 @@ class Installer:
)
if gen_enc_file and not part_mod.is_root():
- log(f'Creating key-file: {part_mod.dev_path}', level=logging.INFO)
+ info(f'Creating key-file: {part_mod.dev_path}')
luks_handler.create_keyfile(self.target)
if part_mod.is_root() and not gen_enc_file:
@@ -384,25 +357,25 @@ class Installer:
if (result := plugin.on_pacstrap(packages)):
packages = result
- self.log(f'Installing packages: {packages}', level=logging.INFO)
+ info(f'Installing packages: {packages}')
# TODO: We technically only need to run the -Syy once.
try:
run_pacman('-Syy', default_cmd='/usr/bin/pacman')
- except SysCallError as error:
- self.log(f'Could not sync a new package database: {error}', level=logging.ERROR, fg="red")
+ except SysCallError as err:
+ error(f'Could not sync a new package database: {err}')
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
return self._pacstrap(packages)
- raise RequirementError(f'Could not sync mirrors: {error}')
+ raise RequirementError(f'Could not sync mirrors: {err}')
try:
SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
return True
- except SysCallError as error:
- self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red")
+ except SysCallError as err:
+ error(f'Could not strap in packages: {err}')
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
@@ -420,12 +393,12 @@ class Installer:
use_mirrors(mirrors, destination=destination)
def genfstab(self, flags :str = '-pU'):
- self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
+ info(f"Updating {self.target}/etc/fstab")
try:
gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode()
- except SysCallError as error:
- raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}')
+ except SysCallError as err:
+ raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {err}')
if not gen_fstab:
raise RequirementError(f'Genrating fstab returned empty value')
@@ -530,24 +503,20 @@ class Installer:
return True
else:
- self.log(
- f"Time zone {zone} does not exist, continuing with system default.",
- level=logging.WARNING,
- fg='red'
- )
+ warn(f'Time zone {zone} does not exist, continuing with system default')
return False
def activate_time_syncronization(self) -> None:
- self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
+ info('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers')
self.enable_service('systemd-timesyncd')
def enable_espeakup(self) -> None:
- self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO)
+ info('Enabling espeakup.service for speech synthesis (accessibility)')
self.enable_service('espeakup')
def enable_periodic_trim(self) -> None:
- self.log("Enabling periodic TRIM")
+ info("Enabling periodic TRIM")
# fstrim is owned by util-linux, a dependency of both base and systemd.
self.enable_service("fstrim.timer")
@@ -556,12 +525,12 @@ class Installer:
services = [services]
for service in services:
- self.log(f'Enabling service {service}', level=logging.INFO)
+ info(f'Enabling service {service}')
try:
self.arch_chroot(f'systemctl enable {service}')
- except SysCallError as error:
- raise ServiceException(f"Unable to start service {service}: {error}")
+ except SysCallError as err:
+ raise ServiceException(f"Unable to start service {service}: {err}")
for plugin in plugins.values():
if hasattr(plugin, 'on_service'):
@@ -713,11 +682,11 @@ class Installer:
if 'encrypt' not in self._hooks:
self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
- if not has_uefi():
+ if not SysInfo.has_uefi():
self.base_packages.append('grub')
- if not is_vm():
- vendor = cpu_vendor()
+ if not SysInfo.is_vm():
+ vendor = SysInfo.cpu_vendor()
if vendor == "AuthenticAMD":
self.base_packages.append("amd-ucode")
if (ucode := Path(f"{self.target}/boot/amd-ucode.img")).exists():
@@ -727,21 +696,21 @@ class Installer:
if (ucode := Path(f"{self.target}/boot/intel-ucode.img")).exists():
ucode.unlink()
else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG)
+ debug(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode")
# Determine whether to enable multilib/testing repositories before running pacstrap if testing flag is set.
# This action takes place on the host system as pacstrap copies over package repository lists.
if multilib:
- self.log("The multilib flag is set. This system will be installed with the multilib repository enabled.")
+ info("The multilib flag is set. This system will be installed with the multilib repository enabled.")
self.enable_multilib_repository()
else:
- self.log("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
+ info("The multilib flag is not set. This system will be installed without multilib repositories enabled.")
if testing:
- self.log("The testing flag is set. This system will be installed with testing repositories enabled.")
+ info("The testing flag is set. This system will be installed with testing repositories enabled.")
self.enable_testing_repositories(multilib)
else:
- self.log("The testing flag is not set. This system will be installed without testing repositories enabled.")
+ info("The testing flag is not set. This system will be installed without testing repositories enabled.")
self._pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
@@ -773,7 +742,7 @@ class Installer:
# Run registered post-install hooks
for function in self.post_base_install:
- self.log(f"Running post-installation hook: {function}", level=logging.INFO)
+ info(f"Running post-installation hook: {function}")
function(self)
for plugin in plugins.values():
@@ -782,7 +751,7 @@ class Installer:
def setup_swap(self, kind :str = 'zram'):
if kind == 'zram':
- self.log(f"Setting up swap on zram")
+ info(f"Setting up swap on zram")
self._pacstrap('zram-generator')
# We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813
@@ -812,7 +781,7 @@ class Installer:
def _add_systemd_bootloader(self, root_partition: disk.PartitionModification):
self._pacstrap('efibootmgr')
- if not has_uefi():
+ if not SysInfo.has_uefi():
raise HardwareIncompatibilityError
# TODO: Ideally we would want to check if another config
@@ -862,16 +831,18 @@ class Installer:
entry.write(f'# Created on: {self.init_time}\n')
entry.write(f'title Arch Linux ({kernel}{variant})\n')
entry.write(f"linux /vmlinuz-{kernel}\n")
- if not is_vm():
- vendor = cpu_vendor()
+ if not SysInfo.is_vm():
+ vendor = SysInfo.cpu_vendor()
if vendor == "AuthenticAMD":
entry.write("initrd /amd-ucode.img\n")
elif vendor == "GenuineIntel":
entry.write("initrd /intel-ucode.img\n")
else:
- self.log(
- f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.",
- level=logging.DEBUG)
+ debug(
+ f"Unknown CPU vendor '{vendor}' detected.",
+ "Archinstall won't add any ucode to systemd-boot config.",
+ )
+
entry.write(f"initrd /initramfs-{kernel}{variant}.img\n")
# blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out.
@@ -890,7 +861,7 @@ class Installer:
if root_partition.fs_type.is_crypto():
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log('Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ debug('Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}')
kernel_options = f"options"
@@ -905,7 +876,7 @@ class Installer:
entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}')
else:
- log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}')
entry.write(f'options root=PARTUUID={root_partition.partuuid} {options_entry}')
self.helper_flags['bootloader'] = 'systemd'
@@ -920,7 +891,7 @@ class Installer:
_file = "/etc/default/grub"
if root_partition.fs_type.is_crypto():
- log(f"Using UUID {root_partition.uuid} as encrypted root identifier", level=logging.DEBUG)
+ debug(f"Using UUID {root_partition.uuid} as encrypted root identifier")
cmd_line_linux = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_partition.uuid}:cryptlvm rootfstype={root_partition.fs_type.value}\"/'"
enable_cryptdisk = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'"
@@ -931,9 +902,9 @@ class Installer:
SysCommand(f"/usr/bin/arch-chroot {self.target} {cmd_line_linux} {_file}")
- log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO)
+ info(f"GRUB boot partition: {boot_partition.dev_path}")
- if has_uefi():
+ if SysInfo.has_uefi():
self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try:
@@ -941,8 +912,8 @@ class Installer:
except SysCallError:
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
- except SysCallError as error:
- raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}")
+ except SysCallError as err:
+ raise DiskError(f"Could not install GRUB to {self.target}/boot: {err}")
else:
device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path)
@@ -958,13 +929,13 @@ class Installer:
f' --recheck {device.device_info.path}'
SysCommand(cmd, peek_output=True)
- except SysCallError as error:
- raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}")
+ except SysCallError as err:
+ raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {err}")
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg')
- except SysCallError as error:
- raise DiskError(f"Could not configure GRUB: {error}")
+ except SysCallError as err:
+ raise DiskError(f"Could not configure GRUB: {err}")
self.helper_flags['bootloader'] = "grub"
@@ -975,7 +946,7 @@ class Installer:
):
self._pacstrap('efibootmgr')
- if not has_uefi():
+ if not SysInfo.has_uefi():
raise HardwareIncompatibilityError
# TODO: Ideally we would want to check if another config
@@ -989,14 +960,14 @@ class Installer:
kernel_parameters = []
- if not is_vm():
- vendor = cpu_vendor()
+ if not SysInfo.is_vm():
+ vendor = SysInfo.cpu_vendor()
if vendor == "AuthenticAMD":
kernel_parameters.append("initrd=\\amd-ucode.img")
elif vendor == "GenuineIntel":
kernel_parameters.append("initrd=\\intel-ucode.img")
else:
- self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.", level=logging.DEBUG)
+ debug(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.")
kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img")
@@ -1006,10 +977,10 @@ class Installer:
if root_partition.fs_type.is_crypto():
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ debug(f'Identifying root partition by PARTUUID: {root_partition.partuuid}')
kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}')
else:
- log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
+ debug(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}')
kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}')
device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path)
@@ -1060,7 +1031,7 @@ class Installer:
if root_partition is None:
raise ValueError(f'Could not detect root at mountpoint {self.target}')
- self.log(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}', level=logging.INFO)
+ info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}')
match bootloader:
case Bootloader.Systemd:
@@ -1078,7 +1049,7 @@ class Installer:
self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username)
def enable_sudo(self, entity: str, group :bool = False):
- self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
+ info(f'Enabling sudo permissions for {entity}')
sudoers_dir = f"{self.target}/etc/sudoers.d"
@@ -1127,11 +1098,11 @@ class Installer:
handled_by_plugin = result
if not handled_by_plugin:
- self.log(f'Creating user {user}', level=logging.INFO)
+ info(f'Creating user {user}')
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
- except SysCallError as error:
- raise SystemError(f"Could not create user inside installation: {error}")
+ except SysCallError as err:
+ raise SystemError(f"Could not create user inside installation: {err}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@@ -1149,7 +1120,7 @@ class Installer:
self.helper_flags['user'] = True
def user_set_pw(self, user :str, password :str) -> bool:
- self.log(f'Setting password for {user}', level=logging.INFO)
+ info(f'Setting password for {user}')
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
@@ -1166,7 +1137,7 @@ class Installer:
return False
def user_set_shell(self, user :str, shell :str) -> bool:
- self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
+ info(f'Setting shell for {user} to {shell}')
try:
SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
@@ -1183,49 +1154,59 @@ class Installer:
return False
def set_keyboard_language(self, language: str) -> bool:
- log(f"Setting keyboard language to {language}", level=logging.INFO)
+ info(f"Setting keyboard language to {language}")
+
if len(language.strip()):
if not verify_keyboard_layout(language):
- self.log(f"Invalid keyboard language specified: {language}", fg="red", level=logging.ERROR)
+ error(f"Invalid keyboard language specified: {language}")
return False
# In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968
# Setting an empty keymap first, allows the subsequent call to set layout for both console and x11.
- from .systemd import Boot
+ from .boot import Boot
with Boot(self) as session:
os.system('/usr/bin/systemd-run --machine=archinstall --pty localectl set-keymap ""')
try:
session.SysCommand(["localectl", "set-keymap", language])
- except SysCallError as error:
- raise ServiceException(f"Unable to set locale '{language}' for console: {error}")
+ except SysCallError as err:
+ raise ServiceException(f"Unable to set locale '{language}' for console: {err}")
- self.log(f"Keyboard language for this installation is now set to: {language}")
+ info(f"Keyboard language for this installation is now set to: {language}")
else:
- self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+ info('Keyboard language was not changed from default (no language specified)')
return True
def set_x11_keyboard_language(self, language: str) -> bool:
- log(f"Setting x11 keyboard language to {language}", level=logging.INFO)
"""
A fallback function to set x11 layout specifically and separately from console layout.
This isn't strictly necessary since .set_keyboard_language() does this as well.
"""
+ info(f"Setting x11 keyboard language to {language}")
+
if len(language.strip()):
if not verify_x11_keyboard_layout(language):
- self.log(f"Invalid x11-keyboard language specified: {language}", fg="red", level=logging.ERROR)
+ error(f"Invalid x11-keyboard language specified: {language}")
return False
- from .systemd import Boot
+ from .boot import Boot
with Boot(self) as session:
session.SysCommand(["localectl", "set-x11-keymap", '""'])
try:
session.SysCommand(["localectl", "set-x11-keymap", language])
- except SysCallError as error:
- raise ServiceException(f"Unable to set locale '{language}' for X11: {error}")
+ except SysCallError as err:
+ raise ServiceException(f"Unable to set locale '{language}' for X11: {err}")
else:
- self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+ info(f'X11-Keyboard language was not changed from default (no language specified)')
return True
+
+ def _service_state(self, service_name: str) -> str:
+ if os.path.splitext(service_name)[1] != '.service':
+ service_name += '.service' # Just to be safe
+
+ state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'}))
+
+ return state.strip().decode('UTF-8')
diff --git a/archinstall/lib/interactions/__init__.py b/archinstall/lib/interactions/__init__.py
new file mode 100644
index 00000000..b5691a10
--- /dev/null
+++ b/archinstall/lib/interactions/__init__.py
@@ -0,0 +1,20 @@
+from .locale_conf import select_locale_lang, select_locale_enc
+from .manage_users_conf import UserList, ask_for_additional_users
+from .network_conf import ManualNetworkConfig, ask_to_configure_network
+from .utils import get_password
+
+from .disk_conf import (
+ select_devices, select_disk_config, get_default_partition_layout,
+ select_main_filesystem_format, suggest_single_disk_layout,
+ suggest_multi_disk_layout
+)
+
+from .general_conf import (
+ ask_ntp, ask_hostname, ask_for_a_timezone, ask_for_audio_selection, select_language,
+ select_mirror_regions, select_archinstall_language, ask_additional_packages_to_install,
+ add_number_of_parrallel_downloads, select_additional_repositories
+)
+
+from .system_conf import (
+ select_kernel, ask_for_bootloader, select_driver, ask_for_swap
+)
diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/interactions/disk_conf.py
index a77e950a..78e4cff4 100644
--- a/archinstall/lib/user_interaction/disk_conf.py
+++ b/archinstall/lib/interactions/disk_conf.py
@@ -1,14 +1,15 @@
from __future__ import annotations
-import logging
from pathlib import Path
-from typing import Any, TYPE_CHECKING, Optional, List, Tuple
+from typing import Any, TYPE_CHECKING
+from typing import Optional, List, Tuple
from .. import disk
-from ..hardware import has_uefi
-from ..menu import Menu, MenuSelectionType, TableMenu
-from ..output import FormattedOutput
-from ..output import log
+from ..hardware import SysInfo
+from ..menu import Menu
+from ..menu import TableMenu
+from ..menu.menu import MenuSelectionType
+from ..output import FormattedOutput, debug
from ..utils.util import prompt_dir
if TYPE_CHECKING:
@@ -170,7 +171,7 @@ def select_disk_config(
def _boot_partition() -> disk.PartitionModification:
- if has_uefi():
+ if SysInfo.has_uefi():
start = disk.Size(1, disk.Unit.MiB)
size = disk.Size(512, disk.Unit.MiB)
else:
@@ -189,7 +190,7 @@ def _boot_partition() -> disk.PartitionModification:
)
-def ask_for_main_filesystem_format(advanced_options=False) -> disk.FilesystemType:
+def select_main_filesystem_format(advanced_options=False) -> disk.FilesystemType:
options = {
'btrfs': disk.FilesystemType.Btrfs,
'ext4': disk.FilesystemType.Ext4,
@@ -212,7 +213,7 @@ def suggest_single_disk_layout(
separate_home: Optional[bool] = None
) -> disk.DeviceModification:
if not filesystem_type:
- filesystem_type = ask_for_main_filesystem_format(advanced_options)
+ filesystem_type = select_main_filesystem_format(advanced_options)
min_size_to_allow_home_part = disk.Size(40, disk.Unit.GiB)
root_partition_size = disk.Size(20, disk.Unit.GiB)
@@ -258,7 +259,7 @@ def suggest_single_disk_layout(
using_home_partition = False
# root partition
- start = disk.Size(513, disk.Unit.MiB) if has_uefi() else disk.Size(206, disk.Unit.MiB)
+ start = disk.Size(513, disk.Unit.MiB) if SysInfo.has_uefi() else disk.Size(206, disk.Unit.MiB)
# Set a size for / (/root)
if using_subvolumes or device_size_gib < min_size_to_allow_home_part or not using_home_partition:
@@ -324,7 +325,7 @@ def suggest_multi_disk_layout(
compression = False
if not filesystem_type:
- filesystem_type = ask_for_main_filesystem_format(advanced_options)
+ filesystem_type = select_main_filesystem_format(advanced_options)
# find proper disk for /home
possible_devices = list(filter(lambda x: x.device_info.total_size >= min_home_partition_size, devices))
@@ -353,9 +354,10 @@ def suggest_multi_disk_layout(
compression = choice.value == Menu.yes()
device_paths = ', '.join([str(d.device_info.path) for d in devices])
- log(f"Suggesting multi-disk-layout for devices: {device_paths}", level=logging.DEBUG)
- log(f"/root: {root_device.device_info.path}", level=logging.DEBUG)
- log(f"/home: {home_device.device_info.path}", level=logging.DEBUG)
+
+ debug(f"Suggesting multi-disk-layout for devices: {device_paths}")
+ debug(f"/root: {root_device.device_info.path}")
+ debug(f"/home: {home_device.device_info.path}")
root_device_modification = disk.DeviceModification(root_device, wipe=True)
home_device_modification = disk.DeviceModification(home_device, wipe=True)
@@ -368,7 +370,7 @@ def suggest_multi_disk_layout(
root_partition = disk.PartitionModification(
status=disk.ModificationStatus.Create,
type=disk.PartitionType.Primary,
- start=disk.Size(513, disk.Unit.MiB) if has_uefi() else disk.Size(206, disk.Unit.MiB),
+ start=disk.Size(513, disk.Unit.MiB) if SysInfo.has_uefi() else disk.Size(206, disk.Unit.MiB),
length=disk.Size(100, disk.Unit.Percent, total_size=root_device.device_info.total_size),
mountpoint=Path('/'),
mount_options=['compress=zstd'] if compression else [],
diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/interactions/general_conf.py
index 9722dc4d..5fcfa633 100644
--- a/archinstall/lib/user_interaction/general_conf.py
+++ b/archinstall/lib/interactions/general_conf.py
@@ -1,13 +1,12 @@
from __future__ import annotations
-import logging
import pathlib
from typing import List, Any, Optional, Dict, TYPE_CHECKING
-from ..locale_helpers import list_keyboard_languages, list_timezones
+from ..locale import list_keyboard_languages, list_timezones
from ..menu import MenuSelectionType, Menu, TextInput
from ..mirrors import list_mirrors
-from ..output import log
+from ..output import warn
from ..packages.packages import validate_package_list
from ..storage import storage
from ..translationhandler import Language
@@ -176,7 +175,7 @@ def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List
valid, invalid = validate_package_list(packages)
if invalid:
- log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red')
+ warn(f"Some packages could not be found in the repository: {invalid}")
packages = read_packages(valid)
continue
break
diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/interactions/locale_conf.py
index cdc3423a..de115202 100644
--- a/archinstall/lib/user_interaction/locale_conf.py
+++ b/archinstall/lib/interactions/locale_conf.py
@@ -1,8 +1,6 @@
-from __future__ import annotations
-
from typing import Any, TYPE_CHECKING, Optional
-from ..locale_helpers import list_locales
+from ..locale import list_locales
from ..menu import Menu, MenuSelectionType
if TYPE_CHECKING:
diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/interactions/manage_users_conf.py
index 879578da..879578da 100644
--- a/archinstall/lib/user_interaction/manage_users_conf.py
+++ b/archinstall/lib/interactions/manage_users_conf.py
diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/interactions/network_conf.py
index b682c1d2..18a834a1 100644
--- a/archinstall/lib/user_interaction/network_conf.py
+++ b/archinstall/lib/interactions/network_conf.py
@@ -1,14 +1,13 @@
from __future__ import annotations
import ipaddress
-import logging
from typing import Any, Optional, TYPE_CHECKING, List, Union, Dict
from ..menu import MenuSelectionType, TextInput
from ..models.network_configuration import NetworkConfiguration, NicType
from ..networking import list_interfaces
-from ..output import log, FormattedOutput
+from ..output import FormattedOutput, warn
from ..menu import ListManager, Menu
if TYPE_CHECKING:
@@ -91,7 +90,7 @@ class ManualNetworkConfig(ListManager):
ipaddress.ip_interface(ip)
break
except ValueError:
- log("You need to enter a valid IP in IP-config mode.", level=logging.WARNING, fg='red')
+ warn("You need to enter a valid IP in IP-config mode")
# Implemented new check for correct gateway IP address
gateway = None
@@ -106,7 +105,7 @@ class ManualNetworkConfig(ListManager):
ipaddress.ip_address(gateway)
break
except ValueError:
- log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red')
+ warn("You need to enter a valid gateway (router) IP address")
if edit_iface.dns:
display_dns = ' '.join(edit_iface.dns)
diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/interactions/system_conf.py
index 3f57d0e7..bbcb5b23 100644
--- a/archinstall/lib/user_interaction/system_conf.py
+++ b/archinstall/lib/interactions/system_conf.py
@@ -2,7 +2,7 @@ from __future__ import annotations
from typing import List, Any, Dict, TYPE_CHECKING, Optional
-from ..hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics
+from ..hardware import AVAILABLE_GFX_DRIVERS, SysInfo
from ..menu import MenuSelectionType, Menu
from ..models.bootloader import Bootloader
@@ -41,7 +41,7 @@ def select_kernel(preset: List[str] = []) -> List[str]:
def ask_for_bootloader(preset: Bootloader) -> Bootloader:
# when the system only supports grub
- if not has_uefi():
+ if not SysInfo.has_uefi():
options = [Bootloader.Grub.value]
default = Bootloader.Grub.value
else:
@@ -81,11 +81,11 @@ def select_driver(options: Dict[str, Any] = {}, current_value: Optional[str] = N
if drivers:
title = ''
- if has_amd_graphics():
+ if SysInfo.has_amd_graphics():
title += str(_('For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.')) + '\n'
- if has_intel_graphics():
+ if SysInfo.has_intel_graphics():
title += str(_('For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n'))
- if has_nvidia_graphics():
+ if SysInfo.has_nvidia_graphics():
title += str(_('For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n'))
title += str(_('\nSelect a graphics driver or leave blank to install all open-source drivers'))
diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/interactions/utils.py
index 918945c0..f6b5b2d3 100644
--- a/archinstall/lib/user_interaction/utils.py
+++ b/archinstall/lib/interactions/utils.py
@@ -4,7 +4,7 @@ import getpass
from typing import Any, Optional, TYPE_CHECKING
from ..models import PasswordStrength
-from ..output import log
+from ..output import log, error
if TYPE_CHECKING:
_: Any
@@ -26,7 +26,7 @@ def get_password(prompt: str = '') -> Optional[str]:
passwd_verification = getpass.getpass(prompt=_('And one more time for verification: '))
if password != passwd_verification:
- log(' * Passwords did not match * ', fg='red')
+ error(' * Passwords did not match * ')
continue
return password
diff --git a/archinstall/lib/locale.py b/archinstall/lib/locale.py
new file mode 100644
index 00000000..0a36c072
--- /dev/null
+++ b/archinstall/lib/locale.py
@@ -0,0 +1,68 @@
+from typing import Iterator, List
+
+from .exceptions import ServiceException, SysCallError
+from .general import SysCommand
+from .output import error
+
+
+def list_keyboard_languages() -> Iterator[str]:
+ for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
+ yield line.decode('UTF-8').strip()
+
+
+def list_locales() -> List[str]:
+ with open('/etc/locale.gen', 'r') as fp:
+ locales = []
+ # before the list of locales begins there's an empty line with a '#' in front
+ # so we'll collect the localels from bottom up and halt when we're donw
+ entries = fp.readlines()
+ entries.reverse()
+
+ for entry in entries:
+ text = entry.replace('#', '').strip()
+ if text == '':
+ break
+ locales.append(text)
+
+ locales.reverse()
+ return locales
+
+
+def list_x11_keyboard_languages() -> Iterator[str]:
+ for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
+ yield line.decode('UTF-8').strip()
+
+
+def verify_keyboard_layout(layout :str) -> bool:
+ for language in list_keyboard_languages():
+ if layout.lower() == language.lower():
+ return True
+ return False
+
+
+def verify_x11_keyboard_layout(layout :str) -> bool:
+ for language in list_x11_keyboard_languages():
+ if layout.lower() == language.lower():
+ return True
+ return False
+
+
+def set_keyboard_language(locale :str) -> bool:
+ if len(locale.strip()):
+ if not verify_keyboard_layout(locale):
+ error(f"Invalid keyboard locale specified: {locale}")
+ return False
+
+ try:
+ SysCommand(f'localectl set-keymap {locale}')
+ except SysCallError as err:
+ raise ServiceException(f"Unable to set locale '{locale}' for console: {err}")
+
+ return True
+
+ return False
+
+
+def list_timezones() -> Iterator[str]:
+ for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}):
+ yield line.decode('UTF-8').strip()
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
deleted file mode 100644
index efb0365f..00000000
--- a/archinstall/lib/locale_helpers.py
+++ /dev/null
@@ -1,176 +0,0 @@
-import logging
-from typing import Iterator, List, Callable, Optional
-
-from .exceptions import ServiceException, SysCallError
-from .general import SysCommand
-from .output import log
-from .storage import storage
-
-
-def list_keyboard_languages() -> Iterator[str]:
- for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
- yield line.decode('UTF-8').strip()
-
-
-def list_locales() -> List[str]:
- with open('/etc/locale.gen', 'r') as fp:
- locales = []
- # before the list of locales begins there's an empty line with a '#' in front
- # so we'll collect the localels from bottom up and halt when we're donw
- entries = fp.readlines()
- entries.reverse()
-
- for entry in entries:
- text = entry.replace('#', '').strip()
- if text == '':
- break
- locales.append(text)
-
- locales.reverse()
- return locales
-
-def get_locale_mode_text(mode):
- if mode == 'LC_ALL':
- mode_text = "general (LC_ALL)"
- elif mode == "LC_CTYPE":
- mode_text = "Character set"
- elif mode == "LC_NUMERIC":
- mode_text = "Numeric values"
- elif mode == "LC_TIME":
- mode_text = "Time Values"
- elif mode == "LC_COLLATE":
- mode_text = "sort order"
- elif mode == "LC_MESSAGES":
- mode_text = "text messages"
- else:
- mode_text = "Unassigned"
- return mode_text
-
-
-def reset_cmd_locale():
- """ sets the cmd_locale to its saved default """
- storage['CMD_LOCALE'] = storage.get('CMD_LOCALE_DEFAULT',{})
-
-
-def unset_cmd_locale():
- """ archinstall will use the execution environment default """
- storage['CMD_LOCALE'] = {}
-
-
-def set_cmd_locale(
- general: Optional[str] = None,
- charset :str = 'C',
- numbers :str = 'C',
- time :str = 'C',
- collate :str = 'C',
- messages :str = 'C'
-):
- """
- Set the cmd locale.
- If the parameter general is specified, it takes precedence over the rest (might as well not exist)
- The rest define some specific settings above the installed default language. If anyone of this parameters is none means the installation default
- """
- installed_locales = list_installed_locales()
- result = {}
- if general:
- if general in installed_locales:
- storage['CMD_LOCALE'] = {'LC_ALL':general}
- else:
- log(f"{get_locale_mode_text('LC_ALL')} {general} is not installed. Defaulting to C",fg="yellow",level=logging.WARNING)
- return
-
- if numbers:
- if numbers in installed_locales:
- result["LC_NUMERIC"] = numbers
- else:
- log(f"{get_locale_mode_text('LC_NUMERIC')} {numbers} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
- if charset:
- if charset in installed_locales:
- result["LC_CTYPE"] = charset
- else:
- log(f"{get_locale_mode_text('LC_CTYPE')} {charset} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
- if time:
- if time in installed_locales:
- result["LC_TIME"] = time
- else:
- log(f"{get_locale_mode_text('LC_TIME')} {time} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
- if collate:
- if collate in installed_locales:
- result["LC_COLLATE"] = collate
- else:
- log(f"{get_locale_mode_text('LC_COLLATE')} {collate} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
- if messages:
- if messages in installed_locales:
- result["LC_MESSAGES"] = messages
- else:
- log(f"{get_locale_mode_text('LC_MESSAGES')} {messages} is not installed. Defaulting to installation language",fg="yellow",level=logging.WARNING)
- storage['CMD_LOCALE'] = result
-
-def host_locale_environ(func :Callable):
- """ decorator when we want a function executing in the host's locale environment """
- def wrapper(*args, **kwargs):
- unset_cmd_locale()
- result = func(*args,**kwargs)
- reset_cmd_locale()
- return result
- return wrapper
-
-def c_locale_environ(func :Callable):
- """ decorator when we want a function executing in the C locale environment """
- def wrapper(*args, **kwargs):
- set_cmd_locale(general='C')
- result = func(*args,**kwargs)
- reset_cmd_locale()
- return result
- return wrapper
-
-def list_installed_locales() -> List[str]:
- lista = []
- for line in SysCommand('locale -a'):
- lista.append(line.decode('UTF-8').strip())
- return lista
-
-def list_x11_keyboard_languages() -> Iterator[str]:
- for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
- yield line.decode('UTF-8').strip()
-
-
-def verify_keyboard_layout(layout :str) -> bool:
- for language in list_keyboard_languages():
- if layout.lower() == language.lower():
- return True
- return False
-
-
-def verify_x11_keyboard_layout(layout :str) -> bool:
- for language in list_x11_keyboard_languages():
- if layout.lower() == language.lower():
- return True
- return False
-
-
-def search_keyboard_layout(layout :str) -> Iterator[str]:
- for language in list_keyboard_languages():
- if layout.lower() in language.lower():
- yield language
-
-
-def set_keyboard_language(locale :str) -> bool:
- if len(locale.strip()):
- if not verify_keyboard_layout(locale):
- log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR)
- return False
-
- try:
- SysCommand(f'localectl set-keymap {locale}')
- except SysCallError as error:
- raise ServiceException(f"Unable to set locale '{locale}' for console: {error}")
-
- return True
-
- return False
-
-
-def list_timezones() -> Iterator[str]:
- for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}):
- yield line.decode('UTF-8').strip()
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 53a5e8d2..f9b09b53 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import logging
import shlex
import time
from dataclasses import dataclass
@@ -9,7 +8,7 @@ from typing import Optional, List
from . import disk
from .general import SysCommand, generate_password, SysCommandWorker
-from .output import log
+from .output import info, debug
from .exceptions import SysCallError, DiskError
from .storage import storage
@@ -61,7 +60,7 @@ class Luks2:
iter_time: int = 10000,
key_file: Optional[Path] = None
) -> Path:
- log(f'Luks2 encrypting: {self.luks_dev_path}', level=logging.INFO)
+ info(f'Luks2 encrypting: {self.luks_dev_path}')
byte_password = self._password_bytes()
@@ -95,21 +94,21 @@ class Luks2:
try:
SysCommand(cryptsetup_args)
break
- except SysCallError as error:
+ except SysCallError as err:
time.sleep(storage['DISK_TIMEOUTS'])
if retry_attempt != storage['DISK_RETRY_ATTEMPTS'] - 1:
continue
- if error.exit_code == 1:
- log(f'luks2 partition currently in use: {self.luks_dev_path}')
- log('Attempting to unmount, crypt-close and trying encryption again')
+ if err.exit_code == 1:
+ info(f'luks2 partition currently in use: {self.luks_dev_path}')
+ info('Attempting to unmount, crypt-close and trying encryption again')
self.lock()
# Then try again to set up the crypt-device
SysCommand(cryptsetup_args)
else:
- raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {error}')
+ raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}')
return key_file
@@ -119,7 +118,7 @@ class Luks2:
try:
return SysCommand(command).decode().strip() # type: ignore
except SysCallError as err:
- log(f'Unable to get UUID for Luks device: {self.luks_dev_path}', level=logging.INFO)
+ info(f'Unable to get UUID for Luks device: {self.luks_dev_path}')
raise err
def is_unlocked(self) -> bool:
@@ -133,7 +132,7 @@ class Luks2:
:param key_file: An alternative key file
:type key_file: Path
"""
- log(f'Unlocking luks2 device: {self.luks_dev_path}', level=logging.DEBUG)
+ debug(f'Unlocking luks2 device: {self.luks_dev_path}')
if not self.mapper_name:
raise ValueError('mapper name missing')
@@ -170,11 +169,11 @@ class Luks2:
for child in lsblk_info.children:
# Unmount the child location
for mountpoint in child.mountpoints:
- log(f'Unmounting {mountpoint}', level=logging.DEBUG)
+ debug(f'Unmounting {mountpoint}')
disk.device_handler.umount(mountpoint, recursive=True)
# And close it if possible.
- log(f"Closing crypt device {child.name}", level=logging.DEBUG)
+ debug(f"Closing crypt device {child.name}")
SysCommand(f"cryptsetup close {child.name}")
self._mapper_dev = None
@@ -194,10 +193,10 @@ class Luks2:
if key_file.exists():
if not override:
- log(f'Key file {key_file} already exists, keeping existing')
+ info(f'Key file {key_file} already exists, keeping existing')
return
else:
- log(f'Key file {key_file} already exists, overriding')
+ info(f'Key file {key_file} already exists, overriding')
key_file_path.mkdir(parents=True, exist_ok=True)
@@ -210,7 +209,7 @@ class Luks2:
self._crypttab(crypttab_path, key_file, options=["luks", "key-slot=1"])
def _add_key(self, key_file: Path):
- log(f'Adding additional key-file {key_file}', level=logging.INFO)
+ info(f'Adding additional key-file {key_file}')
command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'
worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'})
@@ -230,7 +229,7 @@ class Luks2:
key_file: Path,
options: List[str]
) -> None:
- log(f'Adding crypttab entry for key {key_file}', level=logging.INFO)
+ info(f'Adding crypttab entry for key {key_file}')
with open(crypttab_path, 'a') as crypttab:
opt = ','.join(options)
diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py
index e44d65a4..2bd56374 100644
--- a/archinstall/lib/menu/abstract_menu.py
+++ b/archinstall/lib/menu/abstract_menu.py
@@ -1,11 +1,10 @@
from __future__ import annotations
-import logging
from typing import Callable, Any, List, Iterator, Tuple, Optional, Dict, TYPE_CHECKING
from .menu import Menu, MenuSelectionType
-from ..locale_helpers import set_keyboard_language
-from ..output import log
+from ..locale import set_keyboard_language
+from ..output import error
from ..translationhandler import TranslationHandler, Language
if TYPE_CHECKING:
@@ -211,7 +210,7 @@ class AbstractMenu:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
# TODO: skip processing when it comes from a planified exit
if len(args) >= 2 and args[1]:
- log(args[1], level=logging.ERROR, fg='red')
+ error(args[1])
print(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")
raise args[1]
@@ -483,7 +482,7 @@ class AbstractMenu:
yield item
def _select_archinstall_language(self, preset: Language) -> Language:
- from ..user_interaction.general_conf import select_archinstall_language
+ from ..interactions.general_conf import select_archinstall_language
language = select_archinstall_language(self.translation_handler.translated_languages, preset)
self._translation_handler.activate(language)
return language
diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py
index f3fdb85f..768dfe55 100644
--- a/archinstall/lib/menu/menu.py
+++ b/archinstall/lib/menu/menu.py
@@ -6,11 +6,8 @@ from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable
from simple_term_menu import TerminalMenu # type: ignore
from ..exceptions import RequirementError
-from ..output import log
+from ..output import debug
-from collections.abc import Iterable
-import sys
-import logging
if TYPE_CHECKING:
_: Any
@@ -127,33 +124,15 @@ class Menu(TerminalMenu):
:param extra_bottom_space: Add an extra empty line at the end of the menu
:type extra_bottom_space: bool
"""
- # we guarantee the inmutability of the options outside the class.
- # an unknown number of iterables (.keys(),.values(),generator,...) can't be directly copied, in this case
- # we recourse to make them lists before, but thru an exceptions
- # this is the old code, which is not maintenable with more types
- # options = copy(list(p_options) if isinstance(p_options,(type({}.keys()),type({}.values()))) else p_options)
- # We check that the options are iterable. If not we abort. Else we copy them to lists
- # it options is a dictionary we use the values as entries of the list
- # if options is a string object, each character becomes an entry
- # if options is a list, we implictily build a copy to maintain immutability
- if not isinstance(p_options,Iterable):
- log(f"Objects of type {type(p_options)} is not iterable, and are not supported at Menu",fg="red")
- log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
- raise RequirementError("Menu() requires an iterable as option.")
-
- if isinstance(p_options,dict):
+ if isinstance(p_options, Dict):
options = list(p_options.keys())
else:
options = list(p_options)
if not options:
- log(" * Menu didn't find any options to choose from * ", fg='red')
- log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
raise RequirementError('Menu.__init__() requires at least one option to proceed.')
if any([o for o in options if not isinstance(o, str)]):
- log(" * Menu options must be of type string * ", fg='red')
- log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
raise RequirementError('Menu.__init__() requires the options to be of type string')
if sort:
@@ -343,7 +322,7 @@ class Menu(TerminalMenu):
idx = self._menu_options.index(self._default_menu_value)
indexes.append(idx)
except (IndexError, ValueError):
- log(f'Error finding index of {p}: {self._menu_options}', level=logging.DEBUG)
+ debug(f'Error finding index of {p}: {self._menu_options}')
if len(indexes) == 0:
indexes.append(0)
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index c6c5c8e4..62a0b081 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,4 +1,3 @@
-import logging
import pathlib
import urllib.error
import urllib.request
@@ -6,7 +5,7 @@ from typing import Union, Iterable, Dict, Any, List
from dataclasses import dataclass
from .general import SysCommand
-from .output import log
+from .output import info, warn
from .exceptions import SysCallError
from .storage import storage
@@ -136,7 +135,7 @@ def use_mirrors(
regions: Dict[str, Iterable[str]],
destination: str = '/etc/pacman.d/mirrorlist'
):
- log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
+ info(f'A new package mirror-list has been created: {destination}')
with open(destination, 'w') as mirrorlist:
for region, mirrors in regions.items():
for mirror in mirrors:
@@ -170,7 +169,7 @@ def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
try:
response = urllib.request.urlopen(url)
except urllib.error.URLError as err:
- log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange")
+ warn(f'Could not fetch an active mirror-list: {err}')
return regions
mirrorlist = response.read()
diff --git a/archinstall/lib/models/bootloader.py b/archinstall/lib/models/bootloader.py
index 38254c99..e21cda33 100644
--- a/archinstall/lib/models/bootloader.py
+++ b/archinstall/lib/models/bootloader.py
@@ -1,12 +1,11 @@
from __future__ import annotations
-import logging
import sys
from enum import Enum
from typing import List
-from ..hardware import has_uefi
-from ..output import log
+from ..hardware import SysInfo
+from ..output import warn
class Bootloader(Enum):
@@ -23,7 +22,7 @@ class Bootloader(Enum):
@classmethod
def get_default(cls) -> Bootloader:
- if has_uefi():
+ if SysInfo.has_uefi():
return Bootloader.Systemd
else:
return Bootloader.Grub
@@ -35,6 +34,6 @@ class Bootloader(Enum):
if bootloader not in cls.values():
values = ', '.join(cls.values())
- log(f'Invalid bootloader value "{bootloader}". Allowed values: {values}', level=logging.WARN)
+ warn(f'Invalid bootloader value "{bootloader}". Allowed values: {values}')
sys.exit(1)
return Bootloader(bootloader)
diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py
index a8795fc1..93dd1c44 100644
--- a/archinstall/lib/models/network_configuration.py
+++ b/archinstall/lib/models/network_configuration.py
@@ -1,11 +1,10 @@
from __future__ import annotations
-import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING, Tuple
-from ..output import log
+from ..output import debug
from ..profile import ProfileConfiguration
if TYPE_CHECKING:
@@ -138,8 +137,7 @@ class NetworkConfigurationHandler:
iface = manual_config.get('iface', None)
if iface is None:
- log(_('No iface specified for manual configuration'))
- exit(1)
+ raise ValueError('No iface specified for manual configuration')
if manual_config.get('dhcp', False) or not any([manual_config.get(v, '') for v in ['ip', 'gateway', 'dns']]):
configurations.append(
@@ -148,8 +146,7 @@ class NetworkConfigurationHandler:
else:
ip = manual_config.get('ip', '')
if not ip:
- log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red')
- exit(1)
+ raise ValueError('Manual nic configuration with no auto DHCP requires an IP address')
dns = manual_config.get('dns', [])
if not isinstance(dns, list):
@@ -173,8 +170,7 @@ class NetworkConfigurationHandler:
return NicType(nic_type)
except ValueError:
options = [e.value for e in NicType]
- log(_('Unknown nic type: {}. Possible values are {}').format(nic_type, options), fg='red')
- exit(1)
+ raise ValueError(f'Unknown nic type: {nic_type}. Possible values are {options}')
def parse_arguments(self, config: Any):
if isinstance(config, list): # new data format
@@ -187,4 +183,4 @@ class NetworkConfigurationHandler:
else: # manual configuration settings
self._configuration = self._parse_manual_config([config])
else:
- log(f'Unable to parse network configuration: {config}', level=logging.DEBUG)
+ debug(f'Unable to parse network configuration: {config}')
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index b858daaf..6906c320 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -1,4 +1,3 @@
-import logging
import os
import socket
import ssl
@@ -8,18 +7,16 @@ from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import urlopen
-from .exceptions import HardwareIncompatibilityError, SysCallError
-from .general import SysCommand
-from .output import log
+from .exceptions import SysCallError
+from .output import error, info, debug
from .pacman import run_pacman
-from .storage import storage
def get_hw_addr(ifname :str) -> str:
import fcntl
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
- return ':'.join('%02x' % b for b in info[18:24])
+ ret = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
+ return ':'.join('%02x' % b for b in ret[18:24])
def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
@@ -36,26 +33,26 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
def check_mirror_reachable() -> bool:
- log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO)
+ info("Testing connectivity to the Arch Linux mirrors...")
try:
run_pacman("-Sy")
return True
except SysCallError as err:
if os.geteuid() != 0:
- log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
- log(f'exit_code: {err.exit_code}, Error: {err.message}', level=logging.DEBUG)
+ error("check_mirror_reachable() uses 'pacman -Sy' which requires root.")
+ debug(f'exit_code: {err.exit_code}, Error: {err.message}')
return False
def update_keyring() -> bool:
- log("Updating archlinux-keyring ...", level=logging.INFO)
+ info("Updating archlinux-keyring ...")
try:
run_pacman("-Sy --noconfirm archlinux-keyring")
return True
except SysCallError:
if os.geteuid() != 0:
- log("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.", level=logging.ERROR, fg="red")
+ error("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.")
return False
@@ -80,38 +77,6 @@ def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str
return result
-def wireless_scan(interface :str) -> None:
- interfaces = enrich_iface_types(list(list_interfaces().values()))
- if interfaces[interface] != 'WIRELESS':
- raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
-
- try:
- SysCommand(f"iwctl station {interface} scan")
- except SysCallError as error:
- raise SystemError(f"Could not scan for wireless networks: {error}")
-
- if '_WIFI' not in storage:
- storage['_WIFI'] = {}
- if interface not in storage['_WIFI']:
- storage['_WIFI'][interface] = {}
-
- storage['_WIFI'][interface]['scanning'] = True
-
-
-# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
-def get_wireless_networks(interface :str) -> None:
- # TODO: Make this oneliner pritter to check if the interface is scanning or not.
- # TODO: Rename this to list_wireless_networks() as it doesn't return anything
- if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
- import time
-
- wireless_scan(interface)
- time.sleep(5)
-
- for line in SysCommand(f"iwctl station {interface} get-networks"):
- print(line)
-
-
def fetch_data_from_url(url: str, params: Optional[Dict] = None) -> str:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index d65f835f..bd31b5b3 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -1,15 +1,16 @@
import logging
import os
import sys
+from enum import Enum
+
from pathlib import Path
from typing import Dict, Union, List, Any, Callable, Optional
+from dataclasses import asdict, is_dataclass
from .storage import storage
-from dataclasses import asdict, is_dataclass
class FormattedOutput:
-
@classmethod
def values(
cls,
@@ -118,7 +119,7 @@ class FormattedOutput:
class Journald:
@staticmethod
- def log(message :str, level :int = logging.DEBUG) -> None:
+ def log(message: str, level: int = logging.DEBUG) -> None:
try:
import systemd.journal # type: ignore
except ModuleNotFoundError:
@@ -134,16 +135,37 @@ class Journald:
log_adapter.log(level, message)
-# TODO: Replace log() for session based logging.
-class SessionLogging:
- def __init__(self):
- pass
+def check_log_permissions():
+ filename = storage.get('LOG_FILE', None)
+
+ if not filename:
+ return
+
+ log_dir = storage.get('LOG_PATH', Path('./'))
+ absolute_logfile = log_dir / filename
+
+ try:
+ log_dir.mkdir(exist_ok=True, parents=True)
+ with absolute_logfile.open('a') as fp:
+ fp.write('')
+ except PermissionError:
+ # Fallback to creating the log file in the current folder
+ fallback_log_file = Path('./').absolute() / filename
+ absolute_logfile = fallback_log_file
+ absolute_logfile.mkdir(exist_ok=True, parents=True)
+ storage['LOG_PATH'] = Path('./').absolute()
+ err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {fallback_log_file} instead."
+ warn(err_string)
-# Found first reference here: https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python
-# And re-used this: https://github.com/django/django/blob/master/django/core/management/color.py#L12
-def supports_color() -> bool:
+
+def _supports_color() -> bool:
"""
+ Found first reference here:
+ https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python
+ And re-used this:
+ https://github.com/django/django/blob/master/django/core/management/color.py#L12
+
Return True if the running system's terminal supports color,
and False otherwise.
"""
@@ -154,13 +176,30 @@ def supports_color() -> bool:
return supported_platform and is_a_tty
-# Heavily influenced by: https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13
-# Color options here: https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i
-def stylize_output(text: str, *opts :str, **kwargs) -> str:
+class Font(Enum):
+ bold = '1'
+ italic = '3'
+ underscore = '4'
+ blink = '5'
+ reverse = '7'
+ conceal = '8'
+
+
+def _stylize_output(
+ text: str,
+ fg: str,
+ bg: Optional[str],
+ reset: bool,
+ font: List[Font] = [],
+) -> str:
"""
+ Heavily influenced by:
+ https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13
+ Color options here:
+ https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i
+
Adds styling to a text given a set of color arguments.
"""
- opt_dict = {'bold': '1', 'italic': '3', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
colors = {
'black' : '0',
'red' : '1',
@@ -178,65 +217,72 @@ def stylize_output(text: str, *opts :str, **kwargs) -> str:
'darkgray' : '8;5;240',
'lightgray' : '8;5;256'
}
+
foreground = {key: f'3{colors[key]}' for key in colors}
background = {key: f'4{colors[key]}' for key in colors}
- reset = '0'
-
code_list = []
- if text == '' and len(opts) == 1 and opts[0] == 'reset':
- return '\x1b[%sm' % reset
- for k, v in kwargs.items():
- if k == 'fg':
- code_list.append(foreground[str(v)])
- elif k == 'bg':
- code_list.append(background[str(v)])
+ if text == '' and reset:
+ return '\x1b[%sm' % '0'
+
+ code_list.append(foreground[str(fg)])
+
+ if bg:
+ code_list.append(background[str(bg)])
+
+ for o in font:
+ code_list.append(o.value)
+
+ ansi = ';'.join(code_list)
+
+ return f'\033[{ansi}m{text}\033[0m'
+
- for o in opts:
- if o in opt_dict:
- code_list.append(opt_dict[o])
+def info(*msgs: str):
+ log(*msgs, level=logging.INFO)
- if 'noreset' not in opts:
- text = '%s\x1b[%sm' % (text or '', reset)
- return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '')
+def debug(*msgs: str):
+ log(*msgs, level=logging.DEBUG)
-def log(*args :str, **kwargs :Union[str, int, Dict[str, Union[str, int]]]) -> None:
- string = orig_string = ' '.join([str(x) for x in args])
+def error(*msgs: str):
+ log(*msgs, level=logging.ERROR, fg='red')
+
+
+def warn(*msgs: str):
+ log(*msgs, level=logging.WARNING, fg='yellow')
+
+
+def log(
+ *msgs: str,
+ level: int = logging.INFO,
+ fg: str = 'white',
+ bg: Optional[str] = None,
+ reset: bool = False,
+ font: List[Font] = []
+):
+ text = orig_string = ' '.join([str(x) for x in msgs])
# Attempt to colorize the output if supported
# Insert default colors and override with **kwargs
- if supports_color():
- kwargs = {'fg': 'white', **kwargs}
- string = stylize_output(string, **kwargs)
+ if _supports_color():
+ text = _stylize_output(text, fg, bg, reset, font)
# If a logfile is defined in storage,
# we use that one to output everything
if filename := storage.get('LOG_FILE', None):
- absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
+ log_dir = storage.get('LOG_PATH', Path('./'))
+ absolute_logfile = log_dir / filename
- try:
- Path(absolute_logfile).parents[0].mkdir(exist_ok=True, parents=True)
- with open(absolute_logfile, 'a') as log_file:
- log_file.write("")
- except PermissionError:
- # Fallback to creating the log file in the current folder
- err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {Path('./').absolute() / filename} instead."
- absolute_logfile = Path('./').absolute() / filename
- absolute_logfile.parents[0].mkdir(exist_ok=True)
- absolute_logfile = str(absolute_logfile)
- storage['LOG_PATH'] = './'
- log(err_string, fg="red")
-
- with open(absolute_logfile, 'a') as log_file:
- log_file.write(f"{orig_string}\n")
-
- Journald.log(string, level=int(str(kwargs.get('level', logging.INFO))))
+ with open(absolute_logfile, 'a') as fp:
+ fp.write(f"{orig_string}\n")
+
+ Journald.log(text, level=level)
# Finally, print the log unless we skipped it based on level.
# We use sys.stdout.write()+flush() instead of print() to try and
# fix issue #94
- if kwargs.get('level', logging.INFO) != logging.DEBUG or storage.get('arguments', {}).get('verbose', False):
- sys.stdout.write(f"{string}\n")
+ if level != logging.DEBUG or storage.get('arguments', {}).get('verbose', False):
+ sys.stdout.write(f"{text}\n")
sys.stdout.flush()
diff --git a/archinstall/lib/pacman.py b/archinstall/lib/pacman.py
index 0dfd5afa..f5514f05 100644
--- a/archinstall/lib/pacman.py
+++ b/archinstall/lib/pacman.py
@@ -1,10 +1,9 @@
-import logging
import pathlib
import time
from typing import TYPE_CHECKING, Any
from .general import SysCommand
-from .output import log
+from .output import warn, error
if TYPE_CHECKING:
_: Any
@@ -19,14 +18,14 @@ def run_pacman(args :str, default_cmd :str = 'pacman') -> SysCommand:
pacman_db_lock = pathlib.Path('/var/lib/pacman/db.lck')
if pacman_db_lock.exists():
- log(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'), level=logging.WARNING, fg="red")
+ warn(_('Pacman is already running, waiting maximum 10 minutes for it to terminate.'))
started = time.time()
while pacman_db_lock.exists():
time.sleep(0.25)
if time.time() - started > (60 * 10):
- log(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'), level=logging.WARNING, fg="red")
+ error(_('Pre-existing pacman lock never exited. Please clean up any existing pacman sessions before using archinstall.'))
exit(1)
return SysCommand(f'{default_cmd} {args}')
diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py
index b1ece04f..4ccb0666 100644
--- a/archinstall/lib/plugins.py
+++ b/archinstall/lib/plugins.py
@@ -1,6 +1,5 @@
import hashlib
import importlib
-import logging
import os
import sys
import urllib.parse
@@ -9,7 +8,7 @@ from importlib import metadata
from pathlib import Path
from typing import Optional, List
-from .output import log
+from .output import error, info, warn
from .storage import storage
plugins = {}
@@ -24,11 +23,13 @@ for plugin_definition in metadata.entry_points().select(group='archinstall.plugi
try:
plugins[plugin_definition.name] = plugin_entrypoint()
except Exception as err:
- log(f'Error: {err}', level=logging.ERROR)
- log(f"The above error was detected when loading the plugin: {plugin_definition}", fg="red", level=logging.ERROR)
+ error(
+ f'Error: {err}',
+ f"The above error was detected when loading the plugin: {plugin_definition}"
+ )
-def localize_path(path: Path) -> Path:
+def _localize_path(path: Path) -> Path:
"""
Support structures for load_plugin()
"""
@@ -45,7 +46,7 @@ def localize_path(path: Path) -> Path:
return path
-def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str]:
+def _import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str]:
if not namespace:
namespace = os.path.basename(path)
@@ -61,8 +62,10 @@ def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str
return namespace
except Exception as err:
- log(f'Error: {err}', level=logging.ERROR)
- log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR)
+ error(
+ f'Error: {err}',
+ f"The above error was detected when loading the plugin: {path}"
+ )
try:
del sys.modules[namespace]
@@ -72,7 +75,7 @@ def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str
return namespace
-def find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]:
+def _find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]:
indices = [idx for idx, elem in enumerate(haystack) if elem == needle]
if n <= len(indices):
return indices[n - 1]
@@ -82,34 +85,36 @@ def find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]:
def load_plugin(path: Path):
namespace: Optional[str] = None
parsed_url = urllib.parse.urlparse(str(path))
- log(f"Loading plugin from url {parsed_url}.", level=logging.INFO)
+ info(f"Loading plugin from url {parsed_url}")
# The Profile was not a direct match on a remote URL
if not parsed_url.scheme:
# Path was not found in any known examples, check if it's an absolute path
if os.path.isfile(path):
- namespace = import_via_path(path)
+ namespace = _import_via_path(path)
elif parsed_url.scheme in ('https', 'http'):
- localized = localize_path(path)
- namespace = import_via_path(localized)
+ localized = _localize_path(path)
+ namespace = _import_via_path(localized)
if namespace and namespace in sys.modules:
# Version dependency via __archinstall__version__ variable (if present) in the plugin
# Any errors in version inconsistency will be handled through normal error handling if not defined.
if hasattr(sys.modules[namespace], '__archinstall__version__'):
- archinstall_major_and_minor_version = float(storage['__version__'][:find_nth(storage['__version__'], '.', 2)])
+ archinstall_major_and_minor_version = float(storage['__version__'][:_find_nth(storage['__version__'], '.', 2)])
if sys.modules[namespace].__archinstall__version__ < archinstall_major_and_minor_version:
- log(f"Plugin {sys.modules[namespace]} does not support the current Archinstall version.", fg="red", level=logging.ERROR)
+ error(f"Plugin {sys.modules[namespace]} does not support the current Archinstall version.")
# Locate the plugin entry-point called Plugin()
# This in accordance with the entry_points() from setup.cfg above
if hasattr(sys.modules[namespace], 'Plugin'):
try:
plugins[namespace] = sys.modules[namespace].Plugin()
- log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO)
+ info(f"Plugin {plugins[namespace]} has been loaded.")
except Exception as err:
- log(f'Error: {err}', level=logging.ERROR)
- log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR)
+ error(
+ f'Error: {err}',
+ f"The above error was detected when initiating the plugin: {path}"
+ )
else:
- log(f"Plugin '{path}' is missing a valid entry-point or is corrupt.", fg="yellow", level=logging.WARNING)
+ warn(f"Plugin '{path}' is missing a valid entry-point or is corrupt.")
diff --git a/archinstall/lib/profile/profile_menu.py b/archinstall/lib/profile/profile_menu.py
index 6462685a..213466a6 100644
--- a/archinstall/lib/profile/profile_menu.py
+++ b/archinstall/lib/profile/profile_menu.py
@@ -6,7 +6,7 @@ from archinstall.default_profiles.profile import Profile, GreeterType
from .profile_model import ProfileConfiguration
from ..hardware import AVAILABLE_GFX_DRIVERS
from ..menu import Menu, MenuSelectionType, AbstractSubMenu, Selector
-from ..user_interaction.system_conf import select_driver
+from ..interactions.system_conf import select_driver
if TYPE_CHECKING:
_: Any
diff --git a/archinstall/lib/profile/profiles_handler.py b/archinstall/lib/profile/profiles_handler.py
index 6ed95f8e..16fef251 100644
--- a/archinstall/lib/profile/profiles_handler.py
+++ b/archinstall/lib/profile/profiles_handler.py
@@ -1,7 +1,6 @@
from __future__ import annotations
import importlib.util
-import logging
import sys
from collections import Counter
from functools import cached_property
@@ -15,7 +14,7 @@ from .profile_model import ProfileConfiguration
from ..hardware import AVAILABLE_GFX_DRIVERS
from ..menu import MenuSelectionType, Menu, MenuSelection
from ..networking import list_interfaces, fetch_data_from_url
-from ..output import log
+from ..output import error, debug, info, warn
from ..storage import storage
if TYPE_CHECKING:
@@ -106,7 +105,7 @@ class ProfileHandler:
invalid = ', '.join([k for k, v in resolved.items() if v is None])
if invalid:
- log(f'No profile definition found: {invalid}')
+ info(f'No profile definition found: {invalid}')
custom_settings = profile_config.get('custom_settings', {})
for profile in valid:
@@ -216,7 +215,7 @@ class ProfileHandler:
install_session.add_additional_packages(additional_pkg)
except Exception as err:
- log(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}", level=logging.WARNING, fg="yellow")
+ warn(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}")
# Prep didn't run, so there's no driver to install
install_session.add_additional_packages(['xorg-server', 'xorg-xinit'])
@@ -250,7 +249,7 @@ class ProfileHandler:
self.add_custom_profiles(profiles)
except ValueError:
err = str(_('Unable to fetch profile from specified url: {}')).format(url)
- log(err, level=logging.ERROR, fg="red")
+ error(err)
def _load_profile_class(self, module: ModuleType) -> List[Profile]:
"""
@@ -264,7 +263,7 @@ class ProfileHandler:
if isinstance(cls_, Profile):
profiles.append(cls_)
except Exception:
- log(f'Cannot import {module}, it does not appear to be a Profile class', level=logging.DEBUG)
+ debug(f'Cannot import {module}, it does not appear to be a Profile class')
return profiles
@@ -278,7 +277,7 @@ class ProfileHandler:
if len(duplicates) > 0:
err = str(_('Profiles must have unique name, but profile definitions with duplicate name found: {}')).format(duplicates[0][0])
- log(err, level=logging.ERROR, fg="red")
+ error(err)
sys.exit(1)
def _is_legacy(self, file: Path) -> bool:
@@ -297,15 +296,15 @@ class ProfileHandler:
Process a file for profile definitions
"""
if self._is_legacy(file):
- log(f'Cannot import {file} because it is no longer supported, please use the new profile format')
+ info(f'Cannot import {file} because it is no longer supported, please use the new profile format')
return []
if not file.is_file():
- log(f'Cannot find profile file {file}')
+ info(f'Cannot find profile file {file}')
return []
name = file.name.removesuffix(file.suffix)
- log(f'Importing profile: {file}', level=logging.DEBUG)
+ debug(f'Importing profile: {file}')
try:
spec = importlib.util.spec_from_file_location(name, file)
@@ -315,7 +314,7 @@ class ProfileHandler:
spec.loader.exec_module(imported)
return self._load_profile_class(imported)
except Exception as e:
- log(f'Unable to parse file {file}: {e}', level=logging.ERROR)
+ error(f'Unable to parse file {file}: {e}')
return []
diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py
deleted file mode 100644
index b177052b..00000000
--- a/archinstall/lib/services.py
+++ /dev/null
@@ -1,11 +0,0 @@
-import os
-from .general import SysCommand
-
-
-def service_state(service_name: str) -> str:
- if os.path.splitext(service_name)[1] != '.service':
- service_name += '.service' # Just to be safe
-
- state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'}))
-
- return state.strip().decode('UTF-8')
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 5a54d816..2f256e5d 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -11,8 +11,8 @@ from pathlib import Path
storage: Dict[str, Any] = {
'PROFILE': Path(__file__).parent.parent.joinpath('default_profiles'),
- 'LOG_PATH': '/var/log/archinstall',
- 'LOG_FILE': 'install.log',
+ 'LOG_PATH': Path('/var/log/archinstall'),
+ 'LOG_FILE': Path('install.log'),
'MOUNT_POINT': Path('/mnt/archinstall'),
'ENC_IDENTIFIER': 'ainst',
'DISK_TIMEOUTS' : 1, # seconds
diff --git a/archinstall/lib/translationhandler.py b/archinstall/lib/translationhandler.py
index 0d74f974..5f0f0695 100644
--- a/archinstall/lib/translationhandler.py
+++ b/archinstall/lib/translationhandler.py
@@ -1,14 +1,14 @@
from __future__ import annotations
import json
-import logging
import os
import gettext
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Any, TYPE_CHECKING, Optional
-from .exceptions import TranslationError
+
+from .output import error, debug
if TYPE_CHECKING:
_: Any
@@ -80,8 +80,8 @@ class TranslationHandler:
language = Language(abbr, lang, translation, percent, translated_lang)
languages.append(language)
- except FileNotFoundError as error:
- raise TranslationError(f"Could not locate language file for '{lang}': {error}")
+ except FileNotFoundError as err:
+ raise FileNotFoundError(f"Could not locate language file for '{lang}': {err}")
return languages
@@ -89,12 +89,12 @@ class TranslationHandler:
"""
Set the provided font as the new terminal font
"""
- from .general import SysCommand, log
+ from .general import SysCommand
try:
- log(f'Setting font: {font}', level=logging.DEBUG)
+ debug(f'Setting font: {font}')
SysCommand(f'setfont {font}')
except Exception:
- log(f'Unable to set font {font}', level=logging.ERROR)
+ error(f'Unable to set font {font}')
def _load_language_mappings(self) -> List[Dict[str, Any]]:
"""
diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py
deleted file mode 100644
index 5ee89de0..00000000
--- a/archinstall/lib/user_interaction/__init__.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from .manage_users_conf import ask_for_additional_users
-from .locale_conf import select_locale_lang, select_locale_enc
-from .system_conf import select_kernel, select_driver, ask_for_bootloader, ask_for_swap
-from .network_conf import ask_to_configure_network
-from .general_conf import (
- ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions,
- select_archinstall_language, ask_additional_packages_to_install,
- select_additional_repositories, ask_hostname, add_number_of_parrallel_downloads
-)
-from .utils import get_password
diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py
deleted file mode 100644
index e05b9afe..00000000
--- a/archinstall/lib/user_interaction/save_conf.py
+++ /dev/null
@@ -1,113 +0,0 @@
-from __future__ import annotations
-
-import logging
-
-from pathlib import Path
-from typing import Any, Dict, TYPE_CHECKING
-
-from ..general import SysCommand
-from ..menu import Menu
-from ..menu.menu import MenuSelectionType
-from ..output import log
-from ..configuration import ConfigurationOutput
-
-if TYPE_CHECKING:
- _: Any
-
-
-def save_config(config: Dict):
- def preview(selection: str):
- if options['user_config'] == selection:
- serialized = config_output.user_config_to_json()
- return f'{config_output.user_configuration_file}\n{serialized}'
- elif options['user_creds'] == selection:
- if maybe_serial := config_output.user_credentials_to_json():
- return f'{config_output.user_credentials_file}\n{maybe_serial}'
- else:
- return str(_('No configuration'))
- elif options['all'] == selection:
- output = f'{config_output.user_configuration_file}\n'
- if config_output.user_credentials_to_json():
- output += f'{config_output.user_credentials_file}\n'
- return output[:-1]
- return None
-
- config_output = ConfigurationOutput(config)
-
- options = {
- 'user_config': str(_('Save user configuration')),
- 'user_creds': str(_('Save user credentials')),
- 'disk_layout': str(_('Save disk layout')),
- 'all': str(_('Save all'))
- }
-
- choice = Menu(
- _('Choose which configuration to save'),
- list(options.values()),
- sort=False,
- skip=True,
- preview_size=0.75,
- preview_command=preview
- ).run()
-
- if choice.type_ == MenuSelectionType.Skip:
- return
-
- save_config_value = choice.single_value
- saving_key = [k for k, v in options.items() if v == save_config_value][0]
-
- dirs_to_exclude = [
- '/bin',
- '/dev',
- '/lib',
- '/lib64',
- '/lost+found',
- '/opt',
- '/proc',
- '/run',
- '/sbin',
- '/srv',
- '/sys',
- '/usr',
- '/var',
- ]
-
- log('Ignore configuration option folders: ' + ','.join(dirs_to_exclude), level=logging.DEBUG)
- log(_('Finding possible directories to save configuration files ...'), level=logging.INFO)
-
- find_exclude = '-path ' + ' -prune -o -path '.join(dirs_to_exclude) + ' -prune '
- file_picker_command = f'find / {find_exclude} -o -type d -print0'
-
- directories = SysCommand(file_picker_command).decode()
-
- if directories is None:
- raise ValueError('Failed to retrieve possible configuration directories')
-
- possible_save_dirs = list(filter(None, directories.split('\x00')))
-
- selection = Menu(
- _('Select directory (or directories) for saving configuration files'),
- possible_save_dirs,
- multi=True,
- skip=True,
- allow_reset=False,
- ).run()
-
- match selection.type_:
- case MenuSelectionType.Skip:
- return
-
- save_dirs = selection.multi_value
-
- log(f'Saving {saving_key} configuration files to {save_dirs}', level=logging.DEBUG)
-
- if save_dirs is not None:
- for save_dir_str in save_dirs:
- save_dir = Path(save_dir_str)
- if options['user_config'] == save_config_value:
- config_output.save_user_config(save_dir)
- elif options['user_creds'] == save_config_value:
- config_output.save_user_creds(save_dir)
- elif options['all'] == save_config_value:
- config_output.save_user_config(save_dir)
- config_output.save_user_creds(save_dir)
diff --git a/archinstall/lib/utils/util.py b/archinstall/lib/utils/util.py
index ded480ae..34716f4a 100644
--- a/archinstall/lib/utils/util.py
+++ b/archinstall/lib/utils/util.py
@@ -1,7 +1,7 @@
from pathlib import Path
from typing import Any, TYPE_CHECKING, Optional
-from ..output import log
+from ..output import info
if TYPE_CHECKING:
_: Any
@@ -16,7 +16,7 @@ def prompt_dir(text: str, header: Optional[str] = None) -> Path:
dest_path = Path(path)
if dest_path.exists() and dest_path.is_dir():
return dest_path
- log(_('Not a valid directory: {}').format(dest_path), fg='red')
+ info(_('Not a valid directory: {}').format(dest_path))
def is_subpath(first: Path, second: Path):