Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/helpers
diff options
context:
space:
mode:
authorAnton Hvornum <anton.feeds+github@gmail.com>2020-07-06 15:46:19 +0200
committerAnton Hvornum <anton.feeds+github@gmail.com>2020-07-06 15:46:19 +0200
commit94f8d90121a8bd51111ee6067a78f81282574414 (patch)
tree606d188d052b092c72e0055fd99297b9348372c7 /helpers
parentf0bc987e1bea4d1243340be0e6dd1522ea2750ff (diff)
Added a PArtition() class that supports mounting and formatting. Also reworked the installation flow a bit to be a bit more clear while sacrificing some automation. Maybe I'll revert some changes and 'automatically' do certain things, but for now this shouldn't impact anyone to much
Diffstat (limited to 'helpers')
-rw-r--r--helpers/disk.py67
-rw-r--r--helpers/general.py5
2 files changed, 58 insertions, 14 deletions
diff --git a/helpers/disk.py b/helpers/disk.py
index 1c0a544c..e3129d37 100644
--- a/helpers/disk.py
+++ b/helpers/disk.py
@@ -2,9 +2,15 @@ import glob, re, os, json
from collections import OrderedDict
from helpers.general import sys_command
from exceptions import *
+import ctypes
+import ctypes.util
+import os
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GPT = 0b00000001
+libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
+libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
+
class BlockDevice():
def __init__(self, path, info):
@@ -32,6 +38,9 @@ class BlockDevice():
if not 'pkname' in self.info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
+ # if not stat.S_ISBLK(os.stat(full_path).st_mode):
+ # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
+
@property
def partitions(self):
o = b''.join(sys_command(f'partprobe {self.path}'))
@@ -50,11 +59,7 @@ class BlockDevice():
root_path = f"/dev/{r['blockdevices'][0]['name']}"
for part in r['blockdevices'][0]['children']:
part_id = part['name'][len(os.path.basename(self.path)):]
- parts[part_id] = {
- 'size' : part['size'],
- 'id' : part_id,
- 'path' : root_path + part_id
- }
+ parts[part_id] = Partition(root_path + part_id, part_id=part_id, size=part['size'])
return {k: parts[k] for k in sorted(parts)}
@@ -71,6 +76,45 @@ class BlockDevice():
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
+class Partition():
+ def __init__(self, path, part_id=None, size=-1):
+ if not part_id: part_id = os.path.basename(path)
+ self.path = path
+ self.part_id = part_id
+ self.mountpoint = None
+ self.filesystem = None # TODO: Autodetect if we're reusing a partition
+ self.size = size # TODO: Refresh?
+
+ def __repr__(self, *args, **kwargs):
+ return f'Partition({self.path})'
+
+ def format(self, filesystem):
+ if filesystem == 'btrfs':
+ o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {self.path}'))
+ if not b'UUID' in o:
+ return False
+ self.filesystem = 'btrfs'
+ elif filesystem == 'fat32':
+ o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {self.path}'))
+ if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
+ return None
+ return True
+ else:
+ raise DiskError(f'Fileformat {filesystem} is not yet implemented.')
+ return True
+
+ def mount(self, target, fs=None, options=''):
+ if not fs:
+ if not self.filesystem: raise DiskError('Need to format (or define) the filesystem before mounting.')
+ fs = self.filesystem
+ # TODO: Move this to the BlockDevice or something.
+ ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode())
+ if ret < 0:
+ errno = ctypes.get_errno()
+ raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}")
+ self.mountpoint = target
+
+
class luks2():
def __init__(self, filesystem):
self.filesystem = filesystem
@@ -80,7 +124,7 @@ class luks2():
def __exit__(self, *args, **kwargs):
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
- if len(args):
+ if len(args) >= 2 and args[1]:
raise args[1]
return True
@@ -91,13 +135,13 @@ class luks2():
with open(key_file, 'wb') as fh:
fh.write(password)
- o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition["path"]}'))
+ o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}'))
if not b'Command successful.' in o:
- raise DiskError(f'Could not encrypt volume "{partition["path"]}": {o}')
+ raise DiskError(f'Could not encrypt volume "{partition.path}": {o}')
return key_file
- def mount(self, partition, mountpoint, key_file):
+ def unlock(self, partition, mountpoint, key_file):
"""
Mounts a lukts2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
@@ -106,8 +150,9 @@ class luks2():
:type mountpoint: str
"""
if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead?
- sys_command(f'/usr/bin/cryptsetup open {partition["path"]} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
- return os.path.islink(f'/dev/mapper/{mountpoint}')
+ sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
+ if os.path.islink(f'/dev/mapper/{mountpoint}'):
+ return Partition(f'/dev/mapper/{mountpoint}')
def close(self, mountpoint):
sys_command(f'cryptsetup close /dev/mapper/{mountpoint}')
diff --git a/helpers/general.py b/helpers/general.py
index 036d2d68..32814ddc 100644
--- a/helpers/general.py
+++ b/helpers/general.py
@@ -16,6 +16,7 @@ class sys_command():#Thread):
def __init__(self, cmd, callback=None, start_callback=None, *args, **kwargs):
if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid()
if not 'emulate' in kwargs: kwargs['emulate'] = False
+ if not 'surpress_errors' in kwargs: kwargs['surpress_errors'] = False
if kwargs['emulate']:
log(f"Starting command '{cmd}' in emulation mode.")
self.raw_cmd = cmd
@@ -170,11 +171,9 @@ class sys_command():#Thread):
if 'ignore_errors' in self.kwargs:
self.exit_code = 0
- if self.exit_code != 0:
+ if self.exit_code != 0 and not self.kwargs['surpress_errors']:
log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", origin='spawn', level=3)
log(self.trace_log.decode('UTF-8'), origin='spawn', level=3)
- #else:
- #log(f"{self.cmd[0]} exit nicely.", origin='spawn', level=5)
self.ended = time.time()
with open(f'{self.cwd}/trace.log', 'wb') as fh: