Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/installer.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/installer.py')
-rw-r--r--archinstall/lib/installer.py286
1 files changed, 171 insertions, 115 deletions
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index c02d5717..6e4a6193 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,5 +1,4 @@
import time
-from typing import Union
import logging
import os
import shutil
@@ -7,6 +6,8 @@ import shlex
import pathlib
import subprocess
import glob
+from types import ModuleType
+from typing import Union, Dict, Any, List, Optional, Iterator, Mapping
from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
@@ -30,29 +31,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
class InstallationFile:
- def __init__(self, installation, filename, owner, mode="w"):
+ def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
self.installation = installation
self.filename = filename
self.owner = owner
self.mode = mode
self.fh = None
- def __enter__(self):
+ def __enter__(self) -> 'InstallationFile':
self.fh = open(self.filename, self.mode)
return self
- def __exit__(self, *args):
+ def __exit__(self, *args :str) -> None:
self.fh.close()
self.installation.chown(self.owner, self.filename)
- def write(self, data: Union[str, bytes]):
+ def write(self, data: Union[str, bytes]) -> int:
return self.fh.write(data)
- def read(self, *args):
+ def read(self, *args) -> Union[str, bytes]:
return self.fh.read(*args)
- def poll(self, *args):
- return self.fh.poll(*args)
+# def poll(self, *args) -> bool:
+# return self.fh.poll(*args)
def accessibility_tools_in_use() -> bool:
@@ -84,11 +85,12 @@ class Installer:
"""
- def __init__(self, target, *, base_packages=None, kernels=None):
+ def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None):
if base_packages is None:
base_packages = __packages__[:3]
if kernels is None:
kernels = ['linux']
+
self.kernels = kernels
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
@@ -119,18 +121,17 @@ class Installer:
self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
self.KERNEL_PARAMS = []
- def log(self, *args, level=logging.DEBUG, **kwargs):
+ def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
Any manual override can be done per log() call.
"""
log(*args, level=level, **kwargs)
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> 'Installer':
return self
- def __exit__(self, *args, **kwargs):
- # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
+ def __exit__(self, *args :str, **kwargs :str) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
@@ -163,10 +164,10 @@ class Installer:
return False
@property
- def partitions(self):
+ def partitions(self) -> List[Partition]:
return get_partitions_in_use(self.target)
- def sync_log_to_install_medium(self):
+ def sync_log_to_install_medium(self) -> bool:
# Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
if self.helper_flags.get('base-strapped', False) is True:
@@ -180,90 +181,111 @@ class Installer:
return True
- def mount_ordered_layout(self, layouts: dict):
- from .luks import luks2
-
- mountpoints = {}
- for blockdevice in layouts:
- for partition in layouts[blockdevice]['partitions']:
- if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})):
- if partition.get('encrypted',False):
- if partition.get('mountpoint',None):
- ppath = partition['mountpoint']
- else:
- ppath = partition['device_instance'].path
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop"
- # Immediately unlock the encrypted device to format the inner volume
- with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=False) as unlocked_device:
- unlocked_device.mount(f"{self.target}/")
- try:
- manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes,unlocked_device)
- except Exception as e:
- # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
- unlocked_device.unmount()
- raise e
- unlocked_device.unmount()
- # TODO generate key
- else:
- self.mount(partition['device_instance'],"/")
- try:
- manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes)
- except Exception as e:
- # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
- partition['device_instance'].unmount()
- raise e
- partition['device_instance'].unmount()
- else:
- mountpoints[partition['mountpoint']] = partition
- for mountpoint in sorted([mnt_dest for mnt_dest in mountpoints.keys() if mnt_dest is not None]):
- partition = mountpoints[mountpoint]
- if partition.get('encrypted', False) and not partition.get('subvolume',None):
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
- if not (password := partition.get('!password', None)):
- raise RequirementError(f"Missing mountpoint {mountpoint} encryption password in layout: {partition}")
-
- with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device:
- if partition.get('generate-encryption-key-file'):
- if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
- cryptkey_dir.mkdir(parents=True)
-
- # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
- # if we name the device to "xyzloop".
- encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key"
- with open(f"{self.target}{encryption_key_path}", "w") as keyfile:
- keyfile.write(generate_password(length=512))
+ def _create_keyfile(self,luks_handle , partition :dict, password :str):
+ """ roiutine to create keyfiles, so it can be moved elsewere
+ """
+ if partition.get('generate-encryption-key-file'):
+ if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
+ cryptkey_dir.mkdir(parents=True)
+ # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
+ # if we name the device to "xyzloop".
+ if partition.get('mountpoint',None):
+ encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key"
+ else:
+ encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key"
+ with open(f"{self.target}{encryption_key_path}", "w") as keyfile:
+ keyfile.write(generate_password(length=512))
- os.chmod(f"{self.target}{encryption_key_path}", 0o400)
+ os.chmod(f"{self.target}{encryption_key_path}", 0o400)
- luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password)
- luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"])
+ luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password)
+ luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"])
- log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {unlocked_device}", level=logging.INFO)
- unlocked_device.mount(f"{self.target}{mountpoint}")
+ def _has_root(self, partition :dict) -> bool:
+ """
+ Determine if an encrypted partition contains root in it
+ """
+ if partition.get("mountpoint") is None:
+ if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})):
+ for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list]:
+ if mountpoint == '/':
+ return True
+ return False
+ else:
+ return False
+ elif partition.get("mountpoint") == '/':
+ return True
+ else:
+ return False
+ def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
+ from .luks import luks2
+ # set the partitions as a list not part of a tree (which we don't need anymore (i think)
+ list_part = []
+ list_luks_handles = []
+ for blockdevice in layouts:
+ list_part.extend(layouts[blockdevice]['partitions'])
+
+ # we manage the encrypted partititons
+ for partition in [entry for entry in list_part if entry.get('encrypted',False)]:
+ # open the luks device and all associate stuff
+ if not (password := partition.get('!password', None)):
+ raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}")
+ # i change a bit the naming conventions for the loop device
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
else:
- log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
- if partition.get('options',[]):
- mount_options = ','.join(partition['options'])
- partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options)
- else:
- partition['device_instance'].mount(f"{self.target}{mountpoint}")
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
+ # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
+ with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device:
+ if partition.get('generate-encryption-key-file',False) and not self._has_root(partition):
+ list_luks_handles.append([luks_handle,partition,password])
+ # this way all the requesrs will be to the dm_crypt device and not to the physical partition
+ partition['device_instance'] = unlocked_device
+
+ # we manage the btrfs partitions
+ for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]:
+ self.mount(partition['device_instance'],"/")
+ try:
+ new_mountpoints = manage_btrfs_subvolumes(self,partition)
+ except Exception as e:
+ # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
+ partition['device_instance'].unmount()
+ raise e
+ partition['device_instance'].unmount()
+ if new_mountpoints:
+ list_part.extend(new_mountpoints)
+
+ # we mount. We need to sort by mountpoint to get a good working order
+ for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']):
+ mountpoint = partition['mountpoint']
+ log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
+ if partition.get('filesystem',{}).get('mount_options',[]):
+ mount_options = ','.join(partition['filesystem']['mount_options'])
+ partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options)
+ else:
+ partition['device_instance'].mount(f"{self.target}{mountpoint}")
time.sleep(1)
try:
get_mount_info(f"{self.target}{mountpoint}", traverse=False)
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
- def mount(self, partition, mountpoint, create_mountpoint=True):
+ # once everything is mounted, we generate the key files in the correct place
+ for handle in list_luks_handles:
+ ppath = handle[1]['device_instance'].path
+ log(f"creating key-file for {ppath}",level=logging.INFO)
+ self._create_keyfile(handle[0],handle[1],handle[2])
+
+ def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None:
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
partition.mount(f'{self.target}{mountpoint}')
- def post_install_check(self, *args, **kwargs):
+ def post_install_check(self, *args :str, **kwargs :str) -> List[bool]:
return [step for step, flag in self.helper_flags.items() if flag is False]
- def pacstrap(self, *packages, **kwargs):
+ def pacstrap(self, *packages :str, **kwargs :str) -> bool:
if type(packages[0]) in (list, tuple):
packages = packages[0]
@@ -284,7 +306,7 @@ class Installer:
else:
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
- def set_mirrors(self, mirrors):
+ def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'):
if result := plugin.on_mirrors(mirrors):
@@ -292,7 +314,7 @@ class Installer:
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
- def genfstab(self, flags='-pU'):
+ def genfstab(self, flags :str = '-pU') -> bool:
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
@@ -307,11 +329,11 @@ class Installer:
return True
- def set_hostname(self, hostname: str, *args, **kwargs):
+ def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
- def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
+ def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool:
if not len(locale):
return True
@@ -322,7 +344,7 @@ class Installer:
return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
- def set_timezone(self, zone, *args, **kwargs):
+ def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool:
if not zone:
return True
if not len(zone):
@@ -337,6 +359,7 @@ class Installer:
(pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
+
else:
self.log(
f"Time zone {zone} does not exist, continuing with system default.",
@@ -344,11 +367,13 @@ class Installer:
fg='red'
)
- def activate_ntp(self):
+ return False
+
+ def activate_ntp(self) -> None:
log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO)
self.activate_time_syncronization()
- def activate_time_syncronization(self):
+ def activate_time_syncronization(self) -> None:
self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
self.enable_service('systemd-timesyncd')
@@ -361,11 +386,11 @@ class Installer:
with Boot(self) as session:
session.SysCommand(["timedatectl", "set-ntp", 'true'])
- def enable_espeakup(self):
+ def enable_espeakup(self) -> None:
self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO)
self.enable_service('espeakup')
- def enable_service(self, *services):
+ def enable_service(self, *services :str) -> None:
for service in services:
self.log(f'Enabling service {service}', level=logging.INFO)
if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
@@ -375,19 +400,27 @@ class Installer:
if hasattr(plugin, 'on_service'):
plugin.on_service(service)
- def run_command(self, cmd, *args, **kwargs):
+ def run_command(self, cmd :str, *args :str, **kwargs :str) -> None:
return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
- def arch_chroot(self, cmd, run_as=None):
+ def arch_chroot(self, cmd :str, run_as :Optional[str] = None):
if run_as:
cmd = f"su - {run_as} -c {shlex.quote(cmd)}"
return self.run_command(cmd)
- def drop_to_shell(self):
+ def drop_to_shell(self) -> None:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
- def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
+ def configure_nic(self,
+ nic :str,
+ dhcp :bool = True,
+ ip :Optional[str] = None,
+ gateway :Optional[str] = None,
+ dns :Optional[str] = None,
+ *args :str,
+ **kwargs :str
+ ) -> None:
from .systemd import Networkd
if dhcp:
@@ -412,7 +445,7 @@ class Installer:
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
- def copy_iso_network_config(self, enable_services=False):
+ def copy_iso_network_config(self, enable_services :bool = False) -> bool:
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if psk_files := glob.glob('/var/lib/iwd/*.psk'):
@@ -427,7 +460,7 @@ class Installer:
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
- def post_install_enable_iwd_service(*args, **kwargs):
+ def post_install_enable_iwd_service(*args :str, **kwargs :str):
self.enable_service('iwd')
self.post_base_install.append(post_install_enable_iwd_service)
@@ -452,7 +485,7 @@ class Installer:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
- def post_install_enable_networkd_resolved(*args, **kwargs):
+ def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
self.enable_service('systemd-networkd', 'systemd-resolved')
self.post_base_install.append(post_install_enable_networkd_resolved)
@@ -462,7 +495,7 @@ class Installer:
return True
- def detect_encryption(self, partition):
+ def detect_encryption(self, partition :Partition) -> bool:
part = Partition(partition.parent, None, autodetect_filesystem=True)
if partition.encrypted:
return partition
@@ -471,7 +504,7 @@ class Installer:
return False
- def mkinitcpio(self, *flags):
+ def mkinitcpio(self, *flags :str) -> bool:
for plugin in plugins.values():
if hasattr(plugin, 'on_mkinitcpio'):
# Allow plugins to override the usage of mkinitcpio altogether.
@@ -483,9 +516,10 @@ class Installer:
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
- SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
- def minimal_installation(self):
+ return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0
+
+ def minimal_installation(self) -> bool:
# Add necessary packages if encrypting the drive
# (encrypted partitions default to btrfs for now, so we need btrfs-progs)
# TODO: Perhaps this should be living in the function which dictates
@@ -562,7 +596,7 @@ class Installer:
return True
- def setup_swap(self, kind='zram'):
+ def setup_swap(self, kind :str = 'zram') -> bool:
if kind == 'zram':
self.log(f"Setting up swap on zram")
self.pacstrap('zram-generator')
@@ -578,7 +612,18 @@ class Installer:
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
- def add_bootloader(self, bootloader='systemd-bootctl'):
+ def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
+ """
+ Adds a bootloader to the installation instance.
+ Archinstall supports one of three types:
+ * systemd-bootctl
+ * grub
+ * efistub (beta)
+
+ :param bootloader: Can be one of the three strings
+ 'systemd-bootctl', 'grub' or 'efistub' (beta)
+ """
+
for plugin in plugins.values():
if hasattr(plugin, 'on_add_bootloader'):
# Allow plugins to override the boot-loader handling.
@@ -669,6 +714,7 @@ class Installer:
base_path,bind_path = split_bind_name(str(root_partition.path))
if bind_path is not None: # and root_fs_type == 'btrfs':
options_entry = f"rootflags=subvol={bind_path} " + options_entry
+
if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
@@ -757,10 +803,19 @@ class Installer:
return True
- def add_additional_packages(self, *packages):
+ def add_additional_packages(self, *packages :str) -> bool:
return self.pacstrap(*packages)
- def install_profile(self, profile):
+ def install_profile(self, profile :str) -> ModuleType:
+ """
+ Installs a archinstall profile script (.py file).
+ This profile can be either local, remote or part of the library.
+
+ :param profile: Can be a local path or a remote path (URL)
+ :return: Returns the imported script as a module, this way
+ you can access any remaining functions exposed by the profile.
+ :rtype: module
+ """
storage['installation_session'] = self
if type(profile) == str:
@@ -769,13 +824,13 @@ class Installer:
self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
- def enable_sudo(self, entity: str, group=False):
+ def enable_sudo(self, entity: str, group :bool = False) -> bool:
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
- def user_create(self, user: str, password=None, groups=None, sudo=False):
+ def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None:
if groups is None:
groups = []
@@ -789,7 +844,8 @@ class Installer:
if not handled_by_plugin:
self.log(f'Creating user {user}', level=logging.INFO)
- SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
+ if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0:
+ raise SystemError(f"Could not create user inside installation: {output}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@@ -806,24 +862,24 @@ class Installer:
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
- def user_set_pw(self, user, password):
+ def user_set_pw(self, user :str, password :str) -> bool:
self.log(f'Setting password for {user}', level=logging.INFO)
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
- SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0
- def user_set_shell(self, user, shell):
+ def user_set_shell(self, user :str, shell :str) -> bool:
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
- SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0
- def chown(self, owner, path, options=[]):
- return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}")
+ def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0
- def create_file(self, filename, owner=None):
+ def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
def set_keyboard_language(self, language: str) -> bool: