Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2022-02-02 14:22:52 +0100
committerGitHub <noreply@github.com>2022-02-02 14:22:52 +0100
commitc08520f9902aeb1b4ce22e1159060792081b0327 (patch)
tree934ac279ac0e6d675b409d7936c97ce73d721a6a
parentdfd064a57f6a0006e9fc614ea229fd9883722085 (diff)
SysCommand() to remove ANSII VT100 Esc codes & archlinux-keyring fix (#933)
* Fixed SysCommandWorker() so that it removes ANSII VT100 escape codes. I also moved package.py into it's own folder, as that's something I want to expand on a lot, so package related stuff should go in there. I created a installed_package() function which gets information about the locally installed package. I changed so that find_packages() and find_package() returns a data-model instead for the package information. This should unify and make sure we detect issues down the line. * Working on structuring .version constructor that works with BaseModel * Added version contructors to VersionDef(). Also added __eq__ and __lt__ to LocalPackage() and PackageSearchResult(). * removed debug and added a TODO * Removed whitespace * Removed mirror-database function from myrepo
-rw-r--r--archinstall/__init__.py17
-rw-r--r--archinstall/lib/exceptions.py3
-rw-r--r--archinstall/lib/general.py26
-rw-r--r--archinstall/lib/models/__init__.py129
-rw-r--r--archinstall/lib/packages.py66
-rw-r--r--archinstall/lib/packages/__init__.py0
-rw-r--r--archinstall/lib/packages/packages.py109
-rw-r--r--archinstall/lib/user_interaction.py3
-rw-r--r--examples/guided.py15
9 files changed, 293 insertions, 75 deletions
diff --git a/archinstall/__init__.py b/archinstall/__init__.py
index 12d6887b..1bae70c9 100644
--- a/archinstall/__init__.py
+++ b/archinstall/__init__.py
@@ -14,7 +14,22 @@ from .lib.luks import *
from .lib.mirrors import *
from .lib.networking import *
from .lib.output import *
-from .lib.packages import *
+from .lib.models import (
+ VersionDef,
+ PackageSearchResult,
+ PackageSearch,
+ LocalPackage
+)
+from .lib.packages.packages import (
+ find_group,
+ package_search,
+ IsGroup,
+ find_package,
+ download_package,
+ find_packages,
+ installed_package,
+ validate_package_list
+)
from .lib.profiles import *
from .lib.services import *
from .lib.storage import *
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index 783bc9c5..b89d1fcb 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -41,3 +41,6 @@ class UserError(BaseException):
class ServiceException(BaseException):
pass
+
+class PackageError(BaseException):
+ pass \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index a3976234..a5444801 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -9,6 +9,7 @@ import subprocess
import string
import sys
import time
+import re
from datetime import datetime, date
from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
@@ -81,6 +82,18 @@ def locate_binary(name :str) -> str:
raise RequirementError(f"Binary {name} does not exist.")
+def clear_vt100_escape_codes(data :Union[bytes, str]):
+ # https://stackoverflow.com/a/43627833/929999
+ if type(data) == bytes:
+ vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8')
+ else:
+ vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]'
+
+ for match in re.findall(vt100_escape_regex, data, re.IGNORECASE):
+ data = data.replace(match, '' if type(data) == str else b'')
+
+ return data
+
def json_dumps(*args :str, **kwargs :str) -> str:
return json.dumps(*args, **{**kwargs, 'cls': JSON})
@@ -168,7 +181,8 @@ class SysCommandWorker:
peak_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
logfile :Optional[None] = None,
- working_directory :Optional[str] = './'):
+ working_directory :Optional[str] = './',
+ remove_vt100_escape_codes_from_lines :bool = True):
if not callbacks:
callbacks = {}
@@ -200,6 +214,7 @@ class SysCommandWorker:
self.child_fd :Optional[int] = None
self.started :Optional[float] = None
self.ended :Optional[float] = None
+ self.remove_vt100_escape_codes_from_lines :bool = remove_vt100_escape_codes_from_lines
def __contains__(self, key: bytes) -> bool:
"""
@@ -216,6 +231,9 @@ class SysCommandWorker:
def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]:
for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
if line:
+ if self.remove_vt100_escape_codes_from_lines:
+ line = clear_vt100_escape_codes(line)
+
yield line + b'\n'
self._trace_log_pos = self._trace_log.rfind(b'\n')
@@ -368,7 +386,8 @@ class SysCommand:
start_callback :Optional[Callable[[Any], Any]] = None,
peak_output :Optional[bool] = False,
environment_vars :Optional[Dict[str, Any]] = None,
- working_directory :Optional[str] = './'):
+ working_directory :Optional[str] = './',
+ remove_vt100_escape_codes_from_lines :bool = True):
_callbacks = {}
if callbacks:
@@ -382,6 +401,7 @@ class SysCommand:
self.peak_output = peak_output
self.environment_vars = environment_vars
self.working_directory = working_directory
+ self.remove_vt100_escape_codes_from_lines = remove_vt100_escape_codes_from_lines
self.session :Optional[SysCommandWorker] = None
self.create_session()
@@ -435,7 +455,7 @@ class SysCommand:
if self.session:
return self.session
- with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars) as session:
+ with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines) as session:
if not self.session:
self.session = session
diff --git a/archinstall/lib/models/__init__.py b/archinstall/lib/models/__init__.py
new file mode 100644
index 00000000..c7920cdf
--- /dev/null
+++ b/archinstall/lib/models/__init__.py
@@ -0,0 +1,129 @@
+from typing import Optional, List
+from pydantic import BaseModel, validator
+
+class VersionDef(BaseModel):
+ version_string: str
+
+ @classmethod
+ def parse_version(self) -> List[str]:
+ if '.' in self.version_string:
+ versions = self.version_string.split('.')
+ else:
+ versions = [self.version_string]
+
+ return versions
+
+ @classmethod
+ def major(self) -> str:
+ return self.parse_version()[0]
+
+ @classmethod
+ def minor(self) -> str:
+ versions = self.parse_version()
+ if len(versions) >= 2:
+ return versions[1]
+
+ @classmethod
+ def patch(self) -> str:
+ versions = self.parse_version()
+ if '-' in versions[-1]:
+ _, patch_version = versions[-1].split('-', 1)
+ return patch_version
+
+ def __eq__(self, other :'VersionDef') -> bool:
+ if other.major == self.major and \
+ other.minor == self.minor and \
+ other.patch == self.patch:
+
+ return True
+ return False
+
+ def __lt__(self, other :'VersionDef') -> bool:
+ if self.major > other.major:
+ return False
+ elif self.minor and other.minor and self.minor > other.minor:
+ return False
+ elif self.patch and other.patch and self.patch > other.patch:
+ return False
+
+ def __str__(self) -> str:
+ return self.version_string
+
+
+class PackageSearchResult(BaseModel):
+ pkgname: str
+ pkgbase: str
+ repo: str
+ arch: str
+ pkgver: str
+ pkgrel: str
+ epoch: int
+ pkgdesc: str
+ url: str
+ filename: str
+ compressed_size: int
+ installed_size: int
+ build_date: str
+ last_update: str
+ flag_date: Optional[str]
+ maintainers: List[str]
+ packager: str
+ groups: List[str]
+ licenses: List[str]
+ conflicts: List[str]
+ provides: List[str]
+ replaces: List[str]
+ depends: List[str]
+ optdepends: List[str]
+ makedepends: List[str]
+ checkdepends: List[str]
+
+ @property
+ def pkg_version(self) -> str:
+ return self.pkgver
+
+ def __eq__(self, other :'VersionDef') -> bool:
+ return self.pkg_version == other.pkg_version
+
+ def __lt__(self, other :'VersionDef') -> bool:
+ return self.pkg_version < other.pkg_version
+
+
+class PackageSearch(BaseModel):
+ version: int
+ limit: int
+ valid: bool
+ results: List[PackageSearchResult]
+
+
+class LocalPackage(BaseModel):
+ name: str
+ version: str
+ description:str
+ architecture: str
+ url: str
+ licenses: str
+ groups: str
+ depends_on: str
+ optional_deps: str
+ required_by: str
+ optional_for: str
+ conflicts_with: str
+ replaces: str
+ installed_size: str
+ packager: str
+ build_date: str
+ install_date: str
+ install_reason: str
+ install_script: str
+ validated_by: str
+
+ @property
+ def pkg_version(self) -> str:
+ return self.version
+
+ def __eq__(self, other :'VersionDef') -> bool:
+ return self.pkg_version == other.pkg_version
+
+ def __lt__(self, other :'VersionDef') -> bool:
+ return self.pkg_version < other.pkg_version \ No newline at end of file
diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py
deleted file mode 100644
index 1d46ef5e..00000000
--- a/archinstall/lib/packages.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import json
-import ssl
-import urllib.error
-import urllib.parse
-import urllib.request
-from typing import Dict, Any
-
-from .exceptions import RequirementError
-
-BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
-BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
-
-
-def find_group(name :str) -> bool:
- ssl_context = ssl.create_default_context()
- ssl_context.check_hostname = False
- ssl_context.verify_mode = ssl.CERT_NONE
- try:
- response = urllib.request.urlopen(BASE_GROUP_URL.format(group=name), context=ssl_context)
- except urllib.error.HTTPError as err:
- if err.code == 404:
- return False
- else:
- raise err
-
- # Just to be sure some code didn't slip through the exception
- if response.code == 200:
- return True
-
-
-def find_package(name :str) -> Any:
- """
- Finds a specific package via the package database.
- It makes a simple web-request, which might be a bit slow.
- """
- ssl_context = ssl.create_default_context()
- ssl_context.check_hostname = False
- ssl_context.verify_mode = ssl.CERT_NONE
- response = urllib.request.urlopen(BASE_URL.format(package=name), context=ssl_context)
- data = response.read().decode('UTF-8')
- return json.loads(data)
-
-
-def find_packages(*names :str) -> Dict[str, Any]:
- """
- This function returns the search results for many packages.
- The function itself is rather slow, so consider not sending to
- many packages to the search query.
- """
- return {package: find_package(package) for package in names}
-
-
-def validate_package_list(packages: list) -> bool:
- """
- Validates a list of given packages.
- Raises `RequirementError` if one or more packages are not found.
- """
- invalid_packages = [
- package
- for package in packages
- if not find_package(package)['results'] and not find_group(package)
- ]
- if invalid_packages:
- raise RequirementError(f"Invalid package names: {invalid_packages}")
-
- return True
diff --git a/archinstall/lib/packages/__init__.py b/archinstall/lib/packages/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/archinstall/lib/packages/__init__.py
diff --git a/archinstall/lib/packages/packages.py b/archinstall/lib/packages/packages.py
new file mode 100644
index 00000000..7dc74b32
--- /dev/null
+++ b/archinstall/lib/packages/packages.py
@@ -0,0 +1,109 @@
+import ssl
+import urllib.request
+import json
+from typing import Dict, Any
+from ..general import SysCommand
+from ..models import PackageSearch, PackageSearchResult, LocalPackage
+from ..exceptions import PackageError, SysCallError, RequirementError
+
+BASE_URL_PKG_SEARCH = 'https://archlinux.org/packages/search/json/?name={package}'
+# BASE_URL_PKG_CONTENT = 'https://archlinux.org/packages/search/json/'
+BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
+
+
+def find_group(name :str) -> bool:
+ # TODO UPSTREAM: Implement /json/ for the groups search
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = False
+ ssl_context.verify_mode = ssl.CERT_NONE
+ try:
+ response = urllib.request.urlopen(BASE_GROUP_URL.format(group=name), context=ssl_context)
+ except urllib.error.HTTPError as err:
+ if err.code == 404:
+ return False
+ else:
+ raise err
+
+ # Just to be sure some code didn't slip through the exception
+ if response.code == 200:
+ return True
+
+ return False
+
+def package_search(package :str) -> PackageSearch:
+ """
+ Finds a specific package via the package database.
+ It makes a simple web-request, which might be a bit slow.
+ """
+ # TODO UPSTREAM: Implement bulk search, either support name=X&name=Y or split on space (%20 or ' ')
+ # TODO: utilize pacman cache first, upstream second.
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = False
+ ssl_context.verify_mode = ssl.CERT_NONE
+ response = urllib.request.urlopen(BASE_URL_PKG_SEARCH.format(package=package), context=ssl_context)
+
+ if response.code != 200:
+ raise PackageError(f"Could not locate package: [{response.code}] {response}")
+
+ data = response.read().decode('UTF-8')
+
+ return PackageSearch(**json.loads(data))
+
+class IsGroup(BaseException):
+ pass
+
+def find_package(package :str) -> PackageSearchResult:
+ data = package_search(package)
+
+ if not data.results:
+ # Check if the package is actually a group
+ if find_group(package):
+ # TODO: Until upstream adds a JSON result for group searches
+ # there is no way we're going to parse HTML reliably.
+ raise IsGroup("Implement group search")
+
+ raise PackageError(f"Could not locate {package} while looking for repository category")
+
+ # If we didn't find the package in the search results,
+ # odds are it's a group package
+ for result in data.results:
+ if result.pkgname == package:
+ return result
+
+ raise PackageError(f"Could not locate {package} in result while looking for repository category")
+
+def find_packages(*names :str) -> Dict[str, Any]:
+ """
+ This function returns the search results for many packages.
+ The function itself is rather slow, so consider not sending to
+ many packages to the search query.
+ """
+ return {package: find_package(package) for package in names}
+
+
+def validate_package_list(packages: list) -> bool:
+ """
+ Validates a list of given packages.
+ Raises `RequirementError` if one or more packages are not found.
+ """
+ invalid_packages = [
+ package
+ for package in packages
+ if not find_package(package)['results'] and not find_group(package)
+ ]
+ if invalid_packages:
+ raise RequirementError(f"Invalid package names: {invalid_packages}")
+
+ return True
+
+def installed_package(package :str) -> LocalPackage:
+ package_info = {}
+ try:
+ for line in SysCommand(f"pacman -Q --info {package}"):
+ if b':' in line:
+ key, value = line.decode().split(':', 1)
+ package_info[key.strip().lower().replace(' ', '_')] = value.strip()
+ except SysCallError:
+ pass
+
+ return LocalPackage(**package_info) \ No newline at end of file
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index d5cd9257..34ce5534 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -28,7 +28,8 @@ from .mirrors import list_mirrors
# TODO: Some inconsistencies between the selection processes.
# Some return the keys from the options, some the values?
-from .. import fs_types, validate_package_list
+from .disk.validators import fs_types
+from .packages.packages import validate_package_list
# TODO: These can be removed after the move to simple_menu.py
def get_terminal_height() -> int:
diff --git a/examples/guided.py b/examples/guided.py
index 22e0f883..45e213df 100644
--- a/examples/guided.py
+++ b/examples/guided.py
@@ -309,10 +309,17 @@ if not (archinstall.check_mirror_reachable() or archinstall.arguments.get('skip-
archinstall.log(f"Arch Linux mirrors are not reachable. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
exit(1)
-if not (archinstall.update_keyring() or archinstall.arguments.get('skip-keyring-update', False)):
- log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
- archinstall.log(f"Failed to update the keyring. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
- exit(1)
+if not archinstall.arguments.get('offline', False):
+ # If we want to check for keyring updates
+ # and the installed package version is lower than the upstream version
+ if archinstall.arguments.get('skip-keyring-update', False) is False and \
+ archinstall.installed_package('archlinux-keyring') < archinstall.find_package('archlinux-keyring'):
+
+ # Then we update the keyring in the ISO environment
+ if not archinstall.update_keyring():
+ log_file = os.path.join(archinstall.storage.get('LOG_PATH', None), archinstall.storage.get('LOG_FILE', None))
+ archinstall.log(f"Failed to update the keyring. Please check your internet connection and the log file '{log_file}'.", level=logging.INFO, fg="red")
+ exit(1)
load_config()
if not archinstall.arguments.get('silent'):