From 94f8d90121a8bd51111ee6067a78f81282574414 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Mon, 6 Jul 2020 15:46:19 +0200 Subject: Added a PArtition() class that supports mounting and formatting. Also reworked the installation flow a bit to be a bit more clear while sacrificing some automation. Maybe I'll revert some changes and 'automatically' do certain things, but for now this shouldn't impact anyone to much --- helpers/disk.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 11 deletions(-) (limited to 'helpers/disk.py') diff --git a/helpers/disk.py b/helpers/disk.py index 1c0a544c..e3129d37 100644 --- a/helpers/disk.py +++ b/helpers/disk.py @@ -2,9 +2,15 @@ import glob, re, os, json from collections import OrderedDict from helpers.general import sys_command from exceptions import * +import ctypes +import ctypes.util +import os ROOT_DIR_PATTERN = re.compile('^.*?/devices') GPT = 0b00000001 +libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) +libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p) + class BlockDevice(): def __init__(self, path, info): @@ -32,6 +38,9 @@ class BlockDevice(): if not 'pkname' in self.info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" + # if not stat.S_ISBLK(os.stat(full_path).st_mode): + # raise DiskError(f'Selected disk "{full_path}" is not a block device.') + @property def partitions(self): o = b''.join(sys_command(f'partprobe {self.path}')) @@ -50,11 +59,7 @@ class BlockDevice(): root_path = f"/dev/{r['blockdevices'][0]['name']}" for part in r['blockdevices'][0]['children']: part_id = part['name'][len(os.path.basename(self.path)):] - parts[part_id] = { - 'size' : part['size'], - 'id' : part_id, - 'path' : root_path + part_id - } + parts[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size']) return {k: parts[k] for k in sorted(parts)} @@ -71,6 +76,45 @@ class BlockDevice(): raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] +class Partition(): + def __init__(self, path, part_id=None, size=-1): + if not part_id: part_id = os.path.basename(path) + self.path = path + self.part_id = part_id + self.mountpoint = None + self.filesystem = None # TODO: Autodetect if we're reusing a partition + self.size = size # TODO: Refresh? + + def __repr__(self, *args, **kwargs): + return f'Partition({self.path})' + + def format(self, filesystem): + if filesystem == 'btrfs': + o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}')) + if not b'UUID' in o: + return False + self.filesystem = 'btrfs' + elif filesystem == 'fat32': + o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {self.path}')) + if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o: + return None + return True + else: + raise DiskError(f'Fileformat {filesystem} is not yet implemented.') + return True + + def mount(self, target, fs=None, options=''): + if not fs: + if not self.filesystem: raise DiskError('Need to format (or define) the filesystem before mounting.') + fs = self.filesystem + # TODO: Move this to the BlockDevice or something. + ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode()) + if ret < 0: + errno = ctypes.get_errno() + raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}") + self.mountpoint = target + + class luks2(): def __init__(self, filesystem): self.filesystem = filesystem @@ -80,7 +124,7 @@ class luks2(): def __exit__(self, *args, **kwargs): # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - if len(args): + if len(args) >= 2 and args[1]: raise args[1] return True @@ -91,13 +135,13 @@ class luks2(): with open(key_file, 'wb') as fh: fh.write(password) - o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition["path"]}')) + o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}')) if not b'Command successful.' in o: - raise DiskError(f'Could not encrypt volume "{partition["path"]}": {o}') + raise DiskError(f'Could not encrypt volume "{partition.path}": {o}') return key_file - def mount(self, partition, mountpoint, key_file): + def unlock(self, partition, mountpoint, key_file): """ Mounts a lukts2 compatible partition to a certain mountpoint. Keyfile must be specified as there's no way to interact with the pw-prompt atm. @@ -106,8 +150,9 @@ class luks2(): :type mountpoint: str """ if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead? - sys_command(f'/usr/bin/cryptsetup open {partition["path"]} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') - return os.path.islink(f'/dev/mapper/{mountpoint}') + sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2') + if os.path.islink(f'/dev/mapper/{mountpoint}'): + return Partition(f'/dev/mapper/{mountpoint}') def close(self, mountpoint): sys_command(f'cryptsetup close /dev/mapper/{mountpoint}') -- cgit v1.2.3-70-g09d2