index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/general.py | 461 |
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index eb0c5d14..3b62c891 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -1,16 +1,26 @@ -import os, json, hashlib, shlex, sys -import time, pty, logging +import hashlib +import json +import logging +import os +import pty +import shlex +import subprocess +import sys +import time from datetime import datetime, date -from subprocess import Popen, STDOUT, PIPE, check_output from select import epoll, EPOLLIN, EPOLLHUP +from typing import Union + from .exceptions import * from .output import log + def gen_uid(entropy_length=256): return hashlib.sha512(os.urandom(entropy_length)).hexdigest() + def multisplit(s, splitters): - s = [s,] + s = [s, ] for key in splitters: ns = [] for obj in s: @@ -18,38 +28,41 @@ def multisplit(s, splitters): for index, part in enumerate(x): if len(part): ns.append(part) - if index < len(x)-1: + if index < len(x) - 1: ns.append(key) s = ns return s + def locate_binary(name): for PATH in os.environ['PATH'].split(':'): for root, folders, files in os.walk(PATH): for file in files: if file == name: return os.path.join(root, file) - break # Don't recurse + break # Don't recurse + + raise RequirementError(f"Binary {name} does not exist.") + -class JSON_Encoder: +class JsonEncoder: def _encode(obj): if isinstance(obj, dict): - ## We'll need to iterate not just the value that default() usually gets passed - ## But also iterate manually over each key: value pair in order to trap the keys. - + # We'll need to iterate not just the value that default() usually gets passed + # But also iterate manually over each key: value pair in order to trap the keys. + copy = {} for key, val in list(obj.items()): if isinstance(val, dict): - val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack.. - # But it's the only quick way I can think of to - # trigger a encoding of sub-dictionaries. + # This, is a EXTREMELY ugly hack.. but it's the only quick way I can think of to trigger a encoding of sub-dictionaries. + val = json.loads(json.dumps(val, cls=JSON)) else: - val = JSON_Encoder._encode(val) - + val = JsonEncoder._encode(val) + if type(key) == str and key[0] == '!': - copy[JSON_Encoder._encode(key)] = '******' + copy[JsonEncoder._encode(key)] = '******' else: - copy[JSON_Encoder._encode(key)] = val + copy[JsonEncoder._encode(key)] = val return copy elif hasattr(obj, 'json'): return obj.json() @@ -65,113 +78,134 @@ class JSON_Encoder: else: return obj + class JSON(json.JSONEncoder, json.JSONDecoder): def _encode(self, obj): - return JSON_Encoder._encode(obj) + return JsonEncoder._encode(obj) def encode(self, obj): return super(JSON, self).encode(self._encode(obj)) -class sys_command():#Thread): - """ - Stolen from archinstall_gui - """ - def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars={}, *args, **kwargs): - kwargs.setdefault("worker_id", gen_uid()) - kwargs.setdefault("emulate", False) - kwargs.setdefault("suppress_errors", False) - self.log = kwargs.get('log', log) +class SysCommandWorker: + def __init__(self, cmd, callbacks=None, peak_output=False, environment_vars=None, logfile=None, working_directory='./'): + if not callbacks: + callbacks = {} + if not environment_vars: + environment_vars = {} - if kwargs['emulate']: - self.log(f"Starting command '{cmd}' in emulation mode.", level=logging.DEBUG) + if type(cmd) is str: + cmd = shlex.split(cmd) - if type(cmd) is list: - # if we get a list of arguments - self.raw_cmd = shlex.join(cmd) - self.cmd = cmd - else: - # else consider it a single shell string - # this should only be used if really necessary - self.raw_cmd = cmd - try: - self.cmd = shlex.split(cmd) - except Exception as e: - raise ValueError(f'Incorrect string to split: {cmd}\n{e}') + if cmd[0][0] != '/' and cmd[0][:2] != './': + # "which" doesn't work as it's a builtin to bash. + # It used to work, but for whatever reason it doesn't anymore. + # We there for fall back on manual lookup in os.PATH + cmd[0] = locate_binary(cmd[0]) - self.args = args - self.kwargs = kwargs + self.cmd = cmd + self.callbacks = callbacks self.peak_output = peak_output self.environment_vars = environment_vars + self.logfile = logfile + self.working_directory = working_directory - self.kwargs.setdefault("worker", None) - self.callback = callback - self.pid = None self.exit_code = None - self.started = time.time() + self._trace_log = b'' + self._trace_log_pos = 0 + self.poll_object = epoll() + self.child_fd = None + self.started = None self.ended = None - self.worker_id = kwargs['worker_id'] - self.trace_log = b'' - self.status = 'starting' - user_catalogue = os.path.expanduser('~') + def __contains__(self, key: bytes): + """ + Contains will also move the current buffert position forward. + This is to avoid re-checking the same data when looking for output. + """ + assert type(key) == bytes - if (workdir := kwargs.get('workdir', None)): - self.cwd = workdir - self.exec_dir = workdir - else: - self.cwd = f"{user_catalogue}/.cache/archinstall/workers/{kwargs['worker_id']}/" - self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir' + if (contains := key in self._trace_log[self._trace_log_pos:]): + self._trace_log_pos += self._trace_log[self._trace_log_pos:].find(key) + len(key) - if not self.cmd[0][0] == '/': - # "which" doesn't work as it's a builtin to bash. - # It used to work, but for whatever reason it doesn't anymore. So back to square one.. + return contains - #self.log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5) - #self.log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5) - self.cmd[0] = locate_binary(self.cmd[0]) + def __iter__(self, *args, **kwargs): + for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): + if line: + yield line + b'\n' - if not os.path.isdir(self.exec_dir): - os.makedirs(self.exec_dir) + self._trace_log_pos = self._trace_log.rfind(b'\n') - if start_callback: - start_callback(self, *args, **kwargs) - self.run() + def __repr__(self): + self.make_sure_we_are_executing() + return str(self._trace_log) - def __iter__(self, *args, **kwargs): - for line in self.trace_log.split(b'\n'): - yield line + def __enter__(self): + return self - def __repr__(self, *args, **kwargs): - return f"{self.cmd, self.trace_log}" + def __exit__(self, *args): + # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. + # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - def decode(self, fmt='UTF-8'): - return self.trace_log.decode(fmt) + if self.child_fd: + try: + os.close(self.child_fd) + except: + pass - def dump(self): - return { - 'status': self.status, - 'worker_id': self.worker_id, - 'worker_result': self.trace_log.decode('UTF-8'), - 'started': self.started, - 'ended': self.ended, - 'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)), - 'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None, - 'exit_code': self.exit_code - } + if self.peak_output: + # To make sure any peaked output didn't leave us hanging + # on the same line we were on. + sys.stdout.write("\n") + sys.stdout.flush() - def peak(self, output :str): - if type(output) == bytes: - try: - output = output.decode('UTF-8') - except UnicodeDecodeError: - return None + if len(args) >= 2 and args[1]: + log(args[1], level=logging.ERROR, fg='red') + + if self.exit_code != 0: + raise SysCallError(f"{self.cmd} exited with abnormal exit code: {self.exit_code}") + + def is_alive(self): + self.poll() + + if self.started and self.ended is None: + return True + + return False + + def write(self, data: bytes, line_ending=True): + assert type(data) == bytes # TODO: Maybe we can support str as well and encode it + + self.make_sure_we_are_executing() - output = output.strip('\r\n ') - if len(output) <= 0: - return None + os.write(self.child_fd, data + (b'\n' if line_ending else b'')) + def make_sure_we_are_executing(self): + if not self.started: + return self.execute() + + def tell(self) -> int: + self.make_sure_we_are_executing() + return self._trace_log_pos + + def seek(self, pos): + self.make_sure_we_are_executing() + # Safety check to ensure 0 < pos < len(tracelog) + self._trace_log_pos = min(max(0, pos), len(self._trace_log)) + + def peak(self, output: Union[str, bytes]) -> bool: if self.peak_output: + if type(output) == bytes: + try: + output = output.decode('UTF-8') + except UnicodeDecodeError: + return False + + output = output.strip('\r\n ') + if len(output) <= 0: + return False + from .user_interaction import get_terminal_width # Move back to the beginning of the terminal @@ -191,124 +225,133 @@ class sys_command():#Thread): # And print the new output we're peaking on: sys.stdout.write(output) sys.stdout.flush() + return True - def run(self): - self.status = 'running' - old_dir = os.getcwd() - os.chdir(self.exec_dir) - self.pid, child_fd = pty.fork() - if not self.pid: # Child process - # Replace child process with our main process - if not self.kwargs['emulate']: - try: - os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars}) - except FileNotFoundError: - self.status = 'done' - self.log(f"{self.cmd[0]} does not exist.", level=logging.DEBUG) - self.exit_code = 1 - return False - - os.chdir(old_dir) - - poller = epoll() - poller.register(child_fd, EPOLLIN | EPOLLHUP) - - if 'events' in self.kwargs and 'debug' in self.kwargs: - self.log(f'[D] Using triggers for command: {self.cmd}', level=logging.DEBUG) - self.log(json.dumps(self.kwargs['events']), level=logging.DEBUG) + def poll(self): + self.make_sure_we_are_executing() - alive = True - last_trigger_pos = 0 - while alive and not self.kwargs['emulate']: - for fileno, event in poller.poll(0.1): - try: - output = os.read(child_fd, 8192) - self.peak(output) - self.trace_log += output - except OSError: - alive = False - break - - if 'debug' in self.kwargs and self.kwargs['debug'] and len(output): - self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=logging.DEBUG) - - if 'on_output' in self.kwargs: - self.kwargs['on_output'](self.kwargs['worker'], output) - - lower = output.lower() - broke = False - if 'events' in self.kwargs: - for trigger in list(self.kwargs['events']): - if type(trigger) != bytes: - original = trigger - trigger = bytes(original, 'UTF-8') - self.kwargs['events'][trigger] = self.kwargs['events'][original] - del(self.kwargs['events'][original]) - if type(self.kwargs['events'][trigger]) != bytes: - self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8') - - if trigger.lower() in self.trace_log[last_trigger_pos:].lower(): - trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower()) - - if 'debug' in self.kwargs and self.kwargs['debug']: - self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG) - self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG) - - last_trigger_pos = trigger_pos - os.write(child_fd, self.kwargs['events'][trigger]) - del(self.kwargs['events'][trigger]) - broke = True - break - - if broke: - continue - - ## Adding a exit trigger: - if len(self.kwargs['events']) == 0: - if 'debug' in self.kwargs and self.kwargs['debug']: - self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=logging.DEBUG) - - if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower(): - if 'debug' in self.kwargs and self.kwargs['debug']: - self.log(f"{self.cmd[0]} has finished.", level=logging.DEBUG) - alive = False - break - - self.status = 'done' - - if 'debug' in self.kwargs and self.kwargs['debug']: - self.log(f"{self.cmd[0]} waiting for exit code.", level=logging.DEBUG) - - if not self.kwargs['emulate']: + got_output = False + for fileno, event in self.poll_object.poll(0.1): + try: + output = os.read(self.child_fd, 8192) + got_output = True + self.peak(output) + self._trace_log += output + except OSError as err: + self.ended = time.time() + break + + if self.ended or (got_output is False and pid_exists(self.pid) is False): + self.ended = time.time() try: self.exit_code = os.waitpid(self.pid, 0)[1] except ChildProcessError: try: - self.exit_code = os.waitpid(child_fd, 0)[1] + self.exit_code = os.waitpid(self.child_fd, 0)[1] except ChildProcessError: self.exit_code = 1 - else: - self.exit_code = 0 - if 'debug' in self.kwargs and self.kwargs['debug']: - self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=logging.DEBUG) + def execute(self) -> bool: + if (old_dir := os.getcwd()) != self.working_directory: + os.chdir(self.working_directory) + + # Note: If for any reason, we get a Python exception between here + # and until os.close(), the traceback will get locked inside + # stdout of the child_fd object. `os.read(self.child_fd, 8192)` is the + # only way to get the traceback without loosing it. + self.pid, self.child_fd = pty.fork() + os.chdir(old_dir) + + if not self.pid: + try: + os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars}) + except FileNotFoundError: + log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red") + self.exit_code = 1 + return False + + self.started = time.time() + self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP) + + return True + + def decode(self, encoding='UTF-8'): + return self._trace_log.decode(encoding) + + +class SysCommand: + def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars=None, working_directory='./'): + _callbacks = {} + if callback: + _callbacks['on_end'] = callback + if start_callback: + _callbacks['on_start'] = start_callback + + self.cmd = cmd + self._callbacks = _callbacks + self.peak_output = peak_output + self.environment_vars = environment_vars + self.working_directory = working_directory + + self.session = None + self.create_session() + + def __enter__(self): + return self.session - if 'ignore_errors' in self.kwargs: - self.exit_code = 0 + def __exit__(self, *args, **kwargs): + # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync. + # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - if self.exit_code != 0 and not self.kwargs['suppress_errors']: - #self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG) - #self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR) - raise SysCallError(message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}", exit_code=self.exit_code) + if len(args) >= 2 and args[1]: + log(args[1], level=logging.ERROR, fg='red') - self.ended = time.time() - with open(f'{self.cwd}/trace.log', 'wb') as fh: - fh.write(self.trace_log) + def __iter__(self, *args, **kwargs): + + for line in self.session: + yield line + + def __repr__(self, *args, **kwargs): + return self.session._trace_log.decode('UTF-8') + + def __json__(self): + return { + 'cmd': self.cmd, + 'callbacks': self._callbacks, + 'peak': self.peak_output, + 'environment_vars': self.environment_vars, + 'session': True if self.session else False + } + + def create_session(self): + if self.session: + return True try: - os.close(child_fd) - except: - pass + self.session = SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars) + + while self.session.ended is None: + self.session.poll() + + if self.peak_output: + sys.stdout.write('\n') + sys.stdout.flush() + + except SysCallError: + return False + + return True + + def decode(self, fmt='UTF-8'): + return self.session._trace_log.decode(fmt) + + @property + def exit_code(self): + return self.session.exit_code + + @property + def trace_log(self): + return self.session._trace_log def prerequisite_check(): @@ -317,5 +360,23 @@ def prerequisite_check(): return True + def reboot(): - o = b''.join(sys_command("/usr/bin/reboot")) + o = b''.join(SysCommand("/usr/bin/reboot")) + + +def pid_exists(pid: int): + try: + return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip()) + except subprocess.CalledProcessError: + return False + + +def run_custom_user_commands(commands, installation): + for index, command in enumerate(commands): + log(f'Executing custom command "{command}" ...', fg='yellow') + with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script: + temp_script.write(command) + execution_output = SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh") + log(execution_output) + os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") |