index : reflector32 | |
Archlinux32 fork of reflector | gitolite user |
summaryrefslogtreecommitdiff |
author | Erich Eckner <git@eckner.net> | 2021-06-01 20:36:03 +0200 |
---|---|---|
committer | Erich Eckner <git@eckner.net> | 2021-06-01 20:36:03 +0200 |
commit | 2de7c17dc3410f3e48e101da8664ca5dcf9748a6 (patch) | |
tree | 349b3e86845b65e1f0d256322b0fc52b67ef21c3 /Reflector.py | |
parent | e228fd9ee21bcaadf89586eea4a57fd5c3a3c0af (diff) |
-rw-r--r-- | Reflector.py | 103 |
diff --git a/Reflector.py b/Reflector.py index 1665090..bb56b56 100644 --- a/Reflector.py +++ b/Reflector.py @@ -33,14 +33,12 @@ import json import logging import os import pipes -import queue import re import shlex import socket import subprocess import sys import tempfile -import threading import time import urllib.error import urllib.request @@ -55,14 +53,13 @@ DISPLAY_TIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC' PARSE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ' PARSE_TIME_FORMAT_WITH_USEC = '%Y-%m-%dT%H:%M:%S.%fZ' -DB_SUBPATH = 'core/os/x86_64/core.db' +DB_SUBPATH = 'community/os/x86_64/community.db' MIRROR_URL_FORMAT = '{0}{1}/os/{2}' MIRRORLIST_ENTRY_FORMAT = "Server = " + MIRROR_URL_FORMAT + "\n" DEFAULT_CONNECTION_TIMEOUT = 5 DEFAULT_CACHE_TIMEOUT = 300 -DEFAULT_N_THREADS = os.cpu_count() SORT_TYPES = { 'age': 'last server synchronization', @@ -129,7 +126,7 @@ def get_mirrorstatus( return obj, mtime except (IOError, urllib.error.URLError, socket.timeout) as err: - raise MirrorStatusError(f'failed to retrieve mirrorstatus data: {err.__class__.__name__}: {err}') + raise MirrorStatusError(f'failed to retrieve mirrorstatus data: {err.__class__.__name__}: {err}') from err # ------------------------------ Miscellaneous ------------------------------- # @@ -170,7 +167,7 @@ def count_countries(mirrors): # --------------------------------- Sorting ---------------------------------- # -def sort(mirrors, by=None, n_threads=DEFAULT_N_THREADS): # pylint: disable=invalid-name +def sort(mirrors, by=None): # pylint: disable=invalid-name ''' Sort mirrors by different criteria. ''' @@ -182,14 +179,14 @@ def sort(mirrors, by=None, n_threads=DEFAULT_N_THREADS): # pylint: disable=inva mirrors.sort(key=lambda m: m['last_sync'], reverse=True) elif by == 'rate': - rates = rate(mirrors, n_threads=n_threads) + rates = rate(mirrors) mirrors = sorted(mirrors, key=lambda m: rates[m['url']], reverse=True) else: try: mirrors.sort(key=lambda m: m[by]) - except KeyError: - raise MirrorStatusError('attempted to sort mirrors by unrecognized criterion: "{}"'.format(by)) + except KeyError as err: + raise MirrorStatusError('attempted to sort mirrors by unrecognized criterion: "{}"'.format(by)) from err return mirrors @@ -241,9 +238,10 @@ def rate_http(db_url, connection_timeout=DEFAULT_CONNECTION_TIMEOUT): return 0, 0 -def rate(mirrors, n_threads=DEFAULT_N_THREADS, connection_timeout=DEFAULT_CONNECTION_TIMEOUT): +def rate(mirrors, connection_timeout=DEFAULT_CONNECTION_TIMEOUT): ''' - Rate mirrors by timing the download the core repo's database for each one. + Rate mirrors by timing the download of the community repo's database from + each one. ''' # Ensure that mirrors is not a generator so that its length can be determined. if not isinstance(mirrors, tuple): @@ -252,67 +250,28 @@ def rate(mirrors, n_threads=DEFAULT_N_THREADS, connection_timeout=DEFAULT_CONNEC if not mirrors: return None - # At least 1 thread and not more than the number of mirrors. - n_threads = max(1, min(n_threads, len(mirrors))) - - # URL input queue. - q_in = queue.Queue() - # URL, elapsed time and rate output queue. - q_out = queue.Queue() - - def worker(): - while True: - # To stop a thread, an integer will be inserted in the input queue. Each - # thread will increment it and re-insert it until it equals the - # threadcount. After encountering the integer, the thread exits the loop. - url = q_in.get() - - if isinstance(url, int): - if url < n_threads: - q_in.put(url + 1) - - else: - db_url = url + DB_SUBPATH - scheme = urllib.parse.urlparse(url).scheme - - if scheme == 'rsync': - time_delta, ratio = rate_rsync(db_url, connection_timeout) - else: - time_delta, ratio = rate_http(db_url, connection_timeout) - - q_out.put((url, time_delta, ratio)) - - q_in.task_done() - - workers = tuple(threading.Thread(target=worker) for _ in range(n_threads)) - for wkr in workers: - wkr.daemon = True - wkr.start() - - url_len = max(len(m['url']) for m in mirrors) logger = get_logger() - for mir in mirrors: - url = mir['url'] - logger.info('rating %s', url) - q_in.put(url) - - # To exit the threads. - q_in.put(0) - q_in.join() + logger.info('rating %s mirror(s) by download speed', len(mirrors)) + url_len = max(len(mir['url']) for mir in mirrors) header_fmt = '{{:{:d}s}} {{:>14s}} {{:>9s}}'.format(url_len) logger.info(header_fmt.format('Server', 'Rate', 'Time')) fmt = '{{:{:d}s}} {{:8.2f}} KiB/s {{:7.2f}} s'.format(url_len) - # Loop over the mirrors just to ensure that we get the rate for each mirror. - # The value in the loop does not (necessarily) correspond to the mirror. rates = dict() - for _ in mirrors: - url, dtime, ratio = q_out.get() + for mir in mirrors: + url = mir['url'] + db_url = url + DB_SUBPATH + scheme = urllib.parse.urlparse(url).scheme + + if scheme == 'rsync': + time_delta, ratio = rate_rsync(db_url, connection_timeout) + else: + time_delta, ratio = rate_http(db_url, connection_timeout) + kibps = ratio / 1024.0 - logger.info(fmt.format(url, kibps, dtime)) + logger.info(fmt.format(url, kibps, time_delta)) rates[url] = ratio - q_out.task_done() return rates @@ -325,7 +284,7 @@ class MirrorStatusError(Exception): ''' def __init__(self, msg): - super(MirrorStatusError, self).__init__() + super().__init__() self.msg = msg def __str__(self): @@ -511,13 +470,11 @@ class MirrorStatus(): connection_timeout=DEFAULT_CONNECTION_TIMEOUT, cache_timeout=DEFAULT_CACHE_TIMEOUT, min_completion_pct=1.0, - threads=DEFAULT_N_THREADS, url=URL ): self.connection_timeout = connection_timeout self.cache_timeout = cache_timeout self.min_completion_pct = min_completion_pct - self.threads = threads self.url = url self.mirror_status = None @@ -549,8 +506,8 @@ class MirrorStatus(): obj = self.get_obj() try: return obj['urls'] - except KeyError: - raise MirrorStatusError('no mirrors detected in mirror status output') + except KeyError as err: + raise MirrorStatusError('no mirrors detected in mirror status output') from err def filter(self, mirrors=None, **kwargs): ''' @@ -567,7 +524,7 @@ class MirrorStatus(): ''' if mirrors is None: mirrors = self.get_mirrors() - yield from sort(mirrors, n_threads=self.threads, **kwargs) + yield from sort(mirrors, **kwargs) def rate(self, mirrors=None, **kwargs): ''' @@ -575,7 +532,7 @@ class MirrorStatus(): ''' if mirrors is None: mirrors = self.get_mirrors() - yield from sort(mirrors, n_threads=self.threads, by='rate', **kwargs) + yield from sort(mirrors, by='rate', **kwargs) def get_mirrorlist(self, mirrors=None, include_country=False, cmd=None): ''' @@ -676,11 +633,6 @@ def add_arguments(parser): ) parser.add_argument( - '--threads', type=int, metavar='n', default=DEFAULT_N_THREADS, - help='The maximum number of threads to use when rating mirrors. Keep in mind that this may skew your results if your connection is saturated. Default: %(default)s (number of detected CPUs)' - ) - - parser.add_argument( '--verbose', action='store_true', help='Print extra information to STDERR. Only works with some options.' ) @@ -814,7 +766,6 @@ def process_options(options, mirrorstatus=None, mirrors=None): # download_timeout=options.download_timeout, cache_timeout=options.cache_timeout, min_completion_pct=(options.completion_percent / 100.), - threads=options.threads, url=options.url ) |