index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Anton Hvornum <anton@hvornum.se> | 2021-05-15 22:54:22 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-15 22:54:22 +0000 |
commit | e3c8692bfa65d9b689bb50f9a4469989332adde4 (patch) | |
tree | 313530a7dc056fee85c7a2ce6e31b12dd8e61d29 /archinstall | |
parent | 92e8cdae175c162d38253e6376a94828d1c3987a (diff) | |
parent | 0ce2ffa4cf9638ddbcace2f726e6c5ebbbe2ac75 (diff) |
-rw-r--r-- | archinstall/__init__.py | 10 | ||||
-rw-r--r-- | archinstall/lib/disk.py | 107 | ||||
-rw-r--r-- | archinstall/lib/general.py | 24 | ||||
-rw-r--r-- | archinstall/lib/hardware.py | 30 | ||||
-rw-r--r-- | archinstall/lib/installer.py | 107 | ||||
-rw-r--r-- | archinstall/lib/locale_helpers.py | 4 | ||||
-rw-r--r-- | archinstall/lib/luks.py | 19 | ||||
-rw-r--r-- | archinstall/lib/mirrors.py | 8 | ||||
-rw-r--r-- | archinstall/lib/networking.py | 13 | ||||
-rw-r--r-- | archinstall/lib/output.py | 14 | ||||
-rw-r--r-- | archinstall/lib/profiles.py | 10 | ||||
-rw-r--r-- | archinstall/lib/services.py | 2 | ||||
-rw-r--r-- | archinstall/lib/storage.py | 2 | ||||
-rw-r--r-- | archinstall/lib/user_interaction.py | 41 |
diff --git a/archinstall/__init__.py b/archinstall/__init__.py index f1d8341e..18c83a31 100644 --- a/archinstall/__init__.py +++ b/archinstall/__init__.py @@ -17,9 +17,9 @@ from .lib.user_interaction import * __version__ = "2.2.0.dev1" -## Basic version of arg.parse() supporting: -## --key=value -## --boolean +# Basic version of arg.parse() supporting: +# --key=value +# --boolean arguments = {} positionals = [] for arg in sys.argv[1:]: @@ -33,8 +33,8 @@ for arg in sys.argv[1:]: positionals.append(arg) -# TODO: Learn the dark arts of argparse... -# (I summon thee dark spawn of cPython) +# TODO: Learn the dark arts of argparse... (I summon thee dark spawn of cPython) + def run_as_a_module(): """ diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index c099cca1..f8703ae3 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -2,9 +2,10 @@ import glob import pathlib import re from collections import OrderedDict +from typing import Optional from .general import * -from .hardware import hasUEFI +from .hardware import has_uefi from .output import log ROOT_DIR_PATTERN = re.compile('^.*?/devices') @@ -17,7 +18,8 @@ MBR = 0b00000010 # libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) # libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) -class BlockDevice(): + +class BlockDevice: def __init__(self, path, info=None): if not info: # If we don't give any information, we need to auto-fill it. @@ -75,8 +77,9 @@ class BlockDevice(): raise DiskError(f'Could not locate backplane info for "{self.path}"') if self.info['type'] == 'loop': - for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']: - if not drive['name'] == self.path: continue + for drive in json.loads(b''.join(SysCommand(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']: + if not drive['name'] == self.path: + continue return drive['back-file'] elif self.info['type'] == 'disk': @@ -91,15 +94,15 @@ class BlockDevice(): else: log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG) - # if not stat.S_ISBLK(os.stat(full_path).st_mode): - # raise DiskError(f'Selected disk "{full_path}" is not a block device.') + # if not stat.S_ISBLK(os.stat(full_path).st_mode): + # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @property def partitions(self): - o = b''.join(sys_command(['partprobe', self.path])) + o = b''.join(SysCommand(['partprobe', self.path])) # o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) - o = b''.join(sys_command(['/usr/bin/lsblk', '-J', self.path])) + o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path])) if b'not a block device' in o: raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') @@ -113,7 +116,7 @@ class BlockDevice(): for part in r['blockdevices'][0]['children']: part_id = part['name'][len(os.path.basename(self.path)):] if part_id not in self.part_cache: - ## TODO: Force over-write even if in cache? + # TODO: Force over-write even if in cache? if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size']) @@ -136,7 +139,7 @@ class BlockDevice(): This is more reliable than relying on /dev/disk/by-partuuid as it doesn't seam to be able to detect md raid partitions. """ - lsblk = b''.join(sys_command(f'lsblk -J -o+UUID {self.path}')) + lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}')) for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: return partition.get('uuid', None) @@ -153,7 +156,7 @@ class BlockDevice(): self.part_cache = OrderedDict() -class Partition(): +class Partition: def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): if not part_id: part_id = os.path.basename(path) @@ -177,11 +180,11 @@ class Partition(): if self.mountpoint != mount_information.get('target', None) and mountpoint: raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") - if (target := mount_information.get('target', None)): + if target := mount_information.get('target', None): self.mountpoint = target if not self.filesystem and autodetect_filesystem: - if (fstype := mount_information.get('fstype', get_filesystem_type(path))): + if fstype := mount_information.get('fstype', get_filesystem_type(path)): self.filesystem = fstype if self.filesystem == 'crypto_LUKS': @@ -213,7 +216,7 @@ class Partition(): This is more reliable than relying on /dev/disk/by-partuuid as it doesn't seam to be able to detect md raid partitions. """ - lsblk = b''.join(sys_command(f'lsblk -J -o+PARTUUID {self.path}')) + lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}')) for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']: return partition.get('partuuid', None) return None @@ -233,10 +236,10 @@ class Partition(): @property def real_device(self): - for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']: - if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))): + for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']: + if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)): return f"/dev/{parent}" - # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') + # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') return self.path def detect_inner_filesystem(self, password): @@ -257,11 +260,11 @@ class Partition(): temporary_path = pathlib.Path(temporary_mountpoint) temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') files = len(glob.glob(f"{temporary_mountpoint}/*")) - sys_command(f'/usr/bin/umount {temporary_mountpoint}') + SysCommand(f'/usr/bin/umount {temporary_mountpoint}') temporary_path.rmdir() @@ -324,36 +327,36 @@ class Partition(): log(f'Formatting {path} -> {filesystem}', level=logging.INFO) if filesystem == 'btrfs': - o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}')) + o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}')) if b'UUID' not in o: raise DiskError(f'Could not format {path} with {filesystem} because: {o}') self.filesystem = 'btrfs' elif filesystem == 'vfat': - o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}')) + o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}')) if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: raise DiskError(f'Could not format {path} with {filesystem} because: {o}') self.filesystem = 'vfat' elif filesystem == 'ext4': - if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0: raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'ext4' elif filesystem == 'xfs': - if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0: raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'xfs' elif filesystem == 'f2fs': - if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: + if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0: raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') self.filesystem = 'f2fs' elif filesystem == 'crypto_LUKS': - # from .luks import luks2 - # encrypted_partition = luks2(self, None, None) - # encrypted_partition.format(path) + # from .luks import luks2 + # encrypted_partition = luks2(self, None, None) + # encrypted_partition.format(path) self.filesystem = 'crypto_LUKS' else: @@ -371,23 +374,24 @@ class Partition(): return parent elif 'children' in data: for child in data['children']: - if (parent := self.find_parent_of(child, name, parent=data['name'])): + if parent := self.find_parent_of(child, name, parent=data['name']): return parent def mount(self, target, fs=None, options=''): if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: - if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') + if not self.filesystem: + raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') fs = self.filesystem pathlib.Path(target).mkdir(parents=True, exist_ok=True) try: if options: - sys_command(f'/usr/bin/mount -o {options} {self.path} {target}') + SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}') else: - sys_command(f'/usr/bin/mount {self.path} {target}') + SysCommand(f'/usr/bin/mount {self.path} {target}') except SysCallError as err: raise err @@ -396,14 +400,14 @@ class Partition(): def unmount(self): try: - exit_code = sys_command(f'/usr/bin/umount {self.path}').exit_code + exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code except SysCallError as err: exit_code = err.exit_code # Without to much research, it seams that low error codes are errors. # And above 8k is indicators such as "/dev/x not mounted.". # So anything in between 0 and 8k are errors (?). - if exit_code > 0 and exit_code < 8000: + if 0 < exit_code < 8000: raise err self.mountpoint = None @@ -416,8 +420,8 @@ class Partition(): """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: - 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported - 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type + 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported + 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type """ try: self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) @@ -428,7 +432,7 @@ class Partition(): return True -class Filesystem(): +class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. @@ -446,7 +450,7 @@ class Filesystem(): else: raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') elif self.mode == MBR: - if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0: + if SysCommand(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0: return self else: raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos') @@ -468,7 +472,7 @@ class Filesystem(): # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] - b''.join(sys_command('sync')) + b''.join(SysCommand('sync')) return True def find_partition(self, mountpoint): @@ -477,7 +481,7 @@ class Filesystem(): return partition def raw_parted(self, string: str): - x = sys_command(f'/usr/bin/parted -s {string}') + x = SysCommand(f'/usr/bin/parted -s {string}') return x def parted(self, string: str): @@ -491,8 +495,8 @@ class Filesystem(): def use_entire_disk(self, root_filesystem_type='ext4'): log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG) - if hasUEFI(): - self.add_partition('primary', start='1MiB', end='513MiB', format='fat32') + if has_uefi(): + self.add_partition('primary', start='1MiB', end='513MiB', partition_format='fat32') self.set_name(0, 'EFI') self.set(0, 'boot on') # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"? @@ -517,17 +521,17 @@ class Filesystem(): self.blockdevice.partition[0].target_mountpoint = '/' self.blockdevice.partition[0].allow_formatting = True - def add_partition(self, type, start, end, format=None): + def add_partition(self, partition_type, start, end, partition_format=None): log(f'Adding partition to {self.blockdevice}', level=logging.INFO) previous_partitions = self.blockdevice.partitions if self.mode == MBR: if len(self.blockdevice.partitions) > 3: DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions") - if format: - partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {format} {start} {end}') == 0 + if partition_format: + partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0 else: - partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {start} {end}') == 0 + partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {start} {end}') == 0 if partitioning: start_wait = time.time() @@ -568,8 +572,9 @@ def all_disks(*args, **kwargs): kwargs.setdefault("partitions", False) drives = OrderedDict() # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']: - for drive in json.loads(b''.join(sys_command('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']: - if not kwargs['partitions'] and drive['type'] == 'part': continue + for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']: + if not kwargs['partitions'] and drive['type'] == 'part': + continue drives[drive['path']] = BlockDevice(drive['path'], drive) return drives @@ -600,7 +605,7 @@ def harddrive(size=None, model=None, fuzzy=False): def get_mount_info(path): try: - output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}')) + output = b''.join(SysCommand(f'/usr/bin/findmnt --json {path}')) except SysCallError: return {} @@ -615,7 +620,7 @@ def get_mount_info(path): def get_partitions_in_use(mountpoint): try: - output = b''.join(sys_command(f'/usr/bin/findmnt --json -R {mountpoint}')) + output = b''.join(SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}')) except SysCallError: return {} @@ -634,7 +639,7 @@ def get_partitions_in_use(mountpoint): def get_filesystem_type(path): try: - handle = sys_command(f"blkid -o value -s TYPE {path}") + handle = SysCommand(f"blkid -o value -s TYPE {path}") return b''.join(handle).strip().decode('UTF-8') except SysCallError: return None @@ -642,7 +647,7 @@ def get_filesystem_type(path): def disk_layouts(): try: - handle = sys_command("lsblk -f -o+TYPE,SIZE -J") + handle = SysCommand("lsblk -f -o+TYPE,SIZE -J") return json.loads(b''.join(handle).decode('UTF-8')) except SysCallError as err: log(f"Could not return disk layouts: {err}") diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index b65e2593..9fbf2654 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -42,7 +42,7 @@ def locate_binary(name): break # Don't recurse -class JSON_Encoder: +class JsonEncoder: def _encode(obj): if isinstance(obj, dict): # We'll need to iterate not just the value that default() usually gets passed @@ -54,12 +54,12 @@ class JSON_Encoder: # This, is a EXTREMELY ugly hack.. but it's the only quick way I can think of to trigger a encoding of sub-dictionaries. val = json.loads(json.dumps(val, cls=JSON)) else: - val = JSON_Encoder._encode(val) + val = JsonEncoder._encode(val) if type(key) == str and key[0] == '!': - copy[JSON_Encoder._encode(key)] = '******' + copy[JsonEncoder._encode(key)] = '******' else: - copy[JSON_Encoder._encode(key)] = val + copy[JsonEncoder._encode(key)] = val return copy elif hasattr(obj, 'json'): return obj.json() @@ -78,18 +78,20 @@ class JSON_Encoder: class JSON(json.JSONEncoder, json.JSONDecoder): def _encode(self, obj): - return JSON_Encoder._encode(obj) + return JsonEncoder._encode(obj) def encode(self, obj): return super(JSON, self).encode(self._encode(obj)) -class sys_command: +class SysCommand: """ Stolen from archinstall_gui """ - def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars={}, *args, **kwargs): + def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars=None, *args, **kwargs): + if environment_vars is None: + environment_vars = {} kwargs.setdefault("worker_id", gen_uid()) kwargs.setdefault("emulate", False) kwargs.setdefault("suppress_errors", False) @@ -170,7 +172,7 @@ class sys_command: 'ended': self.ended, 'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)), 'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None, - 'exit_code': self.exit_code + 'exit_code': self.exit_code, } def peak(self, output: Union[str, bytes]) -> bool: @@ -256,7 +258,7 @@ class sys_command: original = trigger trigger = bytes(original, 'UTF-8') self.kwargs['events'][trigger] = self.kwargs['events'][original] - del (self.kwargs['events'][original]) + del self.kwargs['events'][original] if type(self.kwargs['events'][trigger]) != bytes: self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8') @@ -269,7 +271,7 @@ class sys_command: last_trigger_pos = trigger_pos os.write(child_fd, self.kwargs['events'][trigger]) - del (self.kwargs['events'][trigger]) + del self.kwargs['events'][trigger] broke = True break @@ -334,4 +336,4 @@ def prerequisite_check(): def reboot(): - o = b''.join(sys_command("/usr/bin/reboot")) + o = b''.join(SysCommand("/usr/bin/reboot")) diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index d1723cde..f527b5da 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -3,7 +3,7 @@ import os import subprocess from typing import Optional -from .general import sys_command +from .general import SysCommand from .networking import list_interfaces, enrich_iface_types __packages__ = [ @@ -56,48 +56,48 @@ AVAILABLE_GFX_DRIVERS = { } -def hasWifi() -> bool: +def has_wifi() -> bool: return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values() -def hasAMDCPU() -> bool: +def has_amd_cpu() -> bool: if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode(): return True return False -def hasIntelCPU() -> bool: +def has_intel_cpu() -> bool: if subprocess.check_output("lscpu | grep Intel", shell=True).strip().decode(): return True return False -def hasUEFI() -> bool: +def has_uefi() -> bool: return os.path.isdir('/sys/firmware/efi') -def graphicsDevices() -> dict: +def graphics_devices() -> dict: cards = {} - for line in sys_command("lspci"): + for line in SysCommand("lspci"): if b' VGA ' in line: _, identifier = line.split(b': ', 1) cards[identifier.strip().lower().decode('UTF-8')] = line return cards -def hasNvidiaGraphics() -> bool: - return any('nvidia' in x for x in graphicsDevices()) +def has_nvidia_graphics() -> bool: + return any('nvidia' in x for x in graphics_devices()) -def hasAmdGraphics() -> bool: - return any('amd' in x for x in graphicsDevices()) +def has_amd_graphics() -> bool: + return any('amd' in x for x in graphics_devices()) -def hasIntelGraphics() -> bool: - return any('intel' in x for x in graphicsDevices()) +def has_intel_graphics() -> bool: + return any('intel' in x for x in graphics_devices()) -def cpuVendor() -> Optional[str]: +def cpu_vendor() -> Optional[str]: cpu_info = json.loads(subprocess.check_output("lscpu -J", shell=True).decode('utf-8'))['lscpu'] for info in cpu_info: if info.get('field', None): @@ -106,7 +106,7 @@ def cpuVendor() -> Optional[str]: return None -def isVM() -> bool: +def is_vm() -> bool: try: subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a non-zero exit code if it is not on a virtual machine return True diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index bac19007..aa2ea920 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -9,7 +9,7 @@ from .user_interaction import * __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"] -class Installer(): +class Installer: """ `Installer()` is the wrapper for most basic installation steps. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. @@ -34,7 +34,11 @@ class Installer(): """ - def __init__(self, target, *, base_packages=__packages__[:3], kernels=['linux']): + def __init__(self, target, *, base_packages=None, kernels=None): + if base_packages is None: + base_packages = __packages__[:3] + if kernels is None: + kernels = ['linux'] self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) @@ -107,7 +111,7 @@ class Installer(): # Copy over the install log (if there is one) to the install medium if # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. if self.helper_flags.get('base-strapped', False) is True: - if (filename := storage.get('LOG_FILE', None)): + if filename := storage.get('LOG_FILE', None): absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): @@ -127,11 +131,12 @@ class Installer(): return [step for step, flag in self.helper_flags.items() if flag is False] def pacstrap(self, *packages, **kwargs): - if type(packages[0]) in (list, tuple): packages = packages[0] + if type(packages[0]) in (list, tuple): + packages = packages[0] self.log(f'Installing packages: {packages}', level=logging.INFO) - if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0: - if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0: + if (sync_mirrors := SysCommand('/usr/bin/pacman -Syy')).exit_code == 0: + if (pacstrap := SysCommand(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0: return True else: self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO) @@ -144,7 +149,7 @@ class Installer(): def genfstab(self, flags='-pU'): self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) - fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log + fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').trace_log with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh: fstab_fh.write(fstab) @@ -158,22 +163,25 @@ class Installer(): fh.write(hostname + '\n') def set_locale(self, locale, encoding='UTF-8', *args, **kwargs): - if not len(locale): return True + if not len(locale): + return True with open(f'{self.target}/etc/locale.gen', 'a') as fh: fh.write(f'{locale}.{encoding} {encoding}\n') with open(f'{self.target}/etc/locale.conf', 'w') as fh: fh.write(f'LANG={locale}.{encoding}\n') - return True if sys_command(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False + return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False def set_timezone(self, zone, *args, **kwargs): - if not zone: return True - if not len(zone): return True # Redundant + if not zone: + return True + if not len(zone): + return True # Redundant if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists(): (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) - sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') + SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True else: self.log( @@ -195,7 +203,7 @@ class Installer(): raise ServiceException(f"Unable to start service {service}: {output}") def run_command(self, cmd, *args, **kwargs): - return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}') + return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') def arch_chroot(self, cmd, *args, **kwargs): if 'runas' in kwargs: @@ -224,10 +232,10 @@ class Installer(): with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf: netconf.write(str(conf)) - def copy_ISO_network_config(self, enable_services=False): + def copy_iso_network_config(self, enable_services=False): # Copy (if any) iwd password and config files if os.path.isdir('/var/lib/iwd/'): - if (psk_files := glob.glob('/var/lib/iwd/*.psk')): + if psk_files := glob.glob('/var/lib/iwd/*.psk'): if not os.path.isdir(f"{self.target}/var/lib/iwd"): os.makedirs(f"{self.target}/var/lib/iwd") @@ -253,7 +261,7 @@ class Installer(): shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}") # Copy (if any) systemd-networkd config files - if (netconfigurations := glob.glob('/etc/systemd/network/*')): + if netconfigurations := glob.glob('/etc/systemd/network/*'): if not os.path.isdir(f"{self.target}/etc/systemd/network/"): os.makedirs(f"{self.target}/etc/systemd/network/") @@ -263,6 +271,7 @@ class Installer(): if enable_services: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: + def post_install_enable_networkd_resolved(*args, **kwargs): self.enable_service('systemd-networkd', 'systemd-resolved') @@ -288,13 +297,13 @@ class Installer(): mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") - sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') + SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') def minimal_installation(self): - ## Add necessary packages if encrypting the drive - ## (encrypted partitions default to btrfs for now, so we need btrfs-progs) - ## TODO: Perhaps this should be living in the function which dictates - ## the partitioning. Leaving here for now. + # Add necessary packages if encrypting the drive + # (encrypted partitions default to btrfs for now, so we need btrfs-progs) + # TODO: Perhaps this should be living in the function which dictates + # the partitioning. Leaving here for now. for partition in self.partitions: if partition.filesystem == 'btrfs': @@ -316,11 +325,11 @@ class Installer(): if 'encrypt' not in self.HOOKS: self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') - if not hasUEFI(): + if not has_uefi(): self.base_packages.append('grub') - if not isVM(): - vendor = cpuVendor() + if not is_vm(): + vendor = cpu_vendor() if vendor == "AuthenticAMD": self.base_packages.append("amd-ucode") elif vendor == "GenuineIntel": @@ -332,11 +341,9 @@ class Installer(): self.helper_flags['base-strapped'] = True with open(f"{self.target}/etc/fstab", "a") as fstab: - fstab.write( - "\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n" - ) # Redundant \n at the start? who knows? + fstab.write("\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n") # Redundant \n at the start? who knows? - ## TODO: Support locale and timezone + # TODO: Support locale and timezone # os.remove(f'{self.target}/etc/localtime') # sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime') # sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime') @@ -344,7 +351,7 @@ class Installer(): self.set_locale('en_US') # TODO: Use python functions for this - sys_command(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') + SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') self.mkinitcpio('-P') @@ -371,16 +378,16 @@ class Installer(): if bootloader == 'systemd-bootctl': self.pacstrap('efibootmgr') - if not hasUEFI(): + if not has_uefi(): raise HardwareIncompatibilityError # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. # Install the boot loader - if sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0: + if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0: # Fallback, try creating the boot loader without touching the EFI variables - sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') + SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') # Modify or create a loader.conf if os.path.isfile(f'{self.target}/boot/loader/loader.conf'): @@ -402,8 +409,8 @@ class Installer(): else: loader.write(f"{line}\n") - ## For some reason, blkid and /dev/disk/by-uuid are not getting along well. - ## And blkid is wrong in terms of LUKS. + # For some reason, blkid and /dev/disk/by-uuid are not getting along well. + # And blkid is wrong in terms of LUKS. # UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip() # Setup the loader entry with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry: @@ -411,8 +418,8 @@ class Installer(): entry.write(f'# Created on: {self.init_time}\n') entry.write('title Arch Linux\n') entry.write('linux /vmlinuz-linux\n') - if not isVM(): - vendor = cpuVendor() + if not is_vm(): + vendor = cpu_vendor() if vendor == "AuthenticAMD": entry.write("initrd /amd-ucode.img\n") elif vendor == "GenuineIntel": @@ -420,10 +427,10 @@ class Installer(): else: self.log("unknow cpu vendor, not adding ucode to systemd-boot config") entry.write('initrd /initramfs-linux.img\n') - ## blkid doesn't trigger on loopback devices really well, - ## so we'll use the old manual method until we get that sorted out. + # blkid doesn't trigger on loopback devices really well, + # so we'll use the old manual method until we get that sorted out. - if (real_device := self.detect_encryption(root_partition)): + if real_device := self.detect_encryption(root_partition): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) @@ -439,17 +446,17 @@ class Installer(): elif bootloader == "grub-install": self.pacstrap('grub') - if hasUEFI(): + if has_uefi(): self.pacstrap('efibootmgr') - o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB')) - sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg') + o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB')) + SysCommand('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg') return True else: root_device = subprocess.check_output(f'basename "$(readlink -f /sys/class/block/{root_partition.path.replace("/dev/", "")}/..)"', shell=True).decode().strip() if root_device == "block": root_device = f"{root_partition.path}" - o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}')) - sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg') + o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}')) + SysCommand('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg') self.helper_flags['bootloader'] = bootloader return True else: @@ -473,15 +480,17 @@ class Installer(): sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') return True - def user_create(self, user: str, password=None, groups=[], sudo=False): + def user_create(self, user: str, password=None, groups=None, sudo=False): + if groups is None: + groups = [] self.log(f'Creating user {user}', level=logging.INFO) - o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')) + o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')) if password: self.user_set_pw(user, password) if groups: for group in groups: - o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}')) + o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}')) if sudo and self.enable_sudo(user): self.helper_flags['user'] = True @@ -493,13 +502,13 @@ class Installer(): # This means the root account isn't locked/disabled with * in /etc/passwd self.helper_flags['user'] = True - o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")) + o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")) pass def user_set_shell(self, user, shell): self.log(f'Setting shell for {user} to {shell}', level=logging.INFO) - o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")) + o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")) pass def set_keyboard_language(self, language): diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py index addc8da1..2db429fd 100644 --- a/archinstall/lib/locale_helpers.py +++ b/archinstall/lib/locale_helpers.py @@ -27,9 +27,9 @@ def verify_keyboard_layout(layout): return False -def search_keyboard_layout(filter): +def search_keyboard_layout(layout_filter): for language in list_keyboard_languages(): - if filter.lower() in language.lower(): + if layout_filter.lower() in language.lower(): yield language diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index e6e1c897..6fab9b94 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -5,7 +5,7 @@ from .general import * from .output import log -class luks2(): +class luks2: def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs): self.password = password self.partition = partition @@ -78,7 +78,7 @@ class luks2(): try: # Try to setup the crypt-device - cmd_handle = sys_command(cryptsetup_args) + cmd_handle = SysCommand(cryptsetup_args) except SysCallError as err: if err.exit_code == 256: log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG) @@ -87,7 +87,7 @@ class luks2(): # Get crypt-information about the device by doing a reverse lookup starting with the partition path # For instance: /dev/sda - devinfo = json.loads(b''.join(sys_command(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] + devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0] # For each child (sub-partition/sub-device) if len(children := devinfo.get('children', [])): @@ -95,14 +95,14 @@ class luks2(): # Unmount the child location if child_mountpoint := child.get('mountpoint', None): log(f'Unmounting {child_mountpoint}', level=logging.DEBUG) - sys_command(f"umount -R {child_mountpoint}") + SysCommand(f"umount -R {child_mountpoint}") # And close it if possible. log(f"Closing crypt device {child['name']}", level=logging.DEBUG) - sys_command(f"cryptsetup close {child['name']}") + SysCommand(f"cryptsetup close {child['name']}") # Then try again to set up the crypt-device - cmd_handle = sys_command(cryptsetup_args) + cmd_handle = SysCommand(cryptsetup_args) else: raise err @@ -120,6 +120,7 @@ class luks2(): :type mountpoint: str """ from .disk import get_filesystem_type + if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead? @@ -127,7 +128,7 @@ class luks2(): while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10: time.sleep(0.025) - sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') + SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') if os.path.islink(f'/dev/mapper/{mountpoint}'): self.mapdev = f'/dev/mapper/{mountpoint}' unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False) @@ -138,9 +139,9 @@ class luks2(): if not mountpoint: mountpoint = self.mapdev - sys_command(f'/usr/bin/cryptsetup close {self.mapdev}') + SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}') return os.path.islink(self.mapdev) is False def format(self, path): - if (handle := sys_command(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: + if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0: raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}') diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index e2630710..3215122f 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -15,9 +15,9 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tm region_list = [] for region in regions.split(','): region_list.append(f'country={region}') - o = b''.join(sys_command((f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist"))) - o = b''.join(sys_command((f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist"))) - o = b''.join(sys_command((f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}"))) + o = b''.join(SysCommand(f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist")) + o = b''.join(SysCommand(f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist")) + o = b''.join(SysCommand(f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}")) return True @@ -71,7 +71,7 @@ def use_mirrors(regions: dict, destination='/etc/pacman.d/mirrorlist'): def re_rank_mirrors(top=10, *positionals, **kwargs): - if sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0: + if SysCommand(f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist').exit_code == 0: return True return False diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 3e5ed4e7..b0c4b569 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -5,7 +5,7 @@ import struct from collections import OrderedDict from .exceptions import * -from .general import sys_command +from .general import SysCommand from .storage import storage @@ -53,11 +53,11 @@ def wireless_scan(interface): if interfaces[interface] != 'WIRELESS': raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}") - sys_command(f"iwctl station {interface} scan") + SysCommand(f"iwctl station {interface} scan") - if not '_WIFI' in storage: + if '_WIFI' not in storage: storage['_WIFI'] = {} - if not interface in storage['_WIFI']: + if interface not in storage['_WIFI']: storage['_WIFI'][interface] = {} storage['_WIFI'][interface]['scanning'] = True @@ -66,10 +66,11 @@ def wireless_scan(interface): # TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25 def get_wireless_networks(interface): # TODO: Make this oneliner pritter to check if the interface is scanning or not. - if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: + if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: import time + wireless_scan(interface) time.sleep(5) - for line in sys_command(f"iwctl station {interface} get-networks"): + for line in SysCommand(f"iwctl station {interface} get-networks"): print(line) diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index cce9e88c..8bc6cacb 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -18,7 +18,7 @@ class LogLevels: Debug = 0b111 -class journald(dict): +class Journald(dict): @abc.abstractmethod def log(message, level=logging.DEBUG): try: @@ -83,11 +83,11 @@ def stylize_output(text: str, *opts, **kwargs): color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white') foreground = {color_names[x]: '3%s' % x for x in range(8)} background = {color_names[x]: '4%s' % x for x in range(8)} - RESET = '0' + reset = '0' code_list = [] if text == '' and len(opts) == 1 and opts[0] == 'reset': - return '\x1b[%sm' % RESET + return '\x1b[%sm' % reset for k, v in kwargs.items(): if k == 'fg': code_list.append(foreground[v]) @@ -97,7 +97,7 @@ def stylize_output(text: str, *opts, **kwargs): if o in opt_dict: code_list.append(opt_dict[o]) if 'noreset' not in opts: - text = '%s\x1b[%sm' % (text or '', RESET) + text = '%s\x1b[%sm' % (text or '', reset) return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '') @@ -112,7 +112,7 @@ def log(*args, **kwargs): # If a logfile is defined in storage, # we use that one to output everything - if (filename := storage.get('LOG_FILE', None)): + if filename := storage.get('LOG_FILE', None): absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) try: @@ -155,13 +155,13 @@ def log(*args, **kwargs): log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True) kwargs['level'] = logging.DEBUG - if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and not 'force' in kwargs: + if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and 'force' not in kwargs: # Level on log message was Debug, but output level is set to Info. # In that case, we'll drop it. return None try: - journald.log(string, level=kwargs.get('level', logging.INFO)) + Journald.log(string, level=kwargs.get('level', logging.INFO)) except ModuleNotFoundError: pass # Ignore writing to journald diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py index 2f97231c..871e6223 100644 --- a/archinstall/lib/profiles.py +++ b/archinstall/lib/profiles.py @@ -14,7 +14,7 @@ from .storage import storage def grab_url_data(path): - safe_path = path[:path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))]) + safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))]) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE @@ -75,7 +75,7 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof if filter_top_level_profiles: for profile in list(cache.keys()): if Profile(None, profile).is_top_level_profile() is False: - del (cache[profile]) + del cache[profile] return cache @@ -154,7 +154,7 @@ class Script: return self def execute(self): - if not self.namespace in sys.modules or self.spec is None: + if self.namespace not in sys.modules or self.spec is None: self.load_instructions() self.spec.loader.exec_module(sys.modules[self.namespace]) @@ -163,8 +163,10 @@ class Script: class Profile(Script): - def __init__(self, installer, path, args={}): + def __init__(self, installer, path, args=None): super(Profile, self).__init__(path, installer) + if args is None: + args = {} def __dump__(self, *args, **kwargs): return {'path': self.path} diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py index 537a2f84..6f8f2a87 100644 --- a/archinstall/lib/services.py +++ b/archinstall/lib/services.py @@ -5,6 +5,6 @@ def service_state(service_name: str): if os.path.splitext(service_name)[1] != '.service': service_name += '.service' # Just to be safe - state = b''.join(sys_command(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'})) + state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'})) return state.strip().decode('UTF-8') diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py index 53d5e938..42214572 100644 --- a/archinstall/lib/storage.py +++ b/archinstall/lib/storage.py @@ -18,5 +18,5 @@ storage = { 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing. 'LOG_PATH': '/var/log/archinstall', 'LOG_FILE': 'install.log', - 'MOUNT_POINT': '/mnt' + 'MOUNT_POINT': '/mnt', } diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index 69689479..c640cb83 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -12,8 +12,8 @@ import time import tty from .exceptions import * -from .general import sys_command -from .hardware import AVAILABLE_GFX_DRIVERS, hasUEFI +from .general import SysCommand +from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout from .networking import list_interfaces from .output import log @@ -23,6 +23,7 @@ from .profiles import Profile # TODO: Some inconsistencies between the selection processes. # Some return the keys from the options, some the values? + def get_terminal_height(): return shutil.get_terminal_size().lines @@ -84,7 +85,7 @@ def do_countdown(): def get_password(prompt="Enter a password: "): - while (passwd := getpass.getpass(prompt)): + while passwd := getpass.getpass(prompt): passwd_verification = getpass.getpass(prompt='And one more time for verification: ') if passwd != passwd_verification: log(' * Passwords did not match * ', fg='red') @@ -104,7 +105,7 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '): max_num_of_columns = get_terminal_width() // longest_line max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom) - if (len(options) > max_options_in_cells): + if len(options) > max_options_in_cells: for index, option in enumerate(options): print(f"{index}: {option}") return 1, index @@ -214,8 +215,10 @@ class MiniCurses: self._cursor_x += len(text) def clear(self, x, y): - if x < 0: x = 0 - if y < 0: y = 0 + if x < 0: + x = 0 + if y < 0: + y = 0 # import time # sys.stdout.write(f"Clearing from: {x, y}") @@ -243,7 +246,7 @@ class MiniCurses: return True # Move back to the current known position (BACKSPACE doesn't updated x-pos) sys.stdout.flush() - sys.stdout.write("\033[%dG" % (self._cursor_x)) + sys.stdout.write("\033[%dG" % self._cursor_x) sys.stdout.flush() # Write a blank space @@ -253,7 +256,7 @@ class MiniCurses: # And move back again sys.stdout.flush() - sys.stdout.write("\033[%dG" % (self._cursor_x)) + sys.stdout.write("\033[%dG" % self._cursor_x) sys.stdout.flush() self._cursor_x -= 1 @@ -360,7 +363,7 @@ def ask_for_a_timezone(): def ask_for_bootloader() -> str: bootloader = "systemd-bootctl" - if hasUEFI() == False: + if not has_uefi(): bootloader = "grub-install" else: bootloader_choice = input("Would you like to use GRUB as a bootloader instead of systemd-boot? [y/N] ").lower() @@ -401,8 +404,7 @@ def ask_to_configure_network(): for index, mode in enumerate(modes): print(f"{index}: {mode}") - mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ", - options_output=False) + mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ", options_output=False) if mode == 'IP': while 1: ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip() @@ -450,11 +452,10 @@ def ask_for_disk_layout(): options = { 'keep-existing': 'Keep existing partition layout and select which ones to use where', 'format-all': 'Format entire drive and setup a basic partition scheme', - 'abort': 'Abort the installation' + 'abort': 'Abort the installation', } - value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ", - allow_empty_input=False, sort=True) + value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ", allow_empty_input=False, sort=True) return next((key for key, val in options.items() if val == value), None) @@ -466,8 +467,7 @@ def ask_for_main_filesystem_format(): 'f2fs': 'f2fs' } - value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ", - allow_empty_input=False) + value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ", allow_empty_input=False) return next((key for key, val in options.items() if val == value), None) @@ -584,8 +584,7 @@ def select_profile(options): print(' -- They might make it easier to install things like desktop environments. --') print(' -- (Leave blank and hit enter to skip this step and continue) --') - selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ', - options_output=False) + selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ', options_output=False) if selected_profile: return Profile(None, selected_profile) else: @@ -675,9 +674,7 @@ def select_mirror_regions(mirrors, show_top_mirrors=True): print_large_list(regions, margin_bottom=4) print(' -- You can skip this step by leaving the option blank --') - selected_mirror = generic_select(regions, - 'Select one of the above regions to download packages from (by number or full name): ', - options_output=False) + selected_mirror = generic_select(regions, 'Select one of the above regions to download packages from (by number or full name): ', options_output=False) if not selected_mirror: # Returning back empty options which can be both used to # do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining @@ -708,7 +705,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS): default_option = options["All open-source (default)"] if drivers: - lspci = sys_command('/usr/bin/lspci') + lspci = SysCommand('/usr/bin/lspci') for line in lspci.trace_log.split(b'\r\n'): if b' vga ' in line.lower(): if b'nvidia' in line.lower(): |