index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Anton Hvornum <anton.feeds+github@gmail.com> | 2020-07-06 15:46:19 +0200 |
---|---|---|
committer | Anton Hvornum <anton.feeds+github@gmail.com> | 2020-07-06 15:46:19 +0200 |
commit | 94f8d90121a8bd51111ee6067a78f81282574414 (patch) | |
tree | 606d188d052b092c72e0055fd99297b9348372c7 /helpers | |
parent | f0bc987e1bea4d1243340be0e6dd1522ea2750ff (diff) |
-rw-r--r-- | helpers/disk.py | 67 | ||||
-rw-r--r-- | helpers/general.py | 5 |
diff --git a/helpers/disk.py b/helpers/disk.py index 1c0a544c..e3129d37 100644 --- a/helpers/disk.py +++ b/helpers/disk.py @@ -2,9 +2,15 @@ import glob, re, os, json from collections import OrderedDict from helpers.general import sys_command from exceptions import * +import ctypes +import ctypes.util +import os ROOT_DIR_PATTERN = re.compile('^.*?/devices') GPT = 0b00000001 +libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) +libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) + class BlockDevice(): def __init__(self, path, info): @@ -32,6 +38,9 @@ class BlockDevice(): if not 'pkname' in self.info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" + # if not stat.S_ISBLK(os.stat(full_path).st_mode): + # raise DiskError(f'Selected disk "{full_path}" is not a block device.') + @property def partitions(self): o = b''.join(sys_command(f'partprobe {self.path}')) @@ -50,11 +59,7 @@ class BlockDevice(): root_path = f"/dev/{r['blockdevices'][0]['name']}" for part in r['blockdevices'][0]['children']: part_id = part['name'][len(os.path.basename(self.path)):] - parts[part_id] = { - 'size' : part['size'], - 'id' : part_id, - 'path' : root_path + part_id - } + parts[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size']) return {k: parts[k] for k in sorted(parts)} @@ -71,6 +76,45 @@ class BlockDevice(): raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] +class Partition(): + def __init__(self, path, part_id=None, size=-1): + if not part_id: part_id = os.path.basename(path) + self.path = path + self.part_id = part_id + self.mountpoint = None + self.filesystem = None # TODO: Autodetect if we're reusing a partition + self.size = size # TODO: Refresh? + + def __repr__(self, *args, **kwargs): + return f'Partition({self.path})' + + def format(self, filesystem): + if filesystem == 'btrfs': + o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}')) + if not b'UUID' in o: + return False + self.filesystem = 'btrfs' + elif filesystem == 'fat32': + o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {self.path}')) + if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: + return None + return True + else: + raise DiskError(f'Fileformat {filesystem} is not yet implemented.') + return True + + def mount(self, target, fs=None, options=''): + if not fs: + if not self.filesystem: raise DiskError('Need to format (or define) the filesystem before mounting.') + fs = self.filesystem + # TODO: Move this to the BlockDevice or something. + ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode()) + if ret < 0: + errno = ctypes.get_errno() + raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}") + self.mountpoint = target + + class luks2(): def __init__(self, filesystem): self.filesystem = filesystem @@ -80,7 +124,7 @@ class luks2(): def __exit__(self, *args, **kwargs): # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - if len(args): + if len(args) >= 2 and args[1]: raise args[1] return True @@ -91,13 +135,13 @@ class luks2(): with open(key_file, 'wb') as fh: fh.write(password) - o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition["path"]}')) + o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}')) if not b'Command successful.' in o: - raise DiskError(f'Could not encrypt volume "{partition["path"]}": {o}') + raise DiskError(f'Could not encrypt volume "{partition.path}": {o}') return key_file - def mount(self, partition, mountpoint, key_file): + def unlock(self, partition, mountpoint, key_file): """ Mounts a lukts2 compatible partition to a certain mountpoint. Keyfile must be specified as there's no way to interact with the pw-prompt atm. @@ -106,8 +150,9 @@ class luks2(): :type mountpoint: str """ if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead? - sys_command(f'/usr/bin/cryptsetup open {partition["path"]} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') - return os.path.islink(f'/dev/mapper/{mountpoint}') + sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') + if os.path.islink(f'/dev/mapper/{mountpoint}'): + return Partition(f'/dev/mapper/{mountpoint}') def close(self, mountpoint): sys_command(f'cryptsetup close /dev/mapper/{mountpoint}') diff --git a/helpers/general.py b/helpers/general.py index 036d2d68..32814ddc 100644 --- a/helpers/general.py +++ b/helpers/general.py @@ -16,6 +16,7 @@ class sys_command():#Thread): def __init__(self, cmd, callback=None, start_callback=None, *args, **kwargs): if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid() if not 'emulate' in kwargs: kwargs['emulate'] = False + if not 'surpress_errors' in kwargs: kwargs['surpress_errors'] = False if kwargs['emulate']: log(f"Starting command '{cmd}' in emulation mode.") self.raw_cmd = cmd @@ -170,11 +171,9 @@ class sys_command():#Thread): if 'ignore_errors' in self.kwargs: self.exit_code = 0 - if self.exit_code != 0: + if self.exit_code != 0 and not self.kwargs['surpress_errors']: log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", origin='spawn', level=3) log(self.trace_log.decode('UTF-8'), origin='spawn', level=3) - #else: - #log(f"{self.cmd[0]} exit nicely.", origin='spawn', level=5) self.ended = time.time() with open(f'{self.cwd}/trace.log', 'wb') as fh: |