Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py28
-rw-r--r--archinstall/lib/general.py6
-rw-r--r--archinstall/lib/installer.py19
-rw-r--r--archinstall/lib/luks.py3
-rw-r--r--archinstall/lib/networking.py1
-rw-r--r--archinstall/lib/profiles.py4
-rw-r--r--archinstall/lib/storage.py2
-rw-r--r--archinstall/lib/user_interaction.py27
8 files changed, 48 insertions, 42 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index 65d1599b..85e8a402 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -17,7 +17,8 @@ MBR = 0b00000010
# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
-class BlockDevice():
+
+class BlockDevice:
def __init__(self, path, info=None):
if not info:
# If we don't give any information, we need to auto-fill it.
@@ -76,7 +77,8 @@ class BlockDevice():
if self.info['type'] == 'loop':
for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == self.path: continue
+ if not drive['name'] == self.path:
+ continue
return drive['back-file']
elif self.info['type'] == 'disk':
@@ -91,8 +93,8 @@ class BlockDevice():
else:
log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
- # if not stat.S_ISBLK(os.stat(full_path).st_mode):
- # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
+ # if not stat.S_ISBLK(os.stat(full_path).st_mode):
+ # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
def partitions(self):
@@ -153,7 +155,7 @@ class BlockDevice():
self.part_cache = OrderedDict()
-class Partition():
+class Partition:
def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
if not part_id:
part_id = os.path.basename(path)
@@ -236,7 +238,7 @@ class Partition():
for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
return f"/dev/{parent}"
- # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
+ # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
def detect_inner_filesystem(self, password):
@@ -351,9 +353,9 @@ class Partition():
self.filesystem = 'f2fs'
elif filesystem == 'crypto_LUKS':
- # from .luks import luks2
- # encrypted_partition = luks2(self, None, None)
- # encrypted_partition.format(path)
+ # from .luks import luks2
+ # encrypted_partition = luks2(self, None, None)
+ # encrypted_partition.format(path)
self.filesystem = 'crypto_LUKS'
else:
@@ -378,7 +380,8 @@ class Partition():
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
- if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
+ if not self.filesystem:
+ raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
pathlib.Path(target).mkdir(parents=True, exist_ok=True)
@@ -425,7 +428,7 @@ class Partition():
return True
-class Filesystem():
+class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
@@ -566,7 +569,8 @@ def all_disks(*args, **kwargs):
drives = OrderedDict()
# for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
for drive in json.loads(b''.join(sys_command('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part': continue
+ if not kwargs['partitions'] and drive['type'] == 'part':
+ continue
drives[drive['path']] = BlockDevice(drive['path'], drive)
return drives
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index b65e2593..816fa755 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -170,7 +170,7 @@ class sys_command:
'ended': self.ended,
'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
- 'exit_code': self.exit_code
+ 'exit_code': self.exit_code,
}
def peak(self, output: Union[str, bytes]) -> bool:
@@ -256,7 +256,7 @@ class sys_command:
original = trigger
trigger = bytes(original, 'UTF-8')
self.kwargs['events'][trigger] = self.kwargs['events'][original]
- del (self.kwargs['events'][original])
+ del self.kwargs['events'][original]
if type(self.kwargs['events'][trigger]) != bytes:
self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
@@ -269,7 +269,7 @@ class sys_command:
last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger])
- del (self.kwargs['events'][trigger])
+ del self.kwargs['events'][trigger]
broke = True
break
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index bac19007..139d9d33 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -9,7 +9,7 @@ from .user_interaction import *
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
-class Installer():
+class Installer:
"""
`Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
@@ -127,7 +127,8 @@ class Installer():
return [step for step, flag in self.helper_flags.items() if flag is False]
def pacstrap(self, *packages, **kwargs):
- if type(packages[0]) in (list, tuple): packages = packages[0]
+ if type(packages[0]) in (list, tuple):
+ packages = packages[0]
self.log(f'Installing packages: {packages}', level=logging.INFO)
if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0:
@@ -158,7 +159,8 @@ class Installer():
fh.write(hostname + '\n')
def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
- if not len(locale): return True
+ if not len(locale):
+ return True
with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{locale}.{encoding} {encoding}\n')
@@ -168,8 +170,10 @@ class Installer():
return True if sys_command(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
def set_timezone(self, zone, *args, **kwargs):
- if not zone: return True
- if not len(zone): return True # Redundant
+ if not zone:
+ return True
+ if not len(zone):
+ return True # Redundant
if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists():
(pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
@@ -263,6 +267,7 @@ class Installer():
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
+
def post_install_enable_networkd_resolved(*args, **kwargs):
self.enable_service('systemd-networkd', 'systemd-resolved')
@@ -332,9 +337,7 @@ class Installer():
self.helper_flags['base-strapped'] = True
with open(f"{self.target}/etc/fstab", "a") as fstab:
- fstab.write(
- "\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n"
- ) # Redundant \n at the start? who knows?
+ fstab.write("\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n") # Redundant \n at the start? who knows?
## TODO: Support locale and timezone
# os.remove(f'{self.target}/etc/localtime')
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index e6e1c897..1cb6777f 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -5,7 +5,7 @@ from .general import *
from .output import log
-class luks2():
+class luks2:
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
self.password = password
self.partition = partition
@@ -120,6 +120,7 @@ class luks2():
:type mountpoint: str
"""
from .disk import get_filesystem_type
+
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 3e5ed4e7..e12d9cc3 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -68,6 +68,7 @@ def get_wireless_networks(interface):
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time
+
wireless_scan(interface)
time.sleep(5)
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 2f97231c..894b8fcb 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -14,7 +14,7 @@ from .storage import storage
def grab_url_data(path):
- safe_path = path[:path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))])
+ safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
@@ -75,7 +75,7 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
if filter_top_level_profiles:
for profile in list(cache.keys()):
if Profile(None, profile).is_top_level_profile() is False:
- del (cache[profile])
+ del cache[profile]
return cache
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 53d5e938..42214572 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -18,5 +18,5 @@ storage = {
'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
'LOG_PATH': '/var/log/archinstall',
'LOG_FILE': 'install.log',
- 'MOUNT_POINT': '/mnt'
+ 'MOUNT_POINT': '/mnt',
}
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 69689479..f39eea1c 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -23,6 +23,7 @@ from .profiles import Profile
# TODO: Some inconsistencies between the selection processes.
# Some return the keys from the options, some the values?
+
def get_terminal_height():
return shutil.get_terminal_size().lines
@@ -104,7 +105,7 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
max_num_of_columns = get_terminal_width() // longest_line
max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom)
- if (len(options) > max_options_in_cells):
+ if len(options) > max_options_in_cells:
for index, option in enumerate(options):
print(f"{index}: {option}")
return 1, index
@@ -214,8 +215,10 @@ class MiniCurses:
self._cursor_x += len(text)
def clear(self, x, y):
- if x < 0: x = 0
- if y < 0: y = 0
+ if x < 0:
+ x = 0
+ if y < 0:
+ y = 0
# import time
# sys.stdout.write(f"Clearing from: {x, y}")
@@ -401,8 +404,7 @@ def ask_to_configure_network():
for index, mode in enumerate(modes):
print(f"{index}: {mode}")
- mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ",
- options_output=False)
+ mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ", options_output=False)
if mode == 'IP':
while 1:
ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
@@ -450,11 +452,10 @@ def ask_for_disk_layout():
options = {
'keep-existing': 'Keep existing partition layout and select which ones to use where',
'format-all': 'Format entire drive and setup a basic partition scheme',
- 'abort': 'Abort the installation'
+ 'abort': 'Abort the installation',
}
- value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ",
- allow_empty_input=False, sort=True)
+ value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ", allow_empty_input=False, sort=True)
return next((key for key, val in options.items() if val == value), None)
@@ -466,8 +467,7 @@ def ask_for_main_filesystem_format():
'f2fs': 'f2fs'
}
- value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ",
- allow_empty_input=False)
+ value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ", allow_empty_input=False)
return next((key for key, val in options.items() if val == value), None)
@@ -584,8 +584,7 @@ def select_profile(options):
print(' -- They might make it easier to install things like desktop environments. --')
print(' -- (Leave blank and hit enter to skip this step and continue) --')
- selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ',
- options_output=False)
+ selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ', options_output=False)
if selected_profile:
return Profile(None, selected_profile)
else:
@@ -675,9 +674,7 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
print_large_list(regions, margin_bottom=4)
print(' -- You can skip this step by leaving the option blank --')
- selected_mirror = generic_select(regions,
- 'Select one of the above regions to download packages from (by number or full name): ',
- options_output=False)
+ selected_mirror = generic_select(regions, 'Select one of the above regions to download packages from (by number or full name): ', options_output=False)
if not selected_mirror:
# Returning back empty options which can be both used to
# do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining