#!/usr/bin/python3 import traceback import os, re, struct, sys, json, pty, shlex import urllib.request, urllib.parse, ssl, signal import time from glob import glob from select import epoll, EPOLLIN, EPOLLHUP from socket import socket, inet_ntoa, AF_INET, AF_INET6, AF_PACKET from collections import OrderedDict as oDict from subprocess import Popen, STDOUT, PIPE, check_output from random import choice from string import ascii_uppercase, ascii_lowercase, digits from hashlib import sha512 from threading import Thread, enumerate as tenum if os.path.isfile('./SAFETY_LOCK'): SAFETY_LOCK = True else: SAFETY_LOCK = False profiles_path = 'https://raw.githubusercontent.com/Torxed/archinstall/master/deployments' rootdir_pattern = re.compile('^.*?/devices') harddrives = oDict() commandlog = [] worker_history = oDict() instructions = oDict() args = {} import logging from systemd.journal import JournalHandler # Custom adapter to pre-pend the 'origin' key. # TODO: Should probably use filters: https://docs.python.org/3/howto/logging-cookbook.html#using-filters-to-impart-contextual-information class CustomAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): return '[{}] {}'.format(self.extra['origin'], msg), kwargs logger = logging.getLogger() # __name__ journald_handler = JournalHandler() journald_handler.setFormatter(logging.Formatter('[{levelname}] {message}', style='{')) logger.addHandler(journald_handler) logger.setLevel(logging.DEBUG) class LOG_LEVELS: CRITICAL = 1 ERROR = 2 WARNING = 3 INFO = 4 DEBUG = 5 LOG_LEVEL = 4 def log(*msg, origin='UNKNOWN', level=5, **kwargs): if level <= LOG_LEVEL: msg = [item.decode('UTF-8', errors='backslashreplace') if type(item) == bytes else item for item in msg] msg = [str(item) if type(item) != str else item for item in msg] log_adapter = CustomAdapter(logger, {'origin': origin}) if level <= 1: log_adapter.critical(' '.join(msg)) elif level <= 2: log_adapter.error(' '.join(msg)) elif level <= 3: log_adapter.warning(' '.join(msg)) elif level <= 4: log_adapter.info(' '.join(msg)) else: log_adapter.debug(' '.join(msg)) ## == Profiles Path can be set via --profiles-path=/path ## This just sets the default path if the parameter is omitted. try: import psutil except: ## Time to monkey patch in all the stats and psutil fuctions if it isn't installed. class mem(): def __init__(self, free, percent=-1): self.free = free self.percent = percent class disk(): def __init__(self, size, free, percent): self.total = size self.used = 0 self.free = free self.percent = percent class iostat(): def __init__(self, interface, bytes_sent=0, bytes_recv=0): self.interface = interface self.bytes_recv = int(bytes_recv) self.bytes_sent = int(bytes_sent) def __repr__(self, *positionals, **kwargs): return f'iostat@{self.interface}[bytes_sent: {self.bytes_sent}, bytes_recv: {self.bytes_recv}]' class psutil(): def cpu_percent(interval=0): ## This just counts the ammount of time the CPU has spent. Find a better way! with cmd("grep 'cpu ' /proc/stat | awk '{usage=($2+$4)*100/($2+$4+$5)} END {print usage}'") as output: for line in output: return float(line.strip().decode('UTF-8')) def virtual_memory(): with cmd("grep 'MemFree: ' /proc/meminfo | awk '{free=($2)} END {print free}'") as output: for line in output: return mem(float(line.strip().decode('UTF-8'))) def disk_usage(partition): disk_stats = os.statvfs(partition) free_size = disk_stats.f_bfree * disk_stats.f_bsize disk_size = disk_stats.f_blocks * disk_stats.f_bsize percent = (100/disk_size)*free_size return disk(disk_size, free_size, percent) def net_if_addrs(): interfaces = {} for root, folders, files in os.walk('/sys/class/net/'): for name in folders: interfaces[name] = {} return interfaces def net_io_counters(pernic=True): data = {} for interface in psutil.net_if_addrs().keys(): with cmd("grep '{interface}:' /proc/net/dev | awk '{{recv=$2}}{{send=$10}} END {{print send,recv}}'".format(interface=interface)) as output: for line in output: data[interface] = iostat(interface, *line.strip().decode('UTF-8').split(' ',1)) return data ## FIXME: dependency checks (fdisk, lsblk etc) def sig_handler(signal, frame): print('\nAborting further installation steps!') print(' Here\'s a summary of the commandline:') print(f' {sys.argv}') exit(0) signal.signal(signal.SIGINT, sig_handler) def gen_uid(entropy_length=256): return sha512(os.urandom(entropy_length)).hexdigest() def get_default_gateway_linux(*positionals, **kwargs): """Read the default gateway directly from /proc.""" with open("/proc/net/route") as fh: for line in fh: fields = line.strip().split() if fields[1] != '00000000' or not int(fields[3], 16) & 2: continue return inet_ntoa(struct.pack(" origin/master\nUpdating 339d687..80b97f3\nFast-forward\n README.md | 2 +-\n 1 file changed, 1 insertion(+), 1 deletion(-)\n' if output != b'Already up to date' or branch != 'master': #tmp = re.findall(b'[0-9]+ file changed', output) #print(tmp) #if len(tmp): # num_changes = int(tmp[0].split(b' ',1)[0]) # if(num_changes): if branch != 'master': on_branch = simple_command('(cd /root/archinstall; git branch | grep "*" | cut -d\' \' -f 2)').decode('UTF-8').strip() if on_branch.lower() != branch.lower(): print(f'[N] Changing branch from {on_branch} to {branch}') output = simple_command(f'(cd /root/archinstall; git checkout {branch}; git pull)') print('[N] Rebooting the new branch') if not 'rebooted' in args: os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted','--rerun']) else: os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rerun',]) if not 'rebooted' in args: ## Reboot the script (in same context) print('[N] Rebooting the script') os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted',]) extit(1) def device_state(name, *positionals, **kwargs): # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: if f.read(1) == '1': return path = rootdir_pattern.sub('', os.readlink('/sys/block/{}'.format(name))) hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire") for bus in hotplug_buses: if os.path.exists('/sys/bus/{}'.format(bus)): for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)): device_link = rootdir_pattern.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus))) if re.search(device_link, path): return return True def get_partitions(dev, *positionals, **kwargs): drive_name = os.path.basename(dev) parts = oDict() #o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev))) o = b''.join(sys_command(f'/usr/bin/lsblk -J {dev}', hide_from_log=True)) if b'not a block device' in o: ## TODO: Replace o = sys_command() with code, o = sys_command() ## and make sys_command() return the exit-code, way safer than checking output strings :P return {} if not o[:1] == b'{': print('[E] Error in getting blk devices:', o) exit(1) r = json.loads(o.decode('UTF-8')) if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: for part in r['blockdevices'][0]['children']: #size = os.statvfs(dev + part['name'][len(drive_name):]) parts[part['name'][len(drive_name):]] = { #'size' : size.f_frsize * size.f_bavail, #'blocksize' : size.f_frsize * size.f_blocks 'size' : part['size'] } return parts def get_disk_model(drive): with open(f'/sys/block/{os.path.basename(drive)}/device/model', 'rb') as fh: return fh.read().decode('UTF-8').strip() def get_disk_size(drive): dev_short_name = os.path.basename(drive) with open(f'/sys/block/{dev_short_name}/device/block/{dev_short_name}/size', 'rb') as fh: return ''.join(human_readable_size(fh.read().decode('UTF-8').strip())) def disk_info(drive, *positionals, **kwargs): lkwargs = {**kwargs} lkwargs['emulate'] = False # This is a emulate-safe function. Does not alter filesystem. info = json.loads(b''.join(sys_command(f'lsblk -J -o "NAME,SIZE,FSTYPE,LABEL" {drive}', *positionals, **lkwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices'][0] fileformats = [] labels = [] if 'children' in info: ## Might not be partitioned yet for child in info['children']: if child['fstype'] != None: fileformats.append(child['fstype']) if child['label'] != None: labels.append(child['label']) else: fileformats = ['*Empty Drive*'] labels = ['(no partitions)'] info['fileformats'] = fileformats info['labels'] = labels info['model'] = get_disk_model(drive) return info def cleanup_args(*positionals, **kwargs): for key in args: if args[key] == '': if not args['unattended']: if 'input_redirect' in kwargs: args[key] = kwargs['input_redirect'](key) else: args[key] = input(f'Enter a value for {key}: ') else: args[key] = random_string(32) elif args[key] == '': args[key] = random_string(32) elif args[key] == '': args[key] = gen_yubikey_password() if not args[key]: print('[E] Failed to setup a yubikey password, is it plugged in?') exit(1) def merge_in_includes(instructions, *positionals, **kwargs): if 'args' in instructions: ## == Recursively fetch instructions if "include" is found under {args: ...} while 'include' in instructions['args']: includes = instructions['args']['include'] print('[!] Importing net-deploy target: {}'.format(includes)) del(instructions['args']['include']) if type(includes) in (dict, list): for include in includes: instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True) else: instructions = merge_dicts(instructions, get_instructions(includes), *positionals, **kwargs, before=True) ## Update arguments if we found any for key, val in instructions['args'].items(): args[key] = val if 'args' in instructions: ## TODO: Reuseable code, there's to many get_instructions, merge_dictgs and args updating going on. ## Update arguments if we found any for key, val in instructions['args'].items(): args[key] = val if 'user_args' in kwargs: for key, val in kwargs['user_args'].items(): args[key] = val return instructions def update_drive_list(*positionals, **kwargs): # https://github.com/karelzak/util-linux/blob/f920f73d83f8fd52e7a14ec0385f61fab448b491/disk-utils/fdisk-list.c#L52 for path in glob('/sys/block/*/device'): name = re.sub('.*/(.*?)/device', '\g<1>', path) if device_state(name, *positionals, **kwargs): harddrives[f'/dev/{name}'] = disk_info(f'/dev/{name}', *positionals, **kwargs) def human_readable_size(bits, sizes=[{8 : 'b'}, {1024 : 'kb'}, {1024 : 'mb'}, {1024 : 'gb'}, {1024 : 'tb'}, {1024 : 'zb?'}]): # Not needed if using lsblk. end_human = None for pair in sizes: size, human = list(pair.items())[0] if bits / size > 1: bits = bits/size end_human = human else: break return bits, end_human def human_disk_info(drive): return { 'size' : harddrives[drive]['size'], 'fileformat' : harddrives[drive]['fileformats'], 'labels' : harddrives[drive]['labels'] } def close_disks(): o = simple_command('/usr/bin/umount -R /mnt/boot') o = simple_command('/usr/bin/umount -R /mnt') o = simple_command('/usr/bin/cryptsetup close /dev/mapper/luksdev') def format_disk(drive='drive', start='start', end='size', emulate=False, *positionals, **kwargs): drive = args[drive] start = args[start] end = args[end] if not drive: raise ValueError('Need to supply a drive path, for instance: /dev/sdx') if not SAFETY_LOCK: # dd if=/dev/random of=args['drive'] bs=4096 status=progress # https://github.com/dcantrell/pyparted would be nice, but isn't officially in the repo's #SadPanda #if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0: # return None if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0: return None if sys_command(f'/usr/bin/parted -s {drive} mkpart primary FAT32 1MiB {start}', emulate=emulate, *positionals, **kwargs).exit_code != 0: return None if sys_command(f'/usr/bin/parted -s {drive} name 1 "EFI"', emulate=emulate, *positionals, **kwargs).exit_code != 0: return None if sys_command(f'/usr/bin/parted -s {drive} set 1 esp on', emulate=emulate, *positionals, **kwargs).exit_code != 0: return None if sys_command(f'/usr/bin/parted -s {drive} set 1 boot on', emulate=emulate, *positionals, **kwargs).exit_code != 0: return None if sys_command(f'/usr/bin/parted -s {drive} mkpart primary {start} {end}', emulate=emulate, *positionals, **kwargs).exit_code != 0: return None # TODO: grab partitions after each parted/partition step instead of guessing which partiton is which later on. # Create one, grab partitions - dub that to "boot" or something. do the next partition, grab that and dub it "system".. or something.. # This "assumption" has bit me in the ass so many times now I've stoped counting.. Jerker is right.. Don't do it like this :P return True def multisplit(s, splitters): s = [s,] for key in splitters: ns = [] for obj in s: x = obj.split(key) for index, part in enumerate(x): if len(part): ns.append(part) if index < len(x)-1: ns.append(key) s = ns return s def grab_url_data(path): safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))]) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode=ssl.CERT_NONE response = urllib.request.urlopen(safe_path, context=ssl_context) return response.read() def get_application_instructions(target): instructions = {} try: instructions = grab_url_data('{}/applications/{}.json'.format(args['profiles-path'], target)).decode('UTF-8') print('[N] Found application instructions for: {}'.format(target)) except urllib.error.HTTPError: print('[N] Could not find remote instructions. yrying local instructions under ./deployments/applications') local_path = './deployments/applications' if os.path.isfile('./archinstall.py') else './archinstall/deployments/applications' # Dangerous assumption if os.path.isfile(f'{local_path}/{target}.json'): with open(f'{local_path}/{target}.json', 'r') as fh: instructions = fh.read() print('[N] Found local application instructions for: {}'.format(target)) else: print('[N] No instructions found for: {}'.format(target)) return instructions try: instructions = json.loads(instructions, object_pairs_hook=oDict) except: print('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target))) traceback.print_exc() exit(1) return instructions def get_local_instructions(target): instructions = oDict() local_path = './deployments' if os.path.isfile('./archinstall.py') else './archinstall/deployments' # Dangerous assumption if os.path.isfile(f'{local_path}/{target}.json'): with open(f'{local_path}/{target}.json', 'r') as fh: instructions = fh.read() print('[N] Found local instructions called: {}'.format(target)) else: print('[N] No instructions found called: {}'.format(target)) return instructions def get_instructions(target, *positionals, **kwargs): if not 'profiles-path' in kwargs: kwargs['profiles-path'] = args['profiles-path'] instructions = oDict() if target[0-len('.json'):] == '.json': target = target[:0-len('.json')] log(f'Fetching instructions for {target}', level=4, origin='get_instructions') if get_default_gateway_linux(): try: instructions = grab_url_data(f"{kwargs['profiles-path']}/{target}.json").decode('UTF-8') log(f'Found net-deploy instructions for {target}', level=4, origin='get_instructions') print('[N] Found net-deploy instructions called: {}'.format(target)) except urllib.error.HTTPError: print('[N] Could not find remote instructions. Trying local instructions under ./deployments') log(f'Could not find remote instructions. Trying local instructions under ./deployments', level=4, origin='get_instructions') isntructions = get_local_instructions(target, *positionals) else: isntructions = get_local_instructions(target, *positionals) if type(instructions) not in (dict, oDict,): try: instructions = json.loads(instructions, object_pairs_hook=oDict) except: log(f'JSON syntax error in: {target}', level=4, origin='get_instructions') print('[E] JSON syntax error in {}'.format('{}/{}.json'.format(kwargs['profiles-path'], target))) traceback.print_exc() exit(1) log(f'Final instructions are: {instructions}', level=4, origin='get_instructions') return instructions def merge_dicts(d1, d2, before=True, overwrite=False): """ Merges d2 into d1 """ if before: d1, d2 = d2.copy(), d1.copy() overwrite = True for key, val in d2.items(): if key in d1: if type(d1[key]) in [dict, oDict] and type(d2[key]) in [dict, oDict]: d1[key] = merge_dicts(d1[key] if not before else d2[key], d2[key] if not before else d1[key], before=before, overwrite=overwrite) elif overwrite: d1[key] = val else: d1[key] = val return d1 def random_string(l): return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) for i in range(l)) def phone_home(url): payload = json.dumps({"hostname": args['hostname'], "done" : time.time(), "profile": args['profile'], "drive": args['drive'], "base_status": base_return_code}).encode('utf8') request = urllib.request.Request(url, data=payload, headers={'content-type': 'application/json'}) response = urllib.request.urlopen(request) def get_external_ip(*positionals, **kwargs): result = urllib.request.urlopen("https://hvornum.se/ip/?f=json").read().decode('UTF-8') return json.loads(result)['ip'] def guess_country(ip, *positionals, **kwargs): # python-pygeoip # geoip-database result = None GEOIP_DB = '/usr/share/GeoIP/GeoIP.dat' if os.path.isfile(GEOIP_DB): try: import pygeoip except: ## TODO: Do a best-effort-guess based off the hostname given off the IP instead, if GoeIP doesn't exist. return result gi = pygeoip.GeoIP(GEOIP_DB) result = gi.country_code_by_addr(ip) else: log(f'Missing GeoIP database: {GEOIP_DB}', origin='guess_country', level=LOG_LEVELS.ERROR) return result def setup_args_defaults(args, *positionals, **kwargs): if not 'size' in args: args['size'] = '100%' if not 'mirrors' in args: args['mirrors'] = True if not 'start' in args: args['start'] = '513MiB' if not 'pwfile' in args: args['pwfile'] = '/tmp/diskpw' if not 'hostname' in args: args['hostname'] = 'Archinstall' if not 'packages' in args: args['packages'] = '' # extra packages other than default if not 'post' in args: args['post'] = 'reboot' if not 'password' in args: args['password'] = '0000' # Default disk passord, can be or a fixed string if not 'minimal' in args: args['minimal'] = False if not 'unattended' in args: args['unattended'] = False if not 'profile' in args: args['profile'] = None if not 'skip-encrypt' in args: args['skip-encrypt'] = False if not 'profiles-path' in args: args['profiles-path'] = profiles_path if not 'rerun' in args: args['rerun'] = None if not 'aur-keep' in args: args['aur-keep'] = False if not 'aur-support' in args: args['aur-support'] = True # Support adds yay (https://github.com/Jguer/yay) in installation steps. if not 'ignore-rerun' in args: args['ignore-rerun'] = False if not 'phone-home' in args: args['phone-home'] = False # Setup locales if we didn't get one. if not 'country' in args: country = None if get_default_gateway_linux(): ip = get_external_ip() country = guess_country(ip) args['country'] = 'all' if not country else country if not 'localtime' in args: args['localtime'] = 'Europe/Stockholm' if args['country'] == 'SE' else 'GMT+0' # TODO: Arbitrary for now return args def load_automatic_instructions(*positionals, **kwargs): instructions = oDict() if get_default_gateway_linux(*positionals, **kwargs): locmac = get_local_MACs() if not len(locmac): print('[N] No network interfaces - No net deploy.') else: for mac in locmac: instructions = get_instructions(mac, *positionals, **kwargs) if 'args' in instructions: ## == Recursively fetch instructions if "include" is found under {args: ...} while 'include' in instructions['args']: includes = instructions['args']['include'] print('[!] Importing net-deploy target: {}'.format(includes)) del(instructions['args']['include']) if type(includes) in (dict, list): for include in includes: instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True) else: instructions = merge_dicts(instructions, get_instructions(includes, *positionals, **kwargs), before=True) ## Update arguments if we found any for key, val in instructions['args'].items(): args[key] = val if 'user_args' in kwargs: for key, val in kwargs['user_args'].items(): args[key] = val else: print('[N] No gateway - No net deploy') return instructions def cache_diskpw_on_disk(): if not os.path.isfile(args['pwfile']): #PIN = '0000' with open(args['pwfile'], 'w') as pw: pw.write(args['password']) def refresh_partition_list(drive, *positionals, **kwargs): drive = args[drive] if not 'partitions' in args: args['partitions'] = oDict() for index, part_name in enumerate(sorted(get_partitions(drive, *positionals, **kwargs).keys())): args['partitions'][str(index+1)] = part_name return True def mkfs_fat32(drive, partition, *positionals, **kwargs): drive = args[drive] partition = args['partitions'][partition] o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {drive}{partition}')) if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: return None return True def is_luksdev_mounted(*positionals, **kwargs): o = b''.join(sys_command('/usr/bin/file /dev/mapper/luksdev', hide_from_log=True)) # /dev/dm-0 if b'cannot open' in o: return False return True def mount_luktsdev(drive, partition, keyfile, *positionals, **kwargs): drive = args[drive] partition = args['partitions'][partition] keyfile = args[keyfile] if not is_luksdev_mounted(): o = b''.join(sys_command(f'/usr/bin/cryptsetup open {drive}{partition} luksdev --key-file {keyfile} --type luks2'.format(**args))) return is_luksdev_mounted() def encrypt_partition(drive, partition, keyfile='/tmp/diskpw', *positionals, **kwargs): drive = args[drive] partition = args['partitions'][partition] keyfile = args[keyfile] o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash sha512 --key-size 512 --iter-time 10000 --key-file {keyfile} --use-urandom luksFormat {drive}{partition}')) if not b'Command successful.' in o: return False return True def mkfs_btrfs(drive='/dev/mapper/luksdev', *positionals, **kwargs): o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {drive}')) if not b'UUID' in o: return False return True def mount_luksdev(where='/dev/mapper/luksdev', to='/mnt', *positionals, **kwargs): check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt', *positionals, **kwargs).decode('UTF-8').strip()# /dev/dm-0 if len(check_mounted): return False o = b''.join(sys_command('/usr/bin/mount /dev/mapper/luksdev /mnt', *positionals, **kwargs)) return True def mount_part(drive, partition, mountpoint='/mnt', *positionals, **kwargs): os.makedirs(mountpoint, exist_ok=True) #o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0 check_mounted = simple_command(f'/usr/bin/mount | /usr/bin/grep {mountpoint}', *positionals, **kwargs).decode('UTF-8').strip() if len(check_mounted): return False o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs)) return True def mount_boot(drive, partition, mountpoint='/mnt/boot', *positionals, **kwargs): os.makedirs('/mnt/boot', exist_ok=True) #o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0 check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs).decode('UTF-8').strip() if len(check_mounted): return False o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs)) return True def mount_mountpoints(drive, bootpartition, mountpoint='/mnt', *positionals, **kwargs): drive = args[drive] if args['skip-encrypt']: mount_part(drive, args['partitions']["2"], mountpoint, *positionals, **kwargs) else: mount_luksdev(*positionals, **kwargs) mount_boot(drive, args['partitions'][bootpartition], mountpoint=f'{mountpoint}/boot', *positionals, **kwargs) return True def re_rank_mirrors(top=10, *positionals, **kwargs): if sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0: return True return False def filter_mirrors_by_country_list(countries, top=None, *positionals, **kwargs): ## TODO: replace wget with urllib.request (no point in calling syscommand) country_list = [] for country in countries.split(','): country_list.append(f'country={country}') if not SAFETY_LOCK: o = b''.join(sys_command((f"/usr/bin/wget 'https://www.archlinux.org/mirrorlist/?{'&'.join(country_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O /root/mirrorlist"))) o = b''.join(sys_command(("/usr/bin/sed -i 's/#Server/Server/' /root/mirrorlist"))) o = b''.join(sys_command(("/usr/bin/mv /root/mirrorlist /etc/pacman.d/"))) if top: re_rank_mirrors(top, *positionals, **kwargs) or not os.path.isfile('/etc/pacman.d/mirrorlist') return True def add_custom_mirror(name, url, *positionals, **kwargs): if not SAFETY_LOCK: commandlog.append('# Adding custom mirror to /etc/pacman.conf') with open('/etc/pacman.conf', 'a') as mirrorlist: commandlog.append(f'# {name} @ {url}') mirrorlist.write('\n') mirrorlist.write(f'[{name}]\n') mirrorlist.write(f'Server = {url}\n') mirrorlist.write(f'SigLevel = Optional TrustAll\n') return True def add_specific_mirrors(mirrors, *positionals, **kwargs): if not SAFETY_LOCK: commandlog.append('# Adding mirrors to /etc/pacman.d/mirrorlist') with open('/etc/pacman.d/mirrorlist', 'a') as mirrorlist: mirrorlist.write('\n') for url in mirrors: commandlog.append(f'# {url}') mirrorlist.write(f'# {mirrors[url]}\n') mirrorlist.write(f'Server = {url}\n') return True def flush_all_mirrors(*positionals, **kwargs): if not SAFETY_LOCK: commandlog.append('# Flushed /etc/pacman.d/mirrorlist') with open('/etc/pacman.d/mirrorlist', 'w') as mirrorlist: mirrorlist.write('\n') # TODO: Not needed. return True def reboot(*positionals, **kwargs): simple_command('/usr/bin/sync', *positionals, **kwargs).decode('UTF-8').strip() simple_command('/usr/bin/reboot', *positionals, **kwargs).decode('UTF-8').strip() def strap_in_base(*positionals, **kwargs): if not SAFETY_LOCK: if args['aur-support']: args['packages'] += ' git' if (sync_mirrors := sys_command('/usr/bin/pacman -Syy', *positionals, **kwargs)).exit_code == 0: x = sys_command('/usr/bin/pacstrap /mnt base base-devel linux linux-firmware btrfs-progs efibootmgr nano wpa_supplicant dialog {packages}'.format(**args), *positionals, **kwargs) if x.exit_code == 0: return True else: log(f'Could not strap in base: {x.exit_code}', level=3, origin='strap_in_base') else: log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=3, origin='strap_in_base') return False def set_locale(fmt, *positionals, **kwargs): if not '.' in fmt: if fmt.lower() == 'se': fmt = 'en_SE.UTF-8 UTF-8' else: fmt = 'en_US.UTF-8 UTF-8' if not SAFETY_LOCK: o = b''.join(sys_command(f"/usr/bin/arch-chroot /mnt sh -c \"echo '{fmt}' > /etc/locale.gen\"")) o = b''.join(sys_command(f"/usr/bin/arch-chroot /mnt sh -c \"echo 'LANG={fmt.split(' ')[0]}' > /etc/locale.conf\"")) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt locale-gen')) return True def configure_base_system(*positionals, **kwargs): if not SAFETY_LOCK: ## TODO: Replace a lot of these syscalls with just python native operations. o = b''.join(sys_command('/usr/bin/genfstab -pU /mnt >> /mnt/etc/fstab')) if not os.path.isfile('/mnt/etc/fstab'): log(f'Could not locate fstab, strapping in packages most likely failed.', level=3, origin='configure_base_system') return False with open('/mnt/etc/fstab', 'a') as fstab: fstab.write('\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n') # Redundant \n at the start? who knoes? o = b''.join(sys_command('/usr/bin/arch-chroot /mnt rm -f /etc/localtime')) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt ln -s /usr/share/zoneinfo/{localtime} /etc/localtime'.format(**args))) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')) #o = sys_command('arch-chroot /mnt echo "{hostname}" > /etc/hostname'.format(**args)) #o = sys_command("arch-chroot /mnt sed -i 's/#\(en_US\.UTF-8\)/\1/' /etc/locale.gen") o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo '{hostname}' > /etc/hostname\"".format(**args))) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt chmod 700 /root')) with open('/mnt/etc/mkinitcpio.conf', 'w') as mkinit: ## TODO: Don't replace it, in case some update in the future actually adds something. mkinit.write('MODULES=(btrfs)\n') mkinit.write('BINARIES=(/usr/bin/btrfs)\n') mkinit.write('FILES=()\n') mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keyboard fsck)\n') o = b''.join(sys_command('/usr/bin/arch-chroot /mnt mkinitcpio -p linux')) return True def setup_bootloader(*positionals, **kwargs): o = b''.join(sys_command('/usr/bin/arch-chroot /mnt bootctl --no-variables --path=/boot install')) with open('/mnt/boot/loader/loader.conf', 'w') as loader: loader.write('default arch\n') loader.write('timeout 5\n') ## For some reason, blkid and /dev/disk/by-uuid are not getting along well. ## And blkid is wrong in terms of LUKS. #UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip() with open('/mnt/boot/loader/entries/arch.conf', 'w') as entry: entry.write('title Arch Linux\n') entry.write('linux /vmlinuz-linux\n') entry.write('initrd /initramfs-linux.img\n') if args['skip-encrypt']: ## NOTE: We could use /dev/disk/by-partuuid but blkid does the same and a lot cleaner UUID = simple_command(f"blkid -s PARTUUID -o value /dev/{os.path.basename(args['drive'])}{args['partitions']['2']}").decode('UTF-8').strip() entry.write('options root=PARTUUID={UUID} rw intel_pstate=no_hwp\n'.format(UUID=UUID)) else: UUID = simple_command(f"ls -l /dev/disk/by-uuid/ | grep {os.path.basename(args['drive'])}{args['partitions']['2']} | awk '{{print $9}}'").decode('UTF-8').strip() entry.write('options cryptdevice=UUID={UUID}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n'.format(UUID=UUID)) return True def add_AUR_support(*positionals, **kwargs): o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "useradd -m -G wheel aibuilder"')) o = b''.join(sys_command("/usr/bin/sed -i 's/# %wheel ALL=(ALL) NO/%wheel ALL=(ALL) NO/' /mnt/etc/sudoers")) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder; git clone https://aur.archlinux.org/yay.git)\\""')) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "chown -R aibuilder.aibuilder /home/aibuilder/yay"')) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder/yay; makepkg -si --noconfirm)\\" >/dev/null"')) ## Do not remove aibuilder just yet, can be used later for aur packages. #o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers')) #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"')) #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"')) return True def run_post_install_steps(*positionals, **kwargs): log(f'Running post installation with input data {instructions}.', level=4, origin='run_post_install_steps') conf = {} if 'post' in instructions: conf = instructions['post'] elif not 'args' in instructions and len(instructions): conf = instructions if 'git-branch' in conf: update_git(conf['git-branch']) del(conf['git-branch']) rerun = args['ignore-rerun'] for title in conf: log(f'Running post installation step {title}', level=4, origin='run_post_install_steps') if args['rerun'] and args['rerun'] != title and not rerun: continue else: rerun = True print('[N] Network Deploy: {}'.format(title)) if type(conf[title]) == str: print('[N] Loading {} configuration'.format(conf[title])) log(f'Loading {conf[title]} configuration', level=4, origin='run_post_install_steps') conf[title] = get_application_instructions(conf[title]) for command in conf[title]: raw_command = command opts = conf[title][command] if type(conf[title][command]) in (dict, oDict) else {} if len(opts): if 'pass-args' in opts or 'format' in opts: command = command.format(**args) ## FIXME: Instead of deleting the two options ## in order to mute command output further down, ## check for a 'debug' flag per command and delete these two if 'pass-args' in opts: del(opts['pass-args']) elif 'format' in opts: del(opts['format']) elif ('debug' in opts and opts['debug']) or ('debug' in conf and conf['debug']): print('[-] Options: {}'.format(opts)) if 'pass-args' in opts and opts['pass-args']: command = command.format(**args) if 'runas' in opts and f'su - {opts["runas"]} -c' not in command: command = command.replace('"', '\\"') command = f'su - {opts["runas"]} -c "{command}"' #print('[N] Command: {} ({})'.format(command, opts)) ## https://superuser.com/questions/1242978/start-systemd-nspawn-and-execute-commands-inside ## !IMPORTANT ## ## arch-chroot mounts /run into the chroot environment, this breaks name resolves for some reason. ## Either skipping mounting /run and using traditional chroot is an option, but using ## `systemd-nspawn -D /mnt --machine temporary` might be a more flexible solution in case of file structure changes. if 'no-chroot' in opts and opts['no-chroot']: log(f'Executing {command} as simple command from live-cd.', level=4, origin='run_post_install_steps') o = simple_command(command, opts, *positionals, **kwargs) elif 'chroot' in opts and opts['chroot']: log(f'Executing {command} in chroot.', level=4, origin='run_post_install_steps') ## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces). ## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option. ## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command. o = simple_command("mount /dev/mapper/luksdev /mnt") o = simple_command("cd /mnt; cp /etc/resolv.conf etc") o = simple_command("cd /mnt; mount -t proc /proc proc") o = simple_command("cd /mnt; mount --make-rslave --rbind /sys sys") o = simple_command("cd /mnt; mount --make-rslave --rbind /dev dev") o = simple_command('chroot /mnt /bin/bash -c "{c}"'.format(c=command), opts=opts, *positionals, **kwargs) o = simple_command("cd /mnt; umount -R dev") o = simple_command("cd /mnt; umount -R sys") o = simple_command("cd /mnt; umount -R proc") else: if 'boot' in opts and opts['boot']: log(f'Executing {command} in boot mode.', level=4, origin='run_post_install_steps') ## So, if we're going to boot this maddafakker up, we'll need to ## be able to login. The quickest way is to just add automatic login.. so lessgo! ## Turns out.. that didn't work exactly as planned.. ## # if not os.path.isdir('/mnt/etc/systemd/system/console-getty.service.d/'): # os.makedirs('/mnt/etc/systemd/system/console-getty.service.d/') # with open('/mnt/etc/systemd/system/console-getty.service.d/override.conf', 'w') as fh: # fh.write('[Service]\n') # fh.write('ExecStart=\n') # fh.write('ExecStart=-/usr/bin/agetty --autologin root -s %I 115200,38400,9600 vt102\n') ## So we'll add a bunch of triggers instead and let the sys_command manually react to them. ## " login" followed by "Passwodd" in case it's been set in a previous step.. usually this shouldn't be nessecary ## since we set the password as the last step. And then the command itself which will be executed by looking for: ## [root@ ~]# defaults = { 'login:' : 'root\n', 'Password:' : args['password']+'\n', '[root@{args["hostname"]} ~]#' : command+'\n', } if not 'events' in opts: opts['events'] = {} events = {**defaults, **opts['events']} del(opts['events']) o = b''.join(sys_command('/usr/bin/systemd-nspawn -D /mnt -b --machine temporary', *positionals, **{'events' : events, **kwargs, **opts})) ## Not needed anymore: And cleanup after out selves.. Don't want to leave any residue.. # os.remove('/mnt/etc/systemd/system/console-getty.service.d/override.conf') else: log(f'Executing {command} in with systemd-nspawn without boot.', level=4, origin='run_post_install_steps') o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D /mnt --machine temporary {command}', *positionals, **{**kwargs, **opts})) if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in o: log(f'{command} failed: {o.decode("UTF-8")}', level=4, origin='run_post_install_steps') print('[W] Post install command failed: {}'.format(o.decode('UTF-8'))) #print(o) return True def create_user(username, password='', groups=[]): if username: o = (f'/usr/bin/arch-chroot /mnt useradd -m -G wheel {username}') if password: o = (f"/usr/bin/arch-chroot /mnt sh -c \"echo '{username}:{password}' | chpasswd\"") if groups: for group in groups: o = (f'/usr/bin/arch-chroot /mnt gpasswd -a {username} {group}') return True def prerequisit_check(): if not os.path.isdir('/sys/firmware/efi'): return False, 'Archinstall only supports UEFI-booted machines.' return True if __name__ == '__main__': if not (prereq := prerequisit_check()) is True: print(f'[E] {prereq[1]}') exit(1) ## Setup some defaults # (in case no command-line parameters or netdeploy-params were given) args = setup_args_defaults(args) user_args = {} positionals = [] for arg in sys.argv[1:]: if '--' == arg[:2]: if '=' in arg: key, val = [x.strip() for x in arg[2:].split('=')] else: key, val = arg[2:], True args[key] = val user_args[key] = val else: positionals.append(arg) update_git() # Breaks and restarts the script if an update was found. update_drive_list() ## == If we got networking, # Try fetching instructions for this box unless a specific profile was given, and execute them. if args['profile'] is None and not args['minimal']: instructions = load_automatic_instructions(user_args=user_args) elif args['profile'] and not args['minimal']: instructions = get_instructions(args['profile']) if len(instructions) <= 0: print('[E] No instructions by the name of {} was found.'.format(args['profile'])) print(' Installation won\'t continue until a valid profile is given.') print(' (this is because --profile was given and a --default is not given)') exit(1) first = True while not args['minimal'] and not args['profile'] and len(instructions) <= 0: profile = input('What template do you want to install: ') instructions = get_instructions(profile) if first and len(instructions) <= 0: print('[E] No instructions by the name of {} was found.'.format(profile)) print(' Installation won\'t continue until a valid profile is given.') print(' (this is because --default is not instructed and no --profile given)') first = False # TODO: Might not need to return anything here, passed by reference? instructions = merge_in_includes(instructions, user_args=user_args) cleanup_args() ## If no drive was found in args, select one. if not 'drive' in args: if len(harddrives): drives = sorted(list(harddrives.keys())) if len(drives) > 1 and 'force' not in args and not 'unattended' in args and ('minimal' in args and 'first-drive' not in args): for index, drive in enumerate(drives): print(f"{index}: {drive} ({harddrives[drive]['size'], harddrives[drive]['fstype'], harddrives[drive]['label']})") drive = input('Select one of the above disks (by number): ') if not drive.isdigit(): raise KeyError("Multiple disks found, --drive=/dev/X not specified (or --force/--first-drive)") drives = [drives[int(drive)]] # Make sure only the selected drive is in the list of options args['drive'] = drives[0] # First drive found else: args['drive'] = None if args['drive'] and args['drive'][0] != '/': ## Remap the selected UUID to the device to be formatted. drive = get_drive_from_uuid(args['drive']) if not drive: print(f'[N] Could not map UUID "{args["drive"]}" to a device. Trying to match via PARTUUID instead!') drive = get_drive_from_part_uuid(args['drive']) if not drive: print(f'[E] Could not map UUID "{args["drive"]}" to a device. Aborting!') exit(1) args['drive'] = drive print(json.dumps(args, indent=4)) if args['minimal'] and not 'force' in args and not 'unattended' in args: if(input('Are these settings OK? (No return beyond this point) N/y: ').lower() != 'y'): exit(1) cache_diskpw_on_disk() #else: # ## TODO: Convert to `rb` instead. # # We shouldn't discriminate \xfu from being a passwd phrase. # with open(args['pwfile'], 'r') as pw: # PIN = pw.read().strip() print() if not args['skip-encrypt']: print('[!] Disk & root PASSWORD is: {}'.format(args['password'])) else: print('[!] root PASSWORD is: {}'.format(args['password'])) print() if not args['rerun'] or args['ignore-rerun']: for i in range(5, 0, -1): print(f'Formatting {args["drive"]} in {i}...') time.sleep(1) close_disks() print(f'[N] Setting up {args["drive"]}.') if not format_disk('drive', start='start', end='size', debug=True): print(f'[E] Coult not format drive {args["drive"]}') exit(1) refresh_partition_list('drive') print(f'[N] Partitions: {len(args["partitions"])} (Boot: {list(args["partitions"].keys())[0]})') if len(args['partitions']) <= 0: print(f'[E] No partitions were created on {args["drive"]}', o) exit(1) if not args['rerun'] or args['ignore-rerun']: if not mkfs_fat32('drive', '1'): print(f'[E] Could not setup {args["drive"]}{args["partitions"]["1"]}') exit(1) if not args['skip-encrypt']: # "--cipher sha512" breaks the shit. # TODO: --use-random instead of --use-urandom print(f'[N] Adding encryption to {args["drive"]}{args["partitions"]["2"]}.') if not encrypt_partition('drive', '2', 'pwfile'): print('[E] Failed to setup disk encryption.', o) exit(1) if not args['skip-encrypt']: if not mount_luktsdev('drive', '2', 'pwfile'): print('[E] Could not open encrypted device.', o) exit(1) if not args['rerun'] or args['ignore-rerun']: print(f'[N] Creating btrfs filesystem inside {args["drive"]}{args["partitions"]["2"]}') on_part = '/dev/mapper/luksdev' if args['skip-encrypt']: on_part = f'{args["drive"]}{args["partitions"]["2"]}' if not mkfs_btrfs(on_part): print('[E] Could not setup btrfs filesystem.') exit(1) mount_mountpoints('drive', '1') if 'mirrors' in args and args['mirrors'] and 'country' in args and get_default_gateway_linux(): print('[N] Reordering mirrors.') filter_mirrors_by_country_list(args['country']) pre_conf = {} if 'pre' in instructions: pre_conf = instructions['pre'] elif 'prerequisits' in instructions: pre_conf = instructions['prerequisits'] if 'git-branch' in pre_conf: update_git(pre_conf['git-branch']) del(pre_conf['git-branch']) rerun = args['ignore-rerun'] ## Prerequisit steps needs to NOT be executed in arch-chroot. ## Mainly because there's no root structure to chroot into. ## But partly because some configurations need to be done against the live CD. ## (For instance, modifying mirrors are done on LiveCD and replicated intwards) for title in pre_conf: print('[N] Network prerequisit step: {}'.format(title)) if args['rerun'] and args['rerun'] != title and not rerun: continue else: rerun = True for command in pre_conf[title]: raw_command = command opts = pre_conf[title][raw_command] if type(pre_conf[title][raw_command]) in (dict, oDict) else {} if len(opts): if 'pass-args' in opts or 'format' in opts: command = command.format(**args) ## FIXME: Instead of deleting the two options ## in order to mute command output further down, ## check for a 'debug' flag per command and delete these two if 'pass-args' in opts: del(opts['pass-args']) elif 'format' in opts: del(opts['format']) elif 'debug' in opts and opts['debug']: print('[N] Complete command-string: '.format(command)) else: print('[-] Options: {}'.format(opts)) #print('[N] Command: {} ({})'.format(raw_command, opts)) o = b''.join(sys_command('{c}'.format(c=command), opts)) if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in b''.join(o): print('[W] Prerequisit step failed: {}'.format(b''.join(o).decode('UTF-8'))) #print(o) if not args['rerun'] or rerun: print('[N] Straping in packages.') base_return_code = strap_in_base() # TODO: check return here? we return based off pacstrap exit code.. Never tired it tho. else: base_return_code = None if not os.path.isdir('/mnt/etc'): # TODO: This might not be the most long term stable thing to rely on... print('[E] Failed to strap in packages', o) exit(1) if not args['rerun'] or rerun: print('[N] Configuring base system.') set_locale('en_US.UTF-8 UTF-8') configure_base_system() ## WORKAROUND: https://github.com/systemd/systemd/issues/13603#issuecomment-552246188 print('[N] Setting up bootloader.') setup_bootloader() if args['aur-support']: print('[N] AUR support demanded, building "yay" before running POST steps.') add_AUR_support() print('[N] AUR support added. use "yay -Syy --noconfirm " to deploy in POST.') ## == Passwords # o = sys_command('arch-chroot /mnt usermod --password {} root'.format(args['password'])) # o = sys_command("arch-chroot /mnt sh -c 'echo {pin} | passwd --stdin root'".format(pin='"{pin}"'.format(**args, pin=args['password'])), echo=True) set_password(user='root', password=args['password']) time.sleep(5) if 'user' in args: create_user(args['user'], args['password'])#, groups=['wheel']) print('[N] Running post installation steps.') run_post_install_steps() time.sleep(2) if args['aur-support'] and not args['aur-keep']: o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers')) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"')) o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"')) if args['phone-home']: phone_home(args['phone-home']) if args['post'] == 'reboot': o = simple_command('/usr/bin/umount -R /mnt') o = simple_command('/usr/bin/reboot now') else: print('Done. "umount -R /mnt; reboot" when you\'re done tinkering.')