Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py80
-rw-r--r--archinstall/lib/exceptions.py2
-rw-r--r--archinstall/lib/general.py10
-rw-r--r--archinstall/lib/hardware.py54
-rw-r--r--archinstall/lib/installer.py300
-rw-r--r--archinstall/lib/locale_helpers.py14
-rw-r--r--archinstall/lib/luks.py11
-rw-r--r--archinstall/lib/mirrors.py9
-rw-r--r--archinstall/lib/networking.py4
-rw-r--r--archinstall/lib/output.py6
-rw-r--r--archinstall/lib/profiles.py65
-rw-r--r--archinstall/lib/services.py2
-rw-r--r--archinstall/lib/storage.py2
-rw-r--r--archinstall/lib/user_interaction.py153
14 files changed, 493 insertions, 219 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index 4ea9f214..67c2bdcd 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -25,11 +25,12 @@ class BlockDevice():
self.path = path
self.info = info
+ self.keep_partitions = True
self.part_cache = OrderedDict()
- # TODO: Currently disk encryption is a BIT missleading.
+ # TODO: Currently disk encryption is a BIT misleading.
# It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level.
- self.encryption_passwoed = None
+ self.encryption_password = None
def __repr__(self, *args, **kwargs):
return f"BlockDevice({self.device})"
@@ -126,6 +127,18 @@ class BlockDevice():
def partition_table_type(self):
return GPT
+ @property
+ def uuid(self):
+ log(f'BlockDevice().uuid is untested!', level=LOG_LEVELS.Warning, fg='yellow')
+ """
+ Returns the disk UUID as returned by lsblk.
+ This is more reliable than relying on /dev/disk/by-partuuid as
+ it doesn't seam to be able to detect md raid partitions.
+ """
+ lsblk = b''.join(sys_command(f'lsblk -J -o+UUID {self.path}'))
+ for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
+ return partition.get('uuid', None)
+
def has_partitions(self):
return len(self.partitions)
@@ -166,7 +179,7 @@ class Partition():
self.mountpoint = target
if not self.filesystem and autodetect_filesystem:
- if (fstype := mount_information.get('fstype', get_filesystem_type(self.real_device))):
+ if (fstype := mount_information.get('fstype', get_filesystem_type(path))):
self.filesystem = fstype
if self.filesystem == 'crypto_LUKS':
@@ -187,9 +200,9 @@ class Partition():
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
if self._encrypted:
- return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
+ return f'Partition(path={self.path}, size={self.size}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
else:
- return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})'
+ return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})'
@property
def uuid(self) -> str:
@@ -215,13 +228,14 @@ class Partition():
self._encrypted = value
@property
+ def parent(self):
+ return self.real_device
+
+ @property
def real_device(self):
- if not self._encrypted:
- return self.path
- else:
- for blockdevice in json.loads(b''.join(sys_command(['lsblk', '-J'])).decode('UTF-8'))['blockdevices']:
- if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
- return f"/dev/{parent}"
+ for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
+ if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
+ return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
@@ -285,10 +299,10 @@ class Partition():
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem=None, path=None, allow_formatting=None, log_formating=True):
+ def format(self, filesystem=None, path=None, allow_formatting=None, log_formatting=True):
"""
Format can be given an overriding path, for instance /dev/null to test
- the formating functionality and in essence the support for the given filesystem.
+ the formatting functionality and in essence the support for the given filesystem.
"""
if filesystem is None:
filesystem = self.filesystem
@@ -306,7 +320,7 @@ class Partition():
if not allow_formatting:
raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
- if log_formating:
+ if log_formatting:
log(f'Formatting {path} -> {filesystem}', level=LOG_LEVELS.Info)
if filesystem == 'btrfs':
@@ -366,14 +380,16 @@ class Partition():
if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
- ## libc has some issues with loop devices, defaulting back to sys calls
- # ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode())
- # if ret < 0:
- # errno = ctypes.get_errno()
- # raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}")
- if sys_command(f'/usr/bin/mount {self.path} {target}').exit_code == 0:
- self.mountpoint = target
- return True
+
+ pathlib.Path(target).mkdir(parents=True, exist_ok=True)
+
+ try:
+ sys_command(f'/usr/bin/mount {self.path} {target}')
+ except SysCallError as err:
+ raise err
+
+ self.mountpoint = target
+ return True
def unmount(self):
try:
@@ -401,7 +417,7 @@ class Partition():
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
- self.format(self.filesystem, '/dev/null', log_formating=False, allow_formatting=True)
+ self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
except SysCallError:
pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err:
@@ -588,6 +604,24 @@ def get_mount_info(path):
return output['filesystems'][0]
+def get_partitions_in_use(mountpoint):
+ try:
+ output = b''.join(sys_command(f'/usr/bin/findmnt --json -R {mountpoint}'))
+ except SysCallError:
+ return {}
+
+ mounts = []
+
+ output = output.decode('UTF-8')
+ output = json.loads(output)
+ for target in output.get('filesystems', []):
+ mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']))
+
+ for child in target.get('children', []):
+ mounts.append(Partition(child['source'], None, filesystem=child.get('fstype', None), mountpoint=child['target']))
+
+ return mounts
+
def get_filesystem_type(path):
try:
handle = sys_command(f"blkid -o value -s TYPE {path}")
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index a320eef6..49913980 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -18,4 +18,6 @@ class HardwareIncompatibilityError(BaseException):
class PermissionError(BaseException):
pass
class UserError(BaseException):
+ pass
+class ServiceException(BaseException):
pass \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 5b1b3c2a..dc0f018a 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -76,7 +76,7 @@ class sys_command():#Thread):
"""
Stolen from archinstall_gui
"""
- def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, *args, **kwargs):
+ def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars={}, *args, **kwargs):
kwargs.setdefault("worker_id", gen_uid())
kwargs.setdefault("emulate", False)
kwargs.setdefault("suppress_errors", False)
@@ -102,6 +102,7 @@ class sys_command():#Thread):
self.args = args
self.kwargs = kwargs
self.peak_output = peak_output
+ self.environment_vars = environment_vars
self.kwargs.setdefault("worker", None)
self.callback = callback
@@ -200,7 +201,7 @@ class sys_command():#Thread):
# Replace child process with our main process
if not self.kwargs['emulate']:
try:
- os.execv(self.cmd[0], self.cmd)
+ os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
except FileNotFoundError:
self.status = 'done'
self.log(f"{self.cmd[0]} does not exist.", level=LOG_LEVELS.Debug)
@@ -304,6 +305,11 @@ class sys_command():#Thread):
with open(f'{self.cwd}/trace.log', 'wb') as fh:
fh.write(self.trace_log)
+ try:
+ os.close(child_fd)
+ except:
+ pass
+
def prerequisite_check():
if not os.path.isdir("/sys/firmware/efi"):
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index 10f3970f..d6cf982c 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -1,19 +1,42 @@
-import os, subprocess
+import os, subprocess, json
from .general import sys_command
from .networking import list_interfaces, enrichIfaceTypes
+from typing import Optional
-def hasWifi():
+AVAILABLE_GFX_DRIVERS = {
+ # Sub-dicts are layer-2 options to be selected
+ # and lists are a list of packages to be installed
+ 'AMD / ATI' : {
+ 'amd' : ['xf86-video-amdgpu'],
+ 'ati' : ['xf86-video-ati']
+ },
+ 'intel' : ['xf86-video-intel'],
+ 'nvidia' : {
+ 'open-source' : ['xf86-video-nouveau'],
+ 'proprietary' : ['nvidia']
+ },
+ 'mesa' : ['mesa'],
+ 'fbdev' : ['xf86-video-fbdev'],
+ 'vesa' : ['xf86-video-vesa'],
+ 'vmware' : ['xf86-video-vmware']
+}
+
+def hasWifi()->bool:
return 'WIRELESS' in enrichIfaceTypes(list_interfaces().values()).values()
-def hasAMDCPU():
+def hasAMDCPU()->bool:
if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode():
return True
return False
+def hasIntelCPU()->bool:
+ if subprocess.check_output("lscpu | grep Intel", shell=True).strip().decode():
+ return True
+ return False
-def hasUEFI():
+def hasUEFI()->bool:
return os.path.isdir('/sys/firmware/efi')
-def graphicsDevices():
+def graphicsDevices()->dict:
cards = {}
for line in sys_command(f"lspci"):
if b' VGA ' in line:
@@ -21,13 +44,28 @@ def graphicsDevices():
cards[identifier.strip().lower().decode('UTF-8')] = line
return cards
-def hasNvidiaGraphics():
+def hasNvidiaGraphics()->bool:
return any('nvidia' in x for x in graphicsDevices())
-def hasAmdGraphics():
+def hasAmdGraphics()->bool:
return any('amd' in x for x in graphicsDevices())
-def hasIntelGraphics():
+def hasIntelGraphics()->bool:
return any('intel' in x for x in graphicsDevices())
+
+def cpuVendor()-> Optional[str]:
+ cpu_info = json.loads(subprocess.check_output("lscpu -J", shell=True).decode('utf-8'))['lscpu']
+ for info in cpu_info:
+ if info.get('field',None):
+ if info.get('field',None) == "Vendor ID:":
+ return info.get('data',None)
+
+def isVM() -> bool:
+ try:
+ subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a none 0 exit code if it is not on a virtual machine
+ return True
+ except:
+ return False
+
# TODO: Add more identifiers
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 2122ebd9..2f90560f 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -9,6 +9,7 @@ from .mirrors import *
from .systemd import Networkd
from .output import log, LOG_LEVELS
from .storage import storage
+from .hardware import *
# Any package that the Installer() is responsible for (optional and the default ones)
__packages__ = ["base", "base-devel", "linux", "linux-firmware", "efibootmgr", "nano", "ntp", "iwd"]
@@ -38,30 +39,21 @@ class Installer():
:type hostname: str, optional
"""
- def __init__(self, partition, boot_partition, *, base_packages=__base_packages__, profile=None, mountpoint='/mnt', hostname='ArchInstalled', logdir=None, logfile=None):
- self.profile = profile
- self.hostname = hostname
- self.mountpoint = mountpoint
+ def __init__(self, target, *, base_packages='base base-devel linux linux-firmware efibootmgr'):
+ self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
- if logdir:
- storage['LOG_PATH'] = logdir
- if logfile:
- storage['LOG_FILE'] = logfile
-
self.helper_flags = {
- 'bootloader' : False,
'base' : False,
- 'user' : False # Root counts as a user, if additional users are skipped.
+ 'bootloader' : False
}
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
self.post_base_install = []
- storage['session'] = self
- self.partition = partition
- self.boot_partition = boot_partition
+ storage['session'] = self
+ self.partitions = get_partitions_in_use(self.target)
def log(self, *args, level=LOG_LEVELS.Debug, **kwargs):
"""
@@ -71,15 +63,10 @@ class Installer():
log(*args, level=level, **kwargs)
def __enter__(self, *args, **kwargs):
- if hasUEFI():
- # on bios we don't have a boot partition
- self.partition.mount(self.mountpoint)
- os.makedirs(f'{self.mountpoint}/boot', exist_ok=True)
- self.boot_partition.mount(f'{self.mountpoint}/boot')
return self
def __exit__(self, *args, **kwargs):
- # b''.join(sys_command(f'sync')) # No need to, since the underlaying fs() object will call sync.
+ # b''.join(sys_command(f'sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
@@ -91,7 +78,7 @@ class Installer():
# We avoid printing /mnt/<log path> because that might confuse people if they note it down
# and then reboot, and a identical log file will be found in the ISO medium anyway.
print(f"[!] A log file has been created here: {os.path.join(storage['LOG_PATH'], storage['LOG_FILE'])}")
- print(f" Please submit this issue (and file) to https://github.com/Torxed/archinstall/issues")
+ print(f" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")
raise args[1]
self.genfstab()
@@ -99,13 +86,16 @@ class Installer():
if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', bg='black', fg='green', level=LOG_LEVELS.Info)
self.sync_log_to_install_medium()
+
return True
else:
self.log('Some required steps were not successfully installed/configured before leaving the installer:', bg='black', fg='red', level=LOG_LEVELS.Warning)
for step in missing_steps:
self.log(f' - {step}', bg='black', fg='red', level=LOG_LEVELS.Warning)
- self.log(f"Detailed error logs can be found at: {log_path}", level=LOG_LEVELS.Warning)
- self.log(f"Submit this zip file as an issue to https://github.com/Torxed/archinstall/issues", level=LOG_LEVELS.Warning)
+
+ self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=LOG_LEVELS.Warning)
+ self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=LOG_LEVELS.Warning)
+
self.sync_log_to_install_medium()
return False
@@ -116,18 +106,18 @@ class Installer():
if (filename := storage.get('LOG_FILE', None)):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
- if not os.path.isdir(f"{self.mountpoint}/{os.path.dirname(absolute_logfile)}"):
- os.makedirs(f"{self.mountpoint}/{os.path.dirname(absolute_logfile)}")
-
- shutil.copy2(absolute_logfile, f"{self.mountpoint}/{absolute_logfile}")
+ if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
+ os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
+
+ shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
return True
def mount(self, partition, mountpoint, create_mountpoint=True):
- if create_mountpoint and not os.path.isdir(f'{self.mountpoint}{mountpoint}'):
- os.makedirs(f'{self.mountpoint}{mountpoint}')
-
- partition.mount(f'{self.mountpoint}{mountpoint}')
+ if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
+ os.makedirs(f'{self.target}{mountpoint}')
+
+ partition.mount(f'{self.target}{mountpoint}')
def post_install_check(self, *args, **kwargs):
return [step for step, flag in self.helper_flags.items() if flag is False]
@@ -137,7 +127,7 @@ class Installer():
self.log(f'Installing packages: {packages}', level=LOG_LEVELS.Info)
if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0:
- if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.mountpoint} {" ".join(packages)}', peak_output=True, **kwargs)).exit_code == 0:
+ if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0:
return True
else:
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=LOG_LEVELS.Info)
@@ -145,42 +135,41 @@ class Installer():
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=LOG_LEVELS.Info)
def set_mirrors(self, mirrors):
- return use_mirrors(mirrors, destination=f'{self.mountpoint}/etc/pacman.d/mirrorlist')
+ return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
def genfstab(self, flags='-pU'):
- self.log(f"Updating {self.mountpoint}/etc/fstab", level=LOG_LEVELS.Info)
-
- fstab = sys_command(f'/usr/bin/genfstab {flags} {self.mountpoint}').trace_log
- with open(f"{self.mountpoint}/etc/fstab", 'ab') as fstab_fh:
+ self.log(f"Updating {self.target}/etc/fstab", level=LOG_LEVELS.Info)
+
+ fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
+ with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
fstab_fh.write(fstab)
- if not os.path.isfile(f'{self.mountpoint}/etc/fstab'):
- raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{o}')
+ if not os.path.isfile(f'{self.target}/etc/fstab'):
+ raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}')
return True
- def set_hostname(self, hostname=None, *args, **kwargs):
- if not hostname: hostname = self.hostname
- with open(f'{self.mountpoint}/etc/hostname', 'w') as fh:
- fh.write(self.hostname + '\n')
+ def set_hostname(self, hostname :str, *args, **kwargs):
+ with open(f'{self.target}/etc/hostname', 'w') as fh:
+ fh.write(hostname + '\n')
def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
if not len(locale): return True
- with open(f'{self.mountpoint}/etc/locale.gen', 'a') as fh:
+ with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{locale}.{encoding} {encoding}\n')
- with open(f'{self.mountpoint}/etc/locale.conf', 'w') as fh:
+ with open(f'{self.target}/etc/locale.conf', 'w') as fh:
fh.write(f'LANG={locale}.{encoding}\n')
- return True if sys_command(f'/usr/bin/arch-chroot {self.mountpoint} locale-gen').exit_code == 0 else False
+ return True if sys_command(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
def set_timezone(self, zone, *args, **kwargs):
if not zone: return True
if not len(zone): return True # Redundant
if (pathlib.Path("/usr")/"share"/"zoneinfo"/zone).exists():
- (pathlib.Path(self.mountpoint)/"etc"/"localtime").unlink(missing_ok=True)
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
+ (pathlib.Path(self.target)/"etc"/"localtime").unlink(missing_ok=True)
+ sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
else:
self.log(
@@ -195,16 +184,21 @@ class Installer():
if self.enable_service('ntpd'):
return True
- def enable_service(self, service):
- self.log(f'Enabling service {service}', level=LOG_LEVELS.Info)
- return self.arch_chroot(f'systemctl enable {service}').exit_code == 0
+ def enable_service(self, *services):
+ for service in services:
+ self.log(f'Enabling service {service}', level=LOG_LEVELS.Info)
+ if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
+ raise ServiceException(f"Unable to start service {service}: {output}")
def run_command(self, cmd, *args, **kwargs):
- return sys_command(f'/usr/bin/arch-chroot {self.mountpoint} {cmd}')
+ return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, *args, **kwargs):
return self.run_command(cmd)
+ def drop_to_shell(self):
+ subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
+
def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
if dhcp:
conf = Networkd(Match={"Name": nic}, Network={"DHCP": "yes"})
@@ -219,16 +213,16 @@ class Installer():
network["DNS"] = dns
conf = Networkd(Match={"Name": nic}, Network=network)
-
- with open(f"{self.mountpoint}/etc/systemd/network/10-{nic}.network", "a") as netconf:
+
+ with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
def copy_ISO_network_config(self, enable_services=False):
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if (psk_files := glob.glob('/var/lib/iwd/*.psk')):
- if not os.path.isdir(f"{self.mountpoint}/var/lib/iwd"):
- os.makedirs(f"{self.mountpoint}/var/lib/iwd")
+ if not os.path.isdir(f"{self.target}/var/lib/iwd"):
+ os.makedirs(f"{self.target}/var/lib/iwd")
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
@@ -248,81 +242,103 @@ class Installer():
self.enable_service('iwd')
for psk in psk_files:
- shutil.copy2(psk, f"{self.mountpoint}/var/lib/iwd/{os.path.basename(psk)}")
+ shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}")
# Copy (if any) systemd-networkd config files
if (netconfigurations := glob.glob('/etc/systemd/network/*')):
- if not os.path.isdir(f"{self.mountpoint}/etc/systemd/network/"):
- os.makedirs(f"{self.mountpoint}/etc/systemd/network/")
+ if not os.path.isdir(f"{self.target}/etc/systemd/network/"):
+ os.makedirs(f"{self.target}/etc/systemd/network/")
for netconf_file in netconfigurations:
- shutil.copy2(netconf_file, f"{self.mountpoint}/etc/systemd/network/{os.path.basename(netconf_file)}")
+ shutil.copy2(netconf_file, f"{self.target}/etc/systemd/network/{os.path.basename(netconf_file)}")
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
def post_install_enable_networkd_resolved(*args, **kwargs):
- self.enable_service('systemd-networkd')
- self.enable_service('systemd-resolved')
-
+ self.enable_service('systemd-networkd', 'systemd-resolved')
self.post_base_install.append(post_install_enable_networkd_resolved)
# Otherwise, we can go ahead and enable the services
else:
- self.enable_service('systemd-networkd')
- self.enable_service('systemd-resolved')
+ self.enable_service('systemd-networkd', 'systemd-resolved')
+
return True
+ def detect_encryption(self, partition):
+ if partition.encrypted:
+ return partition
+ elif partition.parent not in partition.path and Partition(partition.parent, None, autodetect_filesystem=True).filesystem == 'crypto_LUKS':
+ return Partition(partition.parent, None, autodetect_filesystem=True)
+
+ return False
+
def minimal_installation(self):
- ## Add nessecary packages if encrypting the drive
+ ## Add necessary packages if encrypting the drive
## (encrypted partitions default to btrfs for now, so we need btrfs-progs)
## TODO: Perhaps this should be living in the function which dictates
## the partitioning. Leaving here for now.
- if self.partition.filesystem == 'btrfs':
- #if self.partition.encrypted:
- self.base_packages.append('btrfs-progs')
- if self.partition.filesystem == 'xfs':
- self.base_packages.append('xfsprogs')
- if self.partition.filesystem == 'f2fs':
- self.base_packages.append('f2fs-tools')
- if not(hasUEFI()):
+
+ MODULES = []
+ BINARIES = []
+ FILES = []
+ HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
+
+ for partition in self.partitions:
+ if partition.filesystem == 'btrfs':
+ #if partition.encrypted:
+ self.base_packages.append('btrfs-progs')
+ if partition.filesystem == 'xfs':
+ self.base_packages.append('xfsprogs')
+ if partition.filesystem == 'f2fs':
+ self.base_packages.append('f2fs-tools')
+
+ # Configure mkinitcpio to handle some specific use cases.
+ if partition.filesystem == 'btrfs':
+ if 'btrfs' not in MODULES:
+ MODULES.append('btrfs')
+ if '/usr/bin/btrfs-progs' not in BINARIES:
+ BINARIES.append('/usr/bin/btrfs')
+
+ if self.detect_encryption(partition):
+ if 'encrypt' not in HOOKS:
+ HOOKS.insert(HOOKS.index('filesystems'), 'encrypt')
+
+ if not(hasUEFI()): # TODO: Allow for grub even on EFI
self.base_packages.append('grub')
+
self.pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
#self.genfstab()
-
- with open(f"{self.mountpoint}/etc/fstab", "a") as fstab:
+ if not isVM():
+ vendor = cpuVendor()
+ if vendor == "AuthenticAMD":
+ self.base_packages.append("amd-ucode")
+ elif vendor == "GenuineIntel":
+ self.base_packages.append("intel-ucode")
+ else:
+ self.log("Unknown cpu vendor not installing ucode")
+ with open(f"{self.target}/etc/fstab", "a") as fstab:
fstab.write(
"\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n"
) # Redundant \n at the start? who knows?
## TODO: Support locale and timezone
- #os.remove(f'{self.mountpoint}/etc/localtime')
- #sys_command(f'/usr/bin/arch-chroot {self.mountpoint} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
+ #os.remove(f'{self.target}/etc/localtime')
+ #sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
#sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')
- self.set_hostname()
+ self.set_hostname('archinstall')
self.set_locale('en_US')
# TODO: Use python functions for this
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} chmod 700 /root')
-
- # Configure mkinitcpio to handle some specific use cases.
- # TODO: Yes, we should not overwrite the entire thing, but for now this should be fine
- # since we just installed the base system.
- if self.partition.filesystem == 'btrfs':
- with open(f'{self.mountpoint}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write('MODULES=(btrfs)\n')
- mkinit.write('BINARIES=(/usr/bin/btrfs)\n')
- mkinit.write('FILES=()\n')
- mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keymap keyboard fsck)\n')
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} mkinitcpio -p linux')
- elif self.partition.encrypted:
- with open(f'{self.mountpoint}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write('MODULES=()\n')
- mkinit.write('BINARIES=()\n')
- mkinit.write('FILES=()\n')
- mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keymap keyboard fsck)\n')
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} mkinitcpio -p linux')
+ sys_command(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
+
+ with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
+ mkinit.write(f"MODULES=({' '.join(MODULES)})\n")
+ mkinit.write(f"BINARIES=({' '.join(BINARIES)})\n")
+ mkinit.write(f"FILES=({' '.join(FILES)})\n")
+ mkinit.write(f"HOOKS=({' '.join(HOOKS)})\n")
+ sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio -p linux')
self.helper_flags['base'] = True
@@ -334,7 +350,15 @@ class Installer():
return True
def add_bootloader(self, bootloader='systemd-bootctl'):
- self.log(f'Adding bootloader {bootloader} to {self.boot_partition}', level=LOG_LEVELS.Info)
+ boot_partition = None
+ root_partition = None
+ for partition in self.partitions:
+ if partition.mountpoint == self.target+'/boot':
+ boot_partition = partition
+ elif partition.mountpoint == self.target:
+ root_partition = partition
+
+ self.log(f'Adding bootloader {bootloader} to {boot_partition}', level=LOG_LEVELS.Info)
if bootloader == 'systemd-bootctl':
if not hasUEFI():
@@ -344,19 +368,19 @@ class Installer():
# And in which case we should do some clean up.
# Install the boot loader
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} bootctl --no-variables --path=/boot install')
+ sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
# Modify or create a loader.conf
- if os.path.isfile(f'{self.mountpoint}/boot/loader/loader.conf'):
- with open(f'{self.mountpoint}/boot/loader/loader.conf', 'r') as loader:
+ if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
+ with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader:
loader_data = loader.read().split('\n')
else:
loader_data = [
f"default {self.init_time}",
f"timeout 5"
]
-
- with open(f'{self.mountpoint}/boot/loader/loader.conf', 'w') as loader:
+
+ with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
for line in loader_data:
if line[:8] == 'default ':
loader.write(f'default {self.init_time}\n')
@@ -366,49 +390,47 @@ class Installer():
## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
## And blkid is wrong in terms of LUKS.
#UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
-
# Setup the loader entry
- with open(f'{self.mountpoint}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
+ with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
entry.write(f'# Created by: archinstall\n')
entry.write(f'# Created on: {self.init_time}\n')
entry.write(f'title Arch Linux\n')
entry.write(f'linux /vmlinuz-linux\n')
+ if not isVM():
+ vendor = cpuVendor()
+ if vendor == "AuthenticAMD":
+ entry.write("initrd /amd-ucode.img\n")
+ elif vendor == "GenuineIntel":
+ entry.write("initrd /intel-ucode.img\n")
+ else:
+ self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
entry.write(f'initrd /initramfs-linux.img\n')
## blkid doesn't trigger on loopback devices really well,
## so we'll use the old manual method until we get that sorted out.
- if self.partition.encrypted:
- log(f"Identifying root partition by DISK-UUID on {self.partition}, looking for '{os.path.basename(self.partition.real_device)}'.", level=LOG_LEVELS.Debug)
- for root, folders, uids in os.walk('/dev/disk/by-uuid'):
- for uid in uids:
- real_path = os.path.realpath(os.path.join(root, uid))
-
- log(f"Checking root partition match {os.path.basename(real_path)} against {os.path.basename(self.partition.real_device)}: {os.path.basename(real_path) == os.path.basename(self.partition.real_device)}", level=LOG_LEVELS.Debug)
- if not os.path.basename(real_path) == os.path.basename(self.partition.real_device): continue
-
- entry.write(f'options cryptdevice=UUID={uid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
-
- self.helper_flags['bootloader'] = bootloader
- return True
- break
+ if (real_device := self.detect_encryption(root_partition)):
+ # TODO: We need to detect if the encrypted device is a whole disk encryption,
+ # or simply a partition encryption. Right now we assume it's a partition (and we always have)
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=LOG_LEVELS.Debug)
+ entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
else:
- log(f"Identifying root partition by PART-UUID on {self.partition}, looking for '{os.path.basename(self.partition.path)}'.", level=LOG_LEVELS.Debug)
- entry.write(f'options root=PARTUUID={self.partition.uuid} rw intel_pstate=no_hwp\n')
+ log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=LOG_LEVELS.Debug)
+ entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp\n')
- self.helper_flags['bootloader'] = bootloader
- return True
+ self.helper_flags['bootloader'] = bootloader
+ return True
- raise RequirementError(f"Could not identify the UUID of {self.partition}, there for {self.mountpoint}/boot/loader/entries/arch.conf will be broken until fixed.")
+ raise RequirementError(f"Could not identify the UUID of {self.partition}, there for {self.target}/boot/loader/entries/arch.conf will be broken until fixed.")
elif bootloader == "grub-install":
if hasUEFI():
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.mountpoint} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
sys_command('/usr/bin/arch-chroot grub-mkconfig -o /boot/grub/grub.cfg')
else:
- root_device = subprocess.check_output(f'basename "$(readlink -f "/sys/class/block/{self.partition.path.strip("/dev/")}/..")',shell=True).decode().strip()
+ root_device = subprocess.check_output(f'basename "$(readlink -f "/sys/class/block/{root_partition.path.strip("/dev/")}/..")', shell=True).decode().strip()
if root_device == "block":
- root_device = f"{self.partition.path}"
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.mountpoint} grub-install --target=--target=i386-pc /dev/{root_device}'))
+ root_device = f"{root_partition.path}"
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=--target=i386-pc /dev/{root_device}'))
sys_command('/usr/bin/arch-chroot grub-mkconfig -o /boot/grub/grub.cfg')
else:
raise RequirementError(f"Unknown (or not yet implemented) bootloader added to add_bootloader(): {bootloader}")
@@ -421,7 +443,7 @@ class Installer():
# The tricky thing with doing the import archinstall.session instead is that
# profiles might be run from a different chroot, and there's no way we can
# guarantee file-path safety when accessing the installer object that way.
- # Doing the __builtins__ replacement, ensures that the global vriable "installation"
+ # Doing the __builtins__ replacement, ensures that the global variable "installation"
# is always kept up to date. It's considered a nasty hack - but it's a safe way
# of ensuring 100% accuracy of archinstall session variables.
__builtins__['installation'] = self
@@ -434,19 +456,19 @@ class Installer():
def enable_sudo(self, entity :str, group=False):
self.log(f'Enabling sudo permissions for {entity}.', level=LOG_LEVELS.Info)
- with open(f'{self.mountpoint}/etc/sudoers', 'a') as sudoers:
+ with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
def user_create(self, user :str, password=None, groups=[], sudo=False):
self.log(f'Creating user {user}', level=LOG_LEVELS.Info)
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.mountpoint} useradd -m -G wheel {user}'))
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
if password:
self.user_set_pw(user, password)
if groups:
for group in groups:
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.mountpoint} gpasswd -a {user} {group}'))
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}'))
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
@@ -458,12 +480,20 @@ class Installer():
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
- o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.mountpoint} sh -c \"echo '{user}:{password}' | chpasswd\""))
+ o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
+ pass
+
+ def user_set_shell(self, user, shell):
+ self.log(f'Setting shell for {user} to {shell}', level=LOG_LEVELS.Info)
+
+ o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass
def set_keyboard_language(self, language):
if len(language.strip()):
- with open(f'{self.mountpoint}/etc/vconsole.conf', 'w') as vconsole:
+ with open(f'{self.target}/etc/vconsole.conf', 'w') as vconsole:
vconsole.write(f'KEYMAP={language}\n')
vconsole.write(f'FONT=lat9w-16\n')
+ else:
+ self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=LOG_LEVELS.Info)
return True
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index 523a23d5..736bfc47 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -1,29 +1,25 @@
+import subprocess
import os
from .exceptions import *
# from .general import sys_command
-def list_keyboard_languages(layout='qwerty'):
+def list_keyboard_languages():
locale_dir = '/usr/share/kbd/keymaps/'
if not os.path.isdir(locale_dir):
raise RequirementError(f'Directory containing locales does not exist: {locale_dir}')
for root, folders, files in os.walk(locale_dir):
- # Since qwerty is under /i386/ but other layouts are
- # in different spots, we'll need to filter the last foldername
- # of the path to verify against the desired layout.
- if os.path.basename(root) != layout:
- continue
for file in files:
if os.path.splitext(file)[1] == '.gz':
yield file.strip('.gz').strip('.map')
-def search_keyboard_layout(filter, layout='qwerty'):
- for language in list_keyboard_languages(layout):
+def search_keyboard_layout(filter):
+ for language in list_keyboard_languages():
if filter.lower() in language.lower():
yield language
def set_keyboard_language(locale):
- return os.system(f'loadkeys {locale}') == 0
+ return subprocess.call(['loadkeys', locale]) == 0
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 62067ec1..ca077b3d 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,5 +1,7 @@
import os
import shlex
+import time
+import pathlib
from .exceptions import *
from .general import *
from .disk import Partition
@@ -43,8 +45,6 @@ class luks2():
return True
def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
- # TODO: We should be able to integrate this into the main log some how.
- # Perhaps post-mortem?
if not self.partition.allow_formatting:
raise DiskError(f'Could not encrypt volume {self.partition} due to it having a formatting lock.')
@@ -116,7 +116,7 @@ class luks2():
def unlock(self, partition, mountpoint, key_file):
"""
- Mounts a lukts2 compatible partition to a certain mountpoint.
+ Mounts a luks2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
@@ -125,6 +125,11 @@ class luks2():
from .disk import get_filesystem_type
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
+
+ wait_timer = time.time()
+ while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10:
+ time.sleep(0.025)
+
sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}'
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index d7d35782..04f47c0d 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -74,10 +74,15 @@ def re_rank_mirrors(top=10, *positionals, **kwargs):
def list_mirrors():
url = f"https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on"
-
- response = urllib.request.urlopen(url)
regions = {}
+ try:
+ response = urllib.request.urlopen(url)
+ except urllib.error.URLError as err:
+ log(f'Could not fetch an active mirror-list: {err}', level=LOG_LEVELS.Warning, fg="yellow")
+ return regions
+
+
region = 'Unknown region'
for line in response.readlines():
if len(line.strip()) == 0:
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 882bcff3..2dc8be9b 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -56,7 +56,7 @@ def wirelessScan(interface):
storage['_WIFI'][interface]['scanning'] = True
-# TOOD: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
+# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
def getWirelessNetworks(interface):
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
@@ -65,4 +65,4 @@ def getWirelessNetworks(interface):
time.sleep(5)
for line in sys_command(f"iwctl station {interface} get-networks"):
- print(line) \ No newline at end of file
+ print(line)
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index 537fb695..6b184b4b 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -6,7 +6,7 @@ from pathlib import Path
from .storage import storage
# TODO: use logging's built in levels instead.
-# Altough logging is threaded and I wish to avoid that.
+# Although logging is threaded and I wish to avoid that.
# It's more Pythonistic or w/e you want to call it.
class LOG_LEVELS:
Critical = 0b001
@@ -88,7 +88,7 @@ def log(*args, **kwargs):
# Attempt to colorize the output if supported
# Insert default colors and override with **kwargs
if supports_color():
- kwargs = {'bg' : 'black', 'fg': 'white', **kwargs}
+ kwargs = {'fg': 'white', **kwargs}
string = stylize_output(string, **kwargs)
# If a logfile is defined in storage,
@@ -130,4 +130,4 @@ def log(*args, **kwargs):
# We use sys.stdout.write()+flush() instead of print() to try and
# fix issue #94
sys.stdout.write(f"{string}\n")
- sys.stdout.flush() \ No newline at end of file
+ sys.stdout.flush()
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 70c21a67..c5f63e72 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -112,11 +112,11 @@ class Script():
if f"{self.profile}" in self.examples:
return self.localize_path(self.examples[self.profile]['path'])
- # TODO: Redundant, the below block shouldnt be needed as profiles are stripped of their .py, but just in case for now:
+ # TODO: Redundant, the below block shouldn't be needed as profiles are stripped of their .py, but just in case for now:
elif f"{self.profile}.py" in self.examples:
return self.localize_path(self.examples[f"{self.profile}.py"]['path'])
- # Path was not found in any known examples, check if it's an abolute path
+ # Path was not found in any known examples, check if it's an absolute path
if os.path.isfile(self.profile):
return self.profile
@@ -156,7 +156,7 @@ class Profile(Script):
def install(self):
# Before installing, revert any temporary changes to the namespace.
- # This ensures that the namespace during installation is the original initation namespace.
+ # This ensures that the namespace during installation is the original initiation namespace.
# (For instance awesome instead of aweosme.py or app-awesome.py)
self.namespace = self.original_namespace
return self.execute()
@@ -177,6 +177,7 @@ class Profile(Script):
if hasattr(imported, '_prep_function'):
return True
return False
+
def has_post_install(self):
with open(self.path, 'r') as source:
source_data = source.read()
@@ -193,6 +194,56 @@ class Profile(Script):
if hasattr(imported, '_post_install'):
return True
+ def is_top_level_profile(self):
+ with open(self.path, 'r') as source:
+ source_data = source.read()
+
+ # TODO: I imagine that there is probably a better way to write this.
+ return 'top_level_profile = True' in source_data
+
+ @property
+ def packages(self) -> list:
+ """
+ Returns a list of packages baked into the profile definition.
+ If no package definition has been done, .packages() will return None.
+ """
+ with open(self.path, 'r') as source:
+ source_data = source.read()
+
+ # Some crude safety checks, make sure the imported profile has
+ # a __name__ check before importing.
+ #
+ # If the requirements are met, import with .py in the namespace to not
+ # trigger a traditional:
+ # if __name__ == 'moduleName'
+ if '__name__' in source_data and '__packages__' in source_data:
+ with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
+ if hasattr(imported, '__packages__'):
+ return imported.__packages__
+ return None
+
+
+ def has_post_install(self):
+ with open(self.path, 'r') as source:
+ source_data = source.read()
+
+ # Some crude safety checks, make sure the imported profile has
+ # a __name__ check and if so, check if it's got a _prep_function()
+ # we can call to ask for more user input.
+ #
+ # If the requirements are met, import with .py in the namespace to not
+ # trigger a traditional:
+ # if __name__ == 'moduleName'
+ if '__name__' in source_data and '_post_install' in source_data:
+ with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
+ if hasattr(imported, '_post_install'):
+ return True
+
+ def is_top_level_profile(self):
+ with open(self.path, 'r') as source:
+ source_data = source.read()
+ return 'top_level_profile = True' in source_data
+
@property
def packages(self) -> list:
"""
@@ -231,11 +282,11 @@ class Application(Profile):
if f"{self.profile}" in self.examples:
return self.localize_path(self.examples[self.profile]['path'])
- # TODO: Redundant, the below block shouldnt be needed as profiles are stripped of their .py, but just in case for now:
+ # TODO: Redundant, the below block shouldn't be needed as profiles are stripped of their .py, but just in case for now:
elif f"{self.profile}.py" in self.examples:
return self.localize_path(self.examples[f"{self.profile}.py"]['path'])
- # Path was not found in any known examples, check if it's an abolute path
+ # Path was not found in any known examples, check if it's an absolute path
if os.path.isfile(self.profile):
return os.path.basename(self.profile)
@@ -247,7 +298,7 @@ class Application(Profile):
def install(self):
# Before installing, revert any temporary changes to the namespace.
- # This ensures that the namespace during installation is the original initation namespace.
+ # This ensures that the namespace during installation is the original initiation namespace.
# (For instance awesome instead of aweosme.py or app-awesome.py)
self.namespace = self.original_namespace
- return self.execute() \ No newline at end of file
+ return self.execute()
diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py
index 8fcdd296..bb6f64f2 100644
--- a/archinstall/lib/services.py
+++ b/archinstall/lib/services.py
@@ -7,6 +7,6 @@ def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe
- state = b''.join(sys_command(f'systemctl show -p SubState --value {service_name}'))
+ state = b''.join(sys_command(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS' : '0'}))
return state.strip().decode('UTF-8')
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 9bda017d..43d088bb 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -14,7 +14,7 @@ storage = {
os.path.join(os.path.dirname(os.path.abspath(__file__)), 'profiles'),
#os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
],
- 'UPSTREAM_URL' : 'https://raw.githubusercontent.com/Torxed/archinstall/master/profiles',
+ 'UPSTREAM_URL' : 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles',
'PROFILE_DB' : None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
'LOG_PATH' : '/var/log/archinstall',
'LOG_FILE' : 'install.log',
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 58f88bd2..822f63be 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -1,10 +1,13 @@
-import getpass, pathlib, os, shutil
+import getpass, pathlib, os, shutil, re
+import sys, time, signal
from .exceptions import *
from .profiles import Profile
from .locale_helpers import search_keyboard_layout
from .output import log, LOG_LEVELS
from .storage import storage
from .networking import list_interfaces
+from .general import sys_command
+from .hardware import AVAILABLE_GFX_DRIVERS
## TODO: Some inconsistencies between the selection processes.
## Some return the keys from the options, some the values?
@@ -18,11 +21,56 @@ def get_terminal_width():
def get_longest_option(options):
return max([len(x) for x in options])
+def check_for_correct_username(username):
+ if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
+ return True
+ log(
+ "The username you entered is invalid. Try again",
+ level=LOG_LEVELS.Warning,
+ fg='red'
+ )
+ return False
+
+def do_countdown():
+ SIG_TRIGGER = False
+ def kill_handler(sig, frame):
+ print()
+ exit(0)
+
+ def sig_handler(sig, frame):
+ global SIG_TRIGGER
+ SIG_TRIGGER = True
+ signal.signal(signal.SIGINT, kill_handler)
+
+ original_sigint_handler = signal.getsignal(signal.SIGINT)
+ signal.signal(signal.SIGINT, sig_handler)
+
+ for i in range(5, 0, -1):
+ print(f"{i}", end='')
+
+ for x in range(4):
+ sys.stdout.flush()
+ time.sleep(0.25)
+ print(".", end='')
+
+ if SIG_TRIGGER:
+ abort = input('\nDo you really want to abort (y/n)? ')
+ if abort.strip() != 'n':
+ exit(0)
+
+ if SIG_TRIGGER is False:
+ sys.stdin.read()
+ SIG_TRIGGER = False
+ signal.signal(signal.SIGINT, sig_handler)
+ print()
+ signal.signal(signal.SIGINT, original_sigint_handler)
+ return True
+
def get_password(prompt="Enter a password: "):
while (passwd := getpass.getpass(prompt)):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
if passwd != passwd_verification:
- log(' * Passwords did not match * ', bg='black', fg='red')
+ log(' * Passwords did not match * ', fg='red')
continue
if len(passwd.strip()) <= 0:
@@ -50,14 +98,16 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
def ask_for_superuser_account(prompt='Create a required super-user with sudo privileges: ', forced=False):
while 1:
new_user = input(prompt).strip(' ')
-
+
if not new_user and forced:
# TODO: make this text more generic?
# It's only used to create the first sudo user when root is disabled in guided.py
- log(' * Since root is disabled, you need to create a least one (super) user!', bg='black', fg='red')
+ log(' * Since root is disabled, you need to create a least one (super) user!', fg='red')
continue
elif not new_user and not forced:
raise UserError("No superuser was created.")
+ elif not check_for_correct_username(new_user):
+ continue
password = get_password(prompt=f'Password for user {new_user}: ')
return {new_user: {"!password" : password}}
@@ -70,6 +120,8 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
new_user = input(prompt).strip(' ')
if not new_user:
break
+ if not check_for_correct_username(new_user):
+ continue
password = get_password(prompt=f'Password for user {new_user}: ')
if input("Should this user be a sudo (super) user (y/N): ").strip(' ').lower() in ('y', 'yes'):
@@ -80,7 +132,9 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
return users, super_users
def ask_for_a_timezone():
- timezone = input('Enter a valid timezone (Example: Europe/Stockholm): ').strip()
+ timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip()
+ if timezone == '':
+ timezone = 'UTC'
if (pathlib.Path("/usr")/"share"/"zoneinfo"/timezone).exists():
return timezone
else:
@@ -89,6 +143,14 @@ def ask_for_a_timezone():
level=LOG_LEVELS.Warning,
fg='red'
)
+
+def ask_for_audio_selection():
+ audio = "pulseaudio" # Default for most desktop environments
+ pipewire_choice = input("Would you like to install pipewire instead of pulseaudio as the default audio server? [Y/n] ").lower()
+ if pipewire_choice in ("y", ""):
+ audio = "pipewire"
+
+ return audio
def ask_to_configure_network():
# Optionally configure one network interface.
@@ -110,7 +172,6 @@ def ask_to_configure_network():
log(
"You need to enter a valid IP in IP-config mode.",
level=LOG_LEVELS.Warning,
- bg='black',
fg='red'
)
@@ -127,7 +188,7 @@ def ask_to_configure_network():
elif nic:
return nic
- return None
+ return {}
def ask_for_disk_layout():
options = {
@@ -153,7 +214,7 @@ def ask_for_main_filesystem_format():
def generic_select(options, input_text="Select one of the above by index or absolute value: ", sort=True):
"""
A generic select function that does not output anything
- other than the options and their indexs. As an example:
+ other than the options and their indexes. As an example:
generic_select(["first", "second", "third option"])
1: first
@@ -173,7 +234,7 @@ def generic_select(options, input_text="Select one of the above by index or abso
return None
elif selected_option.isdigit():
selected_option = int(selected_option)
- if selected_option >= len(options):
+ if selected_option > len(options):
raise RequirementError(f'Selected option "{selected_option}" is out of range')
selected_option = options[selected_option]
elif selected_option in options:
@@ -198,8 +259,10 @@ def select_disk(dict_o_disks):
if len(drives) >= 1:
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
- drive = input('Select one of the above disks (by number or full path): ')
- if drive.isdigit():
+ drive = input('Select one of the above disks (by number or full path) or write /mnt to skip partitioning: ')
+ if drive.strip() == '/mnt':
+ return None
+ elif drive.isdigit():
drive = int(drive)
if drive >= len(drives):
raise DiskError(f'Selected option "{drive}" is out of range')
@@ -262,6 +325,8 @@ def select_language(options, show_only_country_codes=True):
:return: The language/dictionary key of the selected language
:rtype: str
"""
+ DEFAULT_KEYBOARD_LANGUAGE = 'us'
+
if show_only_country_codes:
languages = sorted([language for language in list(options) if len(language) == 2])
else:
@@ -271,9 +336,12 @@ def select_language(options, show_only_country_codes=True):
for index, language in enumerate(languages):
print(f"{index}: {language}")
- print(' -- You can enter ? or help to search for more languages --')
+ print(' -- You can enter ? or help to search for more languages, or skip to use US layout --')
selected_language = input('Select one of the above keyboard languages (by number or full name): ')
- if selected_language.lower() in ('?', 'help'):
+
+ if len(selected_language.strip()) == 0:
+ return DEFAULT_KEYBOARD_LANGUAGE
+ elif selected_language.lower() in ('?', 'help'):
while True:
filter_string = input('Search for layout containing (example: "sv-"): ')
new_options = list(search_keyboard_layout(filter_string))
@@ -286,6 +354,7 @@ def select_language(options, show_only_country_codes=True):
elif selected_language.isdigit() and (pos := int(selected_language)) <= len(languages)-1:
selected_language = languages[pos]
+ return selected_language
# I'm leaving "options" on purpose here.
# Since languages possibly contains a filtered version of
# all possible language layouts, and we might want to write
@@ -293,9 +362,9 @@ def select_language(options, show_only_country_codes=True):
# go through the search step.
elif selected_language in options:
selected_language = options[options.index(selected_language)]
+ return selected_language
else:
- RequirementError("Selected language does not exist.")
- return selected_language
+ raise RequirementError("Selected language does not exist.")
raise RequirementError("Selecting languages require a least one language to be given as an option.")
@@ -319,26 +388,64 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
selected_mirrors = {}
if len(regions) >= 1:
- print_large_list(regions, margin_bottom=2)
+ print_large_list(regions, margin_bottom=4)
print(' -- You can skip this step by leaving the option blank --')
selected_mirror = input('Select one of the above regions to download packages from (by number or full name): ')
if len(selected_mirror.strip()) == 0:
+ # Returning back empty options which can be both used to
+ # do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining
return {}
- elif selected_mirror.isdigit() and (pos := int(selected_mirror)) <= len(regions)-1:
+ elif selected_mirror.isdigit() and int(selected_mirror) <= len(regions)-1:
+ # I'm leaving "mirrors" on purpose here.
+ # Since region possibly contains a known region of
+ # all possible regions, and we might want to write
+ # for instance Sweden (if we know that exists) without having to
+ # go through the search step.
region = regions[int(selected_mirror)]
selected_mirrors[region] = mirrors[region]
- # I'm leaving "mirrors" on purpose here.
- # Since region possibly contains a known region of
- # all possible regions, and we might want to write
- # for instance Sweden (if we know that exists) without having to
- # go through the search step.
elif selected_mirror in mirrors:
selected_mirrors[selected_mirror] = mirrors[selected_mirror]
else:
- RequirementError("Selected region does not exist.")
+ raise RequirementError("Selected region does not exist.")
return selected_mirrors
raise RequirementError("Selecting mirror region require a least one region to be given as an option.")
+
+def select_driver(options=AVAILABLE_GFX_DRIVERS):
+ """
+ Some what convoluted function, which's job is simple.
+ Select a graphics driver from a pre-defined set of popular options.
+
+ (The template xorg is for beginner users, not advanced, and should
+ there for appeal to the general public first and edge cases later)
+ """
+ if len(options) >= 1:
+ lspci = sys_command(f'/usr/bin/lspci')
+ for line in lspci.trace_log.split(b'\r\n'):
+ if b' vga ' in line.lower():
+ if b'nvidia' in line.lower():
+ print(' ** nvidia card detected, suggested driver: nvidia **')
+ elif b'amd' in line.lower():
+ print(' ** AMD card detected, suggested driver: AMD / ATI **')
+
+ selected_driver = generic_select(options, input_text="Select your graphics card driver: ", sort=True)
+ initial_option = selected_driver
+
+ if type(options[initial_option]) == dict:
+ driver_options = sorted(options[initial_option].keys())
+
+ selected_driver_package_group = generic_select(driver_options, input_text=f"Which driver-type do you want for {initial_option}: ")
+ if selected_driver_package_group in options[initial_option].keys():
+ print(options[initial_option][selected_driver_package_group])
+ selected_driver = options[initial_option][selected_driver_package_group]
+ else:
+ raise RequirementError(f"Selected driver-type does not exist for {initial_option}.")
+
+ return selected_driver_package_group
+
+ return selected_driver
+
+ raise RequirementError("Selecting drivers require a least one profile to be given as an option.")