Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/Reflector.py
diff options
context:
space:
mode:
authorErich Eckner <git@eckner.net>2021-06-01 20:36:03 +0200
committerErich Eckner <git@eckner.net>2021-06-01 20:36:03 +0200
commit2de7c17dc3410f3e48e101da8664ca5dcf9748a6 (patch)
tree349b3e86845b65e1f0d256322b0fc52b67ef21c3 /Reflector.py
parente228fd9ee21bcaadf89586eea4a57fd5c3a3c0af (diff)
reflector-2020.12.tar.xz
Diffstat (limited to 'Reflector.py')
-rw-r--r--Reflector.py103
1 file changed, 27 insertions, 76 deletions
diff --git a/Reflector.py b/Reflector.py
index 1665090..bb56b56 100644
--- a/Reflector.py
+++ b/Reflector.py
@@ -33,14 +33,12 @@ import json
import logging
import os
import pipes
-import queue
import re
import shlex
import socket
import subprocess
import sys
import tempfile
-import threading
import time
import urllib.error
import urllib.request
@@ -55,14 +53,13 @@ DISPLAY_TIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC'
PARSE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
PARSE_TIME_FORMAT_WITH_USEC = '%Y-%m-%dT%H:%M:%S.%fZ'
-DB_SUBPATH = 'core/os/x86_64/core.db'
+DB_SUBPATH = 'community/os/x86_64/community.db'
MIRROR_URL_FORMAT = '{0}{1}/os/{2}'
MIRRORLIST_ENTRY_FORMAT = "Server = " + MIRROR_URL_FORMAT + "\n"
DEFAULT_CONNECTION_TIMEOUT = 5
DEFAULT_CACHE_TIMEOUT = 300
-DEFAULT_N_THREADS = os.cpu_count()
SORT_TYPES = {
'age': 'last server synchronization',
@@ -129,7 +126,7 @@ def get_mirrorstatus(
return obj, mtime
except (IOError, urllib.error.URLError, socket.timeout) as err:
- raise MirrorStatusError(f'failed to retrieve mirrorstatus data: {err.__class__.__name__}: {err}')
+ raise MirrorStatusError(f'failed to retrieve mirrorstatus data: {err.__class__.__name__}: {err}') from err
# ------------------------------ Miscellaneous ------------------------------- #
@@ -170,7 +167,7 @@ def count_countries(mirrors):
# --------------------------------- Sorting ---------------------------------- #
-def sort(mirrors, by=None, n_threads=DEFAULT_N_THREADS): # pylint: disable=invalid-name
+def sort(mirrors, by=None): # pylint: disable=invalid-name
'''
Sort mirrors by different criteria.
'''
@@ -182,14 +179,14 @@ def sort(mirrors, by=None, n_threads=DEFAULT_N_THREADS): # pylint: disable=inva
mirrors.sort(key=lambda m: m['last_sync'], reverse=True)
elif by == 'rate':
- rates = rate(mirrors, n_threads=n_threads)
+ rates = rate(mirrors)
mirrors = sorted(mirrors, key=lambda m: rates[m['url']], reverse=True)
else:
try:
mirrors.sort(key=lambda m: m[by])
- except KeyError:
- raise MirrorStatusError('attempted to sort mirrors by unrecognized criterion: "{}"'.format(by))
+ except KeyError as err:
+ raise MirrorStatusError('attempted to sort mirrors by unrecognized criterion: "{}"'.format(by)) from err
return mirrors
@@ -241,9 +238,10 @@ def rate_http(db_url, connection_timeout=DEFAULT_CONNECTION_TIMEOUT):
return 0, 0
-def rate(mirrors, n_threads=DEFAULT_N_THREADS, connection_timeout=DEFAULT_CONNECTION_TIMEOUT):
+def rate(mirrors, connection_timeout=DEFAULT_CONNECTION_TIMEOUT):
'''
- Rate mirrors by timing the download the core repo's database for each one.
+ Rate mirrors by timing the download of the community repo's database from
+ each one.
'''
# Ensure that mirrors is not a generator so that its length can be determined.
if not isinstance(mirrors, tuple):
@@ -252,67 +250,28 @@ def rate(mirrors, n_threads=DEFAULT_N_THREADS, connection_timeout=DEFAULT_CONNEC
if not mirrors:
return None
- # At least 1 thread and not more than the number of mirrors.
- n_threads = max(1, min(n_threads, len(mirrors)))
-
- # URL input queue.
- q_in = queue.Queue()
- # URL, elapsed time and rate output queue.
- q_out = queue.Queue()
-
- def worker():
- while True:
- # To stop a thread, an integer will be inserted in the input queue. Each
- # thread will increment it and re-insert it until it equals the
- # threadcount. After encountering the integer, the thread exits the loop.
- url = q_in.get()
-
- if isinstance(url, int):
- if url < n_threads:
- q_in.put(url + 1)
-
- else:
- db_url = url + DB_SUBPATH
- scheme = urllib.parse.urlparse(url).scheme
-
- if scheme == 'rsync':
- time_delta, ratio = rate_rsync(db_url, connection_timeout)
- else:
- time_delta, ratio = rate_http(db_url, connection_timeout)
-
- q_out.put((url, time_delta, ratio))
-
- q_in.task_done()
-
- workers = tuple(threading.Thread(target=worker) for _ in range(n_threads))
- for wkr in workers:
- wkr.daemon = True
- wkr.start()
-
- url_len = max(len(m['url']) for m in mirrors)
logger = get_logger()
- for mir in mirrors:
- url = mir['url']
- logger.info('rating %s', url)
- q_in.put(url)
-
- # To exit the threads.
- q_in.put(0)
- q_in.join()
+ logger.info('rating %s mirror(s) by download speed', len(mirrors))
+ url_len = max(len(mir['url']) for mir in mirrors)
header_fmt = '{{:{:d}s}} {{:>14s}} {{:>9s}}'.format(url_len)
logger.info(header_fmt.format('Server', 'Rate', 'Time'))
fmt = '{{:{:d}s}} {{:8.2f}} KiB/s {{:7.2f}} s'.format(url_len)
- # Loop over the mirrors just to ensure that we get the rate for each mirror.
- # The value in the loop does not (necessarily) correspond to the mirror.
rates = dict()
- for _ in mirrors:
- url, dtime, ratio = q_out.get()
+ for mir in mirrors:
+ url = mir['url']
+ db_url = url + DB_SUBPATH
+ scheme = urllib.parse.urlparse(url).scheme
+
+ if scheme == 'rsync':
+ time_delta, ratio = rate_rsync(db_url, connection_timeout)
+ else:
+ time_delta, ratio = rate_http(db_url, connection_timeout)
+
kibps = ratio / 1024.0
- logger.info(fmt.format(url, kibps, dtime))
+ logger.info(fmt.format(url, kibps, time_delta))
rates[url] = ratio
- q_out.task_done()
return rates
@@ -325,7 +284,7 @@ class MirrorStatusError(Exception):
'''
def __init__(self, msg):
- super(MirrorStatusError, self).__init__()
+ super().__init__()
self.msg = msg
def __str__(self):
@@ -511,13 +470,11 @@ class MirrorStatus():
connection_timeout=DEFAULT_CONNECTION_TIMEOUT,
cache_timeout=DEFAULT_CACHE_TIMEOUT,
min_completion_pct=1.0,
- threads=DEFAULT_N_THREADS,
url=URL
):
self.connection_timeout = connection_timeout
self.cache_timeout = cache_timeout
self.min_completion_pct = min_completion_pct
- self.threads = threads
self.url = url
self.mirror_status = None
@@ -549,8 +506,8 @@ class MirrorStatus():
obj = self.get_obj()
try:
return obj['urls']
- except KeyError:
- raise MirrorStatusError('no mirrors detected in mirror status output')
+ except KeyError as err:
+ raise MirrorStatusError('no mirrors detected in mirror status output') from err
def filter(self, mirrors=None, **kwargs):
'''
@@ -567,7 +524,7 @@ class MirrorStatus():
'''
if mirrors is None:
mirrors = self.get_mirrors()
- yield from sort(mirrors, n_threads=self.threads, **kwargs)
+ yield from sort(mirrors, **kwargs)
def rate(self, mirrors=None, **kwargs):
'''
@@ -575,7 +532,7 @@ class MirrorStatus():
'''
if mirrors is None:
mirrors = self.get_mirrors()
- yield from sort(mirrors, n_threads=self.threads, by='rate', **kwargs)
+ yield from sort(mirrors, by='rate', **kwargs)
def get_mirrorlist(self, mirrors=None, include_country=False, cmd=None):
'''
@@ -676,11 +633,6 @@ def add_arguments(parser):
)
parser.add_argument(
- '--threads', type=int, metavar='n', default=DEFAULT_N_THREADS,
- help='The maximum number of threads to use when rating mirrors. Keep in mind that this may skew your results if your connection is saturated. Default: %(default)s (number of detected CPUs)'
- )
-
- parser.add_argument(
'--verbose', action='store_true',
help='Print extra information to STDERR. Only works with some options.'
)
@@ -814,7 +766,6 @@ def process_options(options, mirrorstatus=None, mirrors=None):
# download_timeout=options.download_timeout,
cache_timeout=options.cache_timeout,
min_completion_pct=(options.completion_percent / 100.),
- threads=options.threads,
url=options.url
)