From 99faf7f217f6566157a65bc0870871b54c226993 Mon Sep 17 00:00:00 2001
From: Ivan Pozdeev
Date: Sun, 29 Jun 2025 20:12:44 +0300
Subject: [PATCH 1/2] Add a script to generate CPython installation scripts
 [no ci]

---
 plugins/python-build/scripts/add_cpython.py   | 567 ++++++++++++++++++
 plugins/python-build/scripts/requirements.txt |   7 +
 2 files changed, 574 insertions(+)
 create mode 100644 plugins/python-build/scripts/add_cpython.py

diff --git a/plugins/python-build/scripts/add_cpython.py b/plugins/python-build/scripts/add_cpython.py
new file mode 100644
index 00000000..a21ae661
--- /dev/null
+++ b/plugins/python-build/scripts/add_cpython.py
@@ -0,0 +1,567 @@
+#!/usr/bin/env python3
+"""Script to add CPython releases.
+
+Checks the CPython download archives for new versions,
+then writes a build script for any which do not exist locally,
+saving it to plugins/python-build/share/python-build.
+
+"""
+import argparse
+import dataclasses
+import hashlib
+import io
+import itertools
+import logging
+import operator
+import pathlib
+import pprint
+import re
+import sys
+import typing
+import urllib.parse
+
+import jc
+import more_itertools
+import packaging.version
+import requests
+import requests_html
+import sortedcontainers
+import tqdm
+
+logger = logging.getLogger(__name__)
+
+REPO = "https://www.python.org/ftp/python/"
+
+CUTOFF_VERSION = packaging.version.Version('3.9')
+EXCLUDED_VERSIONS = {
+    packaging.version.Version("3.9.3")  # recalled upstream
+}
+
+here = pathlib.Path(__file__).resolve()
+OUT_DIR: pathlib.Path = here.parent.parent / "share" / "python-build"
+
+T_THUNK = \
+'''export PYTHON_BUILD_FREE_THREADING=1
+source "${BASH_SOURCE[0]%t}"'''
+
+
+def adapt_script(version: packaging.version.Version,
+                 previous_version: packaging.version.Version) -> typing.Union[pathlib.Path, None]:
+
+    previous_version_path = OUT_DIR.joinpath(str(previous_version))
+
+    with previous_version_path.open("r", encoding='utf-8') as f:
+        script = f.readlines()
+    result = io.StringIO()
+    for line in script:
+        # a CPython source package: replace the package name, the URL+hash
+        # and the verify_py suffix
+        if m := re.match(r'\s*install_package\s+"(?P<package>Python-\S+)"\s+'
+                         r'"(?P<url>\S+)"\s+.*\s+verify_py(?P<verify_py_suffix>\d+)\s+.*$',
+                         line):
+            existing_url_path = urllib.parse.urlparse(m.group('url')).path
+            try:
+                matched_download = more_itertools.one(
+                    item for item in VersionDirectory.available[version].downloads
+                    if existing_url_path.endswith(item.extension))
+            except ValueError:
+                logger.error(f'Cannot match existing URL path\'s {existing_url_path} extension '
+                             f'to available downloads {VersionDirectory.available[version].downloads}')
+                return
+            new_package_name, new_package_url = matched_download.package_name, matched_download.url
+            new_package_hash = Url.sha256_url(new_package_url, VersionDirectory.session)
+
+            verify_py_suffix = str(version.major) + str(version.minor)
+
+            line = Re.sub_groups(m,
+                                 package=new_package_name,
+                                 url=new_package_url + '#' + new_package_hash,
+                                 verify_py_suffix=verify_py_suffix)
+
+        # a bundled OpenSSL: replace with the latest upstream release
+        elif m := re.match(r'\s*install_package\s+"(?P<package>openssl-\S+)"\s+'
+                           r'"(?P<url>\S+)"\s.*$',
+                           line):
+            item = VersionDirectory.openssl.get_store_latest_release()
+
+            line = Re.sub_groups(m,
+                                 package=item.package_name,
+                                 url=item.url + '#' + item.hash)
+
+        # a bundled Readline: replace with the latest upstream release
+        elif m := re.match(r'\s*install_package\s+"(?P<package>readline-\S+)"\s+'
+                           r'"(?P<url>\S+)"\s.*$',
+                           line):
+            item = VersionDirectory.readline.get_store_latest_release()
+
+            line = Re.sub_groups(m,
+                                 package=item.package_name,
+                                 url=item.url + '#' + item.hash)
+
+        result.write(line)
+
+    result_path = OUT_DIR.joinpath(str(version))
+    logger.info(f"Writing {result_path}")
+    result_path.write_text(result.getvalue(), encoding='utf-8')
+    result.close()
+
+    return result_path
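+
+
+# A sketch of the rewrite above (package names, URLs and hashes are hypothetical,
+# for illustration only): a line like
+#   install_package "Python-3.13.0" "https://www.python.org/ftp/python/3.13.0/Python-3.13.0.tar.xz#<sha256>" ... verify_py313 ...
+# becomes, when adapting the 3.13.0 script for 3.13.1,
+#   install_package "Python-3.13.1" "https://www.python.org/ftp/python/3.13.1/Python-3.13.1.tar.xz#<sha256>" ... verify_py313 ...
+# Only the spans of the matched named groups are replaced (see Re.sub_groups
+# below); everything else in the line is preserved verbatim.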
+
+
+def add_version(version: packaging.version.Version):
+
+    previous_version = VersionDirectory.existing.pick_previous_version(version).version
+
+    is_prerelease_upgrade = previous_version.major == version.major \
+        and previous_version.minor == version.minor \
+        and previous_version.micro == version.micro
+
+    logger.info(f"Adding {version} based on {previous_version}"
+                + (" (prerelease upgrade)" if is_prerelease_upgrade else ""))
+
+    VersionDirectory.available.get_store_available_source_downloads(version)
+
+    new_path = adapt_script(version,
+                            previous_version)
+    if not new_path:
+        return False
+    VersionDirectory.existing.append(_CPythonExistingScriptInfo(version, str(new_path)))
+
+    cleanup_prerelease_upgrade(is_prerelease_upgrade, previous_version)
+
+    handle_t_thunks(version, previous_version, is_prerelease_upgrade)
+
+    return True
+
+
+def cleanup_prerelease_upgrade(
+        is_prerelease_upgrade: bool,
+        previous_version: packaging.version.Version) \
+        -> None:
+    if is_prerelease_upgrade:
+        previous_version_path = OUT_DIR / str(previous_version)
+        logger.info(f'Deleting {previous_version_path}')
+        previous_version_path.unlink()
+        del VersionDirectory.existing[previous_version]
+
+
+def handle_t_thunks(version, previous_version, is_prerelease_upgrade):
+    if (version.major, version.minor) >= (3, 13):
+        # an old thunk may have older version-specific code,
+        # so it's safer to write a known version-independent template
+        thunk_path = OUT_DIR.joinpath(str(version) + "t")
+        logger.info(f"Writing {thunk_path}")
+        thunk_path.write_text(T_THUNK, encoding='utf-8')
+        if is_prerelease_upgrade:
+            previous_thunk_path = OUT_DIR.joinpath(str(previous_version) + "t")
+            logger.info(f"Deleting {previous_thunk_path}")
+            previous_thunk_path.unlink()
+
+
+Arguments: argparse.Namespace
+
+
+def main():
+    global Arguments
+    Arguments = parse_args()
+    logging.basicConfig(level=logging.DEBUG if Arguments.verbose else logging.INFO)
+
+    cached_session = requests_html.HTMLSession()
+    global VersionDirectory
+    VersionDirectory = _VersionDirectory(cached_session)
+
+    VersionDirectory.existing.populate()
+    VersionDirectory.available.populate()
+
+    # iterate over a snapshot since refining may add/delete entries
+    for initial_release in (v for v in frozenset(VersionDirectory.available.keys())
+                            if v.micro == 0 and v not in VersionDirectory.existing):
+        # may actually be a prerelease
+        VersionDirectory.available.get_store_available_source_downloads(initial_release, True)
+    del initial_release
+
+    versions_to_add = sorted(VersionDirectory.available.keys() - VersionDirectory.existing.keys())
+
+    logger.info("Versions to add:\n" + pprint.pformat(versions_to_add))
+    result = False
+    for version_to_add in versions_to_add:
+        result = add_version(version_to_add) or result
+    return int(not result)
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "-d", "--dry-run", action="store_true",
+        help="Do not write scripts, just report them to stdout",
+    )
+    parser.add_argument(
+        "-v", "--verbose", action="store_true",
+        help="Increase verbosity of logging",
+    )
+    parsed = parser.parse_args()
+    return parsed
+
+
+T = typing.TypeVar('T', bound=object)
+
+K = typing.TypeVar('K', bound=typing.Hashable)
+
+
+class KeyedList(typing.List[T], typing.Mapping[K, T]):
+    """A list whose items are also addressable by a key taken from a field of each item."""
+    key_field: str
+    item_init: typing.Optional[typing.Callable[..., T]] = None
+
+    def __init__(self, seq: typing.Union[typing.Iterable[T], None] = None):
+        super().__init__()
+        self._map = {}
+        if seq is not None:
+            self.__iadd__(seq)
+
+    # read
+
+    def __getitem__(self, key: K) -> T:
+        return self._map[key]
+
+    def __contains__(self, key: K):
+        return key in self._map
+
+    def keys(self) -> typing.AbstractSet[K]:
+        return self._map.keys()
+
+    # write
+
+    def append(self, item: T) -> None:
+        key = self._getkey(item)
+        if key in self:
+            raise ValueError(f"Key {key!r} already present")
+        super().append(item)
+        self._map[key] = item
+
+    def __iadd__(self, other: typing.Iterable[T]):
+        for item in other:
+            self.append(item)
+        return self
+
+    def __delitem__(self, key: K):
+        super().remove(self[key])
+        del self._map[key]
+
+    def clear(self):
+        super().__delitem__(slice(None, None))
+        self._map.clear()
+
+    # read-write
+
+    def get_or_create(self, key: K, **kwargs):
+        try:
+            return self[key]
+        except KeyError as e:
+            if self.item_init is None:
+                raise AttributeError("'item_init' must be set to use automatic item creation") from e
+            kwargs[self.key_field] = key
+            item = self.item_init(**kwargs)
+            self.append(item)
+            return item
+
+    # info
+
+    def __repr__(self):
+        return self.__class__.__name__ + "([" + ", ".join(repr(i) for i in self) + "])"
+
+    # private
+
+    def _getkey(self, item: T) -> K:
+        return getattr(item, self.key_field)
+
+
+del T, K
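+
+
+# A minimal usage sketch of KeyedList ("Item"/"Items" are hypothetical names,
+# for illustration only): items keep list order but are looked up by a key field.
+#
+#   class Item(typing.NamedTuple):
+#       name: str
+#
+#   class Items(KeyedList[Item, str]):
+#       key_field = "name"
+#       item_init = Item
+#
+#   items = Items()
+#   items.append(Item("a"))
+#   assert "a" in items and items["a"].name == "a"
+#   assert items.get_or_create("b").name == "b"  # auto-created via item_init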
+
+
+@dataclasses.dataclass(frozen=True)
+class _CPythonAvailableVersionDownloadInfo:
+    extension: str
+    package_name: str
+    url: str
+
+
+class _CPythonAvailableVersionDownloadsDirectory(KeyedList[_CPythonAvailableVersionDownloadInfo, str]):
+    key_field = "extension"
+
+
+@dataclasses.dataclass(frozen=True)
+class _CPythonAvailableVersionInfo:
+    version: packaging.version.Version
+    download_page_url: str
+    downloads: _CPythonAvailableVersionDownloadsDirectory = dataclasses.field(
+        default_factory=lambda: _CPythonAvailableVersionDownloadsDirectory()
+    )
+
+
+class CPythonAvailableVersionsDirectory(KeyedList[_CPythonAvailableVersionInfo, packaging.version.Version]):
+    key_field = "version"
+    _session: requests.Session
+    item_init = _CPythonAvailableVersionInfo
+
+    def __init__(self, session: requests.Session, seq=None):
+        super().__init__(seq)
+        self._session = session
+
+    def populate(self, url=REPO, pattern=r'^\d+'):
+        """
+        Fetch remote versions
+        """
+        logger.info("Fetching available CPython versions")
+        for name, url in DownloadPage.enum_download_entries(url, pattern, self._session):
+            v = packaging.version.Version(name)
+            if v < CUTOFF_VERSION or v in EXCLUDED_VERSIONS:
+                continue
+            logger.debug(f'Available version: {name} ({v}), {url}')
+            self.append(_CPythonAvailableVersionInfo(
+                v,
+                url
+            ))
+
+    def get_store_available_source_downloads(self, version, refine_mode=False):
+        entry = self[version]
+        if entry.downloads:
+            # already retrieved
+            return
+        additional_versions_found = \
+            CPythonAvailableVersionsDirectory(self._session) if refine_mode else None
+        exact_download_found = False
+        for name, url in DownloadPage.enum_download_entries(
+                entry.download_page_url,
+                r'Python-.*\.(tar\.xz|tgz)$',
+                self._session):
+            m = re.match(r'(?P<package>Python-(?P<version>.*))\.(?P<extension>tar\.xz|tgz)$', name)
+
+            download_version = packaging.version.Version(m.group("version"))
+            if download_version != version:
+                if not refine_mode:
+                    raise ValueError(f"Unexpectedly found a download {name} for {download_version} "
+                                     f"at page {entry.download_page_url} for {version}")
+                entry_to_fill = additional_versions_found.get_or_create(
+                    download_version,
+                    download_page_url=entry.download_page_url
+                )
+            else:
+                exact_download_found = True
+                entry_to_fill = entry
+
+            entry_to_fill.downloads.append(_CPythonAvailableVersionDownloadInfo(
+                m.group("extension"), m.group('package'), url
+            ))
+
+        if not exact_download_found:
+            # the exact version is absent; replace the placeholder entry
+            # with the newest version actually found on the page
+            del self[version]
+            self.append(
+                additional_versions_found[
+                    max(additional_versions_found.keys())
+                ])
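+
+
+# An example of refine mode above (version numbers are hypothetical): before a
+# final release, the 3.14.0 directory may only contain Python-3.14.0rc1.tar.xz;
+# the placeholder "3.14.0" entry is then dropped and replaced with the newest
+# prerelease actually found there (3.14.0rc1).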
+
+
+class _CPythonExistingScriptInfo(typing.NamedTuple):
+    version: packaging.version.Version
+    filename: str
+
+
+class CPythonExistingScriptsDirectory(KeyedList[_CPythonExistingScriptInfo, packaging.version.Version]):
+    key_field = "version"
+    _filename_pattern = r'^\d+\.\d+(?:(t?)(-\w+)|(\.\d+((?:a|b|rc)\d)?(t?)))$'
+
+    def populate(self):
+        """
+        Enumerate existing installation scripts in share/python-build/ by pattern
+        """
+        logger.info(f"Enumerating existing versions in {OUT_DIR}")
+        for entry_name in (p.name for p in OUT_DIR.iterdir() if p.is_file()):
+            if (not (m := re.match(self._filename_pattern, entry_name))
+                    or m.group(1) == 't' or m.group(5) == 't'):
+                continue
+            try:
+                v = packaging.version.Version(entry_name)
+                # branch tip scripts are different from release scripts and thus unusable as a pattern
+                if v.dev is not None:
+                    continue
+                logger.debug(f"Existing version {v}")
+
+                self.append(_CPythonExistingScriptInfo(v, entry_name))
+
+            except ValueError as e:
+                logger.error(f"Unable to parse existing version {entry_name}: {e}")
+
+    def pick_previous_version(self,
+                              version: packaging.version.Version) \
+            -> _CPythonExistingScriptInfo:
+        return max(v for v in self if v.version < version)
+
+
+class _OpenSSLVersionInfo(typing.NamedTuple):
+    version: packaging.version.Version
+    package_name: str
+    url: str
+    hash: str
+
+
+class OpenSSLVersionsDirectory(KeyedList[_OpenSSLVersionInfo, packaging.version.Version]):
+    key_field = "version"
+
+    def get_store_latest_release(self) \
+            -> _OpenSSLVersionInfo:
+        if self:
+            # already retrieved
+            return self[max(self.keys())]
+
+        j = requests.get("https://api.github.com/repos/openssl/openssl/releases/latest").json()
+        # noinspection PyTypeChecker
+        # urlparse can parse str as well as bytes
+        shasum_url = more_itertools.one(
+            asset['browser_download_url']
+            for asset in j['assets']
+            if urllib.parse.urlparse(asset['browser_download_url']).path.split('/')[-1].endswith('.sha256')
+        )
+        shasum_text = requests.get(shasum_url).text
+        shasum_data = jc.parse("hashsum", shasum_text, quiet=True)[0]
+        package_hash, package_filename = shasum_data["hash"], shasum_data["filename"]
+        del shasum_data, shasum_text, shasum_url
+
+        # the OpenSSL GitHub repo has tag names "openssl-<version>" as of this writing, like we need,
+        # but let's not rely on that;
+        # splitext doesn't work with a chained extension, it only splits off the last one
+        package_name, package_version_str = re.match(r"([^-]+-(.*?))\.\D", package_filename).groups()
+        package_version = packaging.version.Version(package_version_str)
+
+        package_url = more_itertools.one(
+            asset['browser_download_url']
+            for asset in j['assets']
+            if urllib.parse.urlparse(asset['browser_download_url']).path.split('/')[-1] == package_filename
+        )
+
+        result = _OpenSSLVersionInfo(package_version, package_name, package_url, package_hash)
+        self.append(result)
+
+        return result
+
+
+class _ReadlineVersionInfo(typing.NamedTuple):
+    version: packaging.version.Version
+    package_name: str
+    url: str
+    hash: str
+
+
+class ReadlineVersionsDirectory(KeyedList[_ReadlineVersionInfo, packaging.version.Version]):
+    key_field = "version"
+
+    def get_store_latest_release(self):
+        if not self:
+            self._store_latest_release()
+        return self._latest_release()
+
+    def _store_latest_release(self):
+        candidates = ReadlineVersionsDirectory()
+
+        pattern = r'(?P<package_name>readline-(?P<version>\d+(?:\.\d+)+))\.tar\.gz$'
+        for name, url in DownloadPage.enum_download_entries(
+                'https://ftpmirror.gnu.org/readline/', pattern, VersionDirectory.session):
+            m = re.match(pattern, name)
+            version = packaging.version.Version(m.group('version'))
+            candidates.append(_ReadlineVersionInfo(
+                version,
+                m.group('package_name'),
+                url,
+                ""
+            ))
+        max_item = candidates._latest_release()
+        hash_ = Url.sha256_url(max_item.url, VersionDirectory.session)
+
+        result = _ReadlineVersionInfo(
+            max_item.version,
+            max_item.package_name,
+            max_item.url,
+            hash_)
+        self.append(result)
+
+        return result
+
+    def _latest_release(self):
+        return self[max(self.keys())]
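+
+
+# A sketch of the OpenSSL release lookup above (asset names are assumptions based
+# on the current upstream layout): a checksum asset like
+#   openssl-3.5.0.tar.gz.sha256
+# holds a line such as "<hash>  openssl-3.5.0.tar.gz", parsed with jc's "hashsum"
+# parser; the regex then extracts package_name "openssl-3.5.0" and version "3.5.0"
+# from the recorded filename.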
+
+
+class _VersionDirectory:
+    def __init__(self, session):
+        self.existing = CPythonExistingScriptsDirectory()
+        self.available = CPythonAvailableVersionsDirectory(session)
+        self.openssl = OpenSSLVersionsDirectory()
+        self.readline = ReadlineVersionsDirectory()
+        self.session = session
+
+
+VersionDirectory: _VersionDirectory
+
+
+class DownloadPage:
+    class _DownloadPageEntry(typing.NamedTuple):
+        name: str
+        url: str
+
+    @classmethod
+    def enum_download_entries(cls, url, pattern, session=None) -> typing.Generator[_DownloadPageEntry, None, None]:
+        if session is None:
+            session = requests_html.HTMLSession()
+        response = session.get(url)
+        page = response.html
+        table = page.find("pre", first=True)
+        # the first entry is ".."
+        links = table.find("a")[1:]
+        for link in links:
+            name = link.text.rstrip('/')
+            if not re.match(pattern, name):
+                continue
+            yield cls._DownloadPageEntry(name, urllib.parse.urljoin(response.url, link.attrs['href']))
+
+
+class Re:
+    @dataclasses.dataclass
+    class _interval:
+        group: typing.Union[int, str, None]
+        start: int
+        end: int
+
+    @staticmethod
+    def sub_groups(match: re.Match,
+                   /, *args: typing.Optional[str],
+                   **kwargs: typing.Optional[str]) \
+            -> str:
+        """Replace the spans of the matched groups, given by position or name, with the given strings."""
+        repls = {i: repl for i, repl in enumerate(args, 1) if repl is not None}
+        repls.update({n: repl for n, repl in kwargs.items() if repl is not None})
+
+        intervals: sortedcontainers.SortedList[Re._interval] = \
+            sortedcontainers.SortedKeyList(key=operator.attrgetter("start", "end"))
+
+        for group_id in itertools.chain(range(1, len(match.groups()) + 1), match.groupdict().keys()):
+            if group_id not in repls:
+                continue
+            if match.start(group_id) == -1:
+                continue
+            intervals.add(Re._interval(group_id, match.start(group_id), match.end(group_id)))
+        del group_id
+
+        last_interval = Re._interval(None, 0, 0)
+        result = ""
+        for interval in intervals:
+            if interval.start < last_interval.end:
+                raise ValueError(f"Cannot replace intersecting matches "
+                                 f"for groups {last_interval.group} and {interval.group} "
+                                 f"(position {interval.start})")
+            if interval.end == interval.start and \
+                    last_interval.start == last_interval.end == interval.start:
+                raise ValueError(f"Cannot replace consecutive zero-length matches "
+                                 f"for groups {last_interval.group} and {interval.group} "
+                                 f"(position {interval.start})")
+
+            result += match.string[last_interval.end:interval.start] + repls[interval.group]
+            last_interval = interval
+        result += match.string[last_interval.end:]
+
+        return result
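+
+
+# An illustrative example of Re.sub_groups (strings are hypothetical):
+#
+#   m = re.match(r'"(?P<url>\S+)"', '"https://old"')
+#   Re.sub_groups(m, url="https://new")  # -> '"https://new"'
+#
+# Non-matching groups and None replacements are skipped; intersecting group
+# spans raise ValueError.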
+
+
+class Url:
+    @staticmethod
+    def sha256_url(url, session=None):
+        if session is None:
+            session = requests_html.HTMLSession()
+        logger.info(f"Downloading and computing hash of {url}")
+        h = hashlib.sha256()
+        r = session.get(url, stream=True)
+        total_bytes = int(r.headers.get('content-length', 0)) or float('inf')
+        with tqdm.tqdm(total=total_bytes, unit='B', unit_scale=True, unit_divisor=1024) as t:
+            for c in r.iter_content(1024):
+                t.update(len(c))
+                h.update(c)
+        return h.hexdigest()
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/plugins/python-build/scripts/requirements.txt b/plugins/python-build/scripts/requirements.txt
index 995ee355..ec87e231 100644
--- a/plugins/python-build/scripts/requirements.txt
+++ b/plugins/python-build/scripts/requirements.txt
@@ -1,2 +1,9 @@
 requests-html
 lxml[html_clean]
+packaging~=20.4
+requests~=2.32.4
+sortedcontainers~=2.4.0
+tqdm~=4.51.0
+pandas~=1.5.3
+numpy~=1.24.2
+jc @ git+https://github.com/native-api/jc@haslib_mode

From ff94d498db8b0bbf050d8a1cffa9fa238751c0e9 Mon Sep 17 00:00:00 2001
From: Ivan Pozdeev
Date: Wed, 3 Sep 2025 20:58:23 +0300
Subject: [PATCH 2/2] (refactor)

---
 plugins/python-build/scripts/add_miniforge.py | 31 +++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/plugins/python-build/scripts/add_miniforge.py b/plugins/python-build/scripts/add_miniforge.py
index 3003605c..cb90c9d8 100755
--- a/plugins/python-build/scripts/add_miniforge.py
+++ b/plugins/python-build/scripts/add_miniforge.py
@@ -124,20 +124,25 @@ def add_version(release, distributions):
     else:
         logger.info('Did not find specs for %(distribution)s', locals())
 
-for release in requests.get(f'https://api.github.com/repos/{MINIFORGE_REPO}/releases').json():
-    version = release['tag_name']
-    if version in SKIPPED_RELEASES:
-        continue
+def main():
+    for release in requests.get(f'https://api.github.com/repos/{MINIFORGE_REPO}/releases').json():
+        version = release['tag_name']
 
-    logger.info('Looking for %(version)s in %(out_dir)s', locals())
+        if version in SKIPPED_RELEASES:
+            continue
 
-    # mambaforge is retired https://github.com/conda-forge/miniforge/releases/tag/24.11.2-0
-    if version_tuple(version) >= (24,11,2):
-        distributions = DISTRIBUTIONS
-    else:
-        distributions = DISTRIBUTIONS_PRE25
+        logger.info('Looking for %(version)s in %(out_dir)s', locals())
 
-    if any(not list(out_dir.glob(f'{distribution}*-{version}')) for distribution in distributions):
-        logger.info('Downloading %(version)s', locals())
-        add_version(release, distributions)
+        # mambaforge is retired https://github.com/conda-forge/miniforge/releases/tag/24.11.2-0
+        if version_tuple(version) >= (24, 11, 2):
+            distributions = DISTRIBUTIONS
+        else:
+            distributions = DISTRIBUTIONS_PRE25
+
+        if any(not list(out_dir.glob(f'{distribution}*-{version}')) for distribution in distributions):
+            logger.info('Downloading %(version)s', locals())
+            add_version(release, distributions)
+
+if __name__ == '__main__':
+    main()