#!/usr/bin/env python3
#
# distributions.py
"""
Iterate over installed distributions.
Third-party distributions are installed into Python's ``site-packages`` directory with tools such as pip_.
Distributions must have a ``*.dist-info`` directory (as defined by :pep:`566`) to be discoverable.
.. _pip: https://pypi.org/project/pip/
"""
#
# Copyright © 2021 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
#
# Parts of iter_distributions based on https://github.com/takluyver/entrypoints
# Copyright (c) 2015 Thomas Kluyver and contributors
# MIT Licensed
#
# stdlib
import abc
import collections
import functools
import posixpath
import sys
from contextlib import suppress
from csv import reader as csv_reader
from operator import itemgetter
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Tuple,
Type,
TypeVar
)
# 3rd party
import handy_archives
from domdf_python_tools.paths import PathPlus
from domdf_python_tools.typing import PathLike
from domdf_python_tools.utils import divide
from packaging.version import Version
# this package
from dist_meta import metadata, wheel
from dist_meta._utils import _canonicalize, _iter_dist_infos, _parse_version, _parse_wheel_filename
from dist_meta.metadata_mapping import MetadataMapping
from dist_meta.record import FileHash, RecordEntry
_tuplegetter = lambda index, doc: property(itemgetter(index), doc=doc)
if not TYPE_CHECKING:
with suppress(ImportError):
# 3rd party
from _collections import _tuplegetter
__all__ = (
"get_distribution",
"iter_distributions",
"packages_distributions",
"DistributionType",
"Distribution",
"WheelDistribution",
"DistributionNotFoundError",
"_DT",
)
_DT = TypeVar("_DT", bound="DistributionType")
_D = TypeVar("_D", bound="Distribution")
_WD = TypeVar("_WD", bound="WheelDistribution")
[docs]class DistributionType(abc.ABC):
"""
Abstract base class for :class:`~.Distribution`-like objects.
.. versionchanged:: 0.3.0
Previously was a :py:obj:`~.typing.Union` representing :class:`~.Distribution` and :class:`~.WheelDistribution`.
Now a common base class for those two classes, and custom classes providing the same API
This class implements most of the :func:`collections.namedtuple` API.
Subclasses must implement ``_fields`` (as a tuple of field names)
and the :class:`tuple` interface (specifically ``__iter__`` and ``__getitem__``).
"""
#: The name of the distribution. No normalization is performed.
name: str
#: The version number of the distribution.
version: Version
#: A tuple of field names for the "namedtuple".
_fields: Tuple[str, ...] # actually a ClassVar, but need to support older Pythons
#: A mapping of field names to default values.
_field_defaults: Dict[str, Any]
# These must be implemented by subclasses
__iter__: Callable
__getitem__: Callable
def __init_subclass__(cls: Type["DistributionType"], **kwargs):
if not getattr(cls, "_fields", ()):
raise ValueError("'_fields' cannot be empty.")
ns = cls.__dict__
field_defaults = getattr(cls, "_field_defaults", {})
for index, name in enumerate(cls._fields): # pylint: disable=use-dict-comprehension
if name in ns:
field_defaults[name] = ns[name]
if cls._fields[0] != "name":
raise ValueError("The first item in '_fields' must be 'name'")
elif cls._fields[1] != "version":
raise ValueError("The second item in '_fields' must be 'version'")
for index, name in enumerate(cls._fields):
# pylint: disable=dotted-import-in-loop,loop-global-usage
doc = sys.intern(f'Alias for field number {index}')
setattr(cls, name, _tuplegetter(index, doc))
# pylint: enable=dotted-import-in-loop,loop-global-usage
cls._field_defaults = field_defaults
[docs] @abc.abstractmethod
def read_file(self, filename: str) -> str:
"""
Read a file from the ``*.dist-info`` directory and return its content.
:param filename:
"""
raise NotImplementedError
[docs] @abc.abstractmethod
def has_file(self, filename: str) -> bool:
"""
Returns whether the ``*.dist-info`` directory contains a file named ``filename``.
:param filename:
"""
raise NotImplementedError
[docs] def _asdict(self) -> Dict[str, Any]:
"""
Return a new dict which maps field names to their values.
"""
return dict(zip(self._fields, self))
def __getnewargs__(self) -> Tuple:
"""
Return self as a plain tuple. Used by copy and pickle.
"""
return tuple(self)
[docs] def _replace(self: _DT, **kwargs) -> _DT:
"""
Make a new :class:`~.DistributionType` object, of the same type as this one,
replacing the specified fields with new values.
:param iterable:
""" # noqa: D400
result = self._make(map(kwargs.pop, self._fields, self))
if kwargs:
raise ValueError(f"Got unexpected field names: {list(kwargs)!r}")
return result
[docs] @classmethod
def _make(cls: Type[_DT], iterable) -> _DT: # noqa: MAN001
"""
Make a new :class:`~.DistributionType` object, of the same type as this one, from a sequence or iterable.
:param iterable:
"""
return cls(*iterable)
[docs] def get_entry_points(self) -> Dict[str, Dict[str, str]]: # -> EntryPointMap
"""
Returns a mapping of entry point groups to entry points.
Entry points in the group are contained in a dictionary mapping entry point names to objects.
:class:`dist_meta.entry_points.EntryPoint` objects can be constructed as follows:
.. code-block:: python
for name, epstr in distro.get_entry_points().get("console_scripts", {}).items():
EntryPoint(name, epstr)
"""
# this package
from dist_meta import entry_points
if self.has_file("entry_points.txt"):
return entry_points.loads(self.read_file("entry_points.txt"))
return {}
[docs] def get_wheel(self) -> Optional[MetadataMapping]:
"""
Returns the content of the ``*.dist-info/WHEEL`` file, or :py:obj:`None` if the file does not exist.
The file will only be present if the distribution was installed from a :pep:`wheel <427>`.
""" # noqa: RST399
if self.has_file("WHEEL"):
return wheel.loads(self.read_file("WHEEL"))
return None
[docs] def get_record(self) -> Optional[List[RecordEntry]]:
"""
Returns the parsed content of the ``*.dist-info/RECORD`` file, or :py:obj:`None` if the file does not exist.
:returns: A :class:`dist_meta.record.RecordEntry` object for each line in the record
(i.e. each file in the distribution).
This includes files in the ``*.dist-info`` directory.
"""
if self.has_file("RECORD"):
content = self.read_file("RECORD").splitlines()
output = []
for line in csv_reader(content):
name, hash_, size_str, *_ = line
entry = RecordEntry(
name.strip(),
hash=FileHash.from_string(hash_) if hash_ else None,
size=int(size_str) if size_str else None,
)
output.append(entry)
return output
else:
return None
[docs] def __repr__(self) -> str:
"""
Returns a string representation of the :class:`~.DistributionType`.
"""
return f"<{self.__class__.__name__}({self.name!r}, {self.version!r})>"
[docs]class Distribution(DistributionType, Tuple[str, Version, PathPlus]):
"""
Represents an installed Python distribution.
:param name: The name of the distribution.
"""
#: The name of the distribution. No normalization is performed.
name: str
#: The version number of the distribution.
version: Version
#: The path to the ``*.dist-info`` directory in the file system.
path: PathPlus
__slots__ = ()
_fields = ("name", "version", "path")
def __new__(
cls: Type[_D],
name: str,
version: Version,
path: PathPlus,
) -> _D:
"""
Construct a new :class:`~.Distribution` object.
:rtype: :class:`~.Distribution`
"""
# If this is super().__new__ it breaks on PyPy
return tuple.__new__(cls, (name, version, path))
[docs] @classmethod
def from_path(cls: Type[_D], path: PathLike) -> _D:
"""
Construct a :class:`~.Distribution` from a filesystem path to the ``*.dist-info`` directory.
:param path:
:rtype: :class:`~.Distribution`
"""
path = PathPlus(path)
if path.name[0] == '~':
raise ValueError(
"Directory path starts with a tilde (~). "
"This may be a temporary directory created by pip.",
)
distro_name_version = path.stem
# Check works around https://foss.heptapod.net/pypy/pypy/-/issues/3579
if sys.implementation.name == "pypy": # pragma: no cover (!PyPy)
if distro_name_version == "hpy":
name, version = "hpy", "0.0.0"
elif distro_name_version == "cffi":
name, version = "cffi", "0.0.0"
else:
name, version = divide(distro_name_version, '-')
else:
name, version = divide(distro_name_version, '-')
return cls(name, _parse_version(version), path)
[docs] def read_file(self, filename: str) -> str:
"""
Read a file from the ``*.dist-info`` directory and return its content.
:param filename:
"""
return (self.path / filename).read_text()
[docs] def has_file(self, filename: str) -> bool:
"""
Returns whether the ``*.dist-info`` directory contains a file named ``filename``.
:param filename:
"""
return (self.path / filename).is_file()
[docs] def get_record(self) -> Optional[List[RecordEntry]]:
"""
Returns the parsed content of the ``*.dist-info/RECORD`` file, or :py:obj:`None` if the file does not exist.
:returns: A :class:`dist_meta.record.RecordEntry` object for each line in the record
(i.e. each file in the distribution).
This includes files in the ``*.dist-info`` directory.
"""
if self.has_file("RECORD"):
content = self.read_file("RECORD").splitlines()
output = []
for line in csv_reader(content):
name, hash_, size_str, *_ = line
entry = RecordEntry(
name.strip(),
hash=FileHash.from_string(hash_) if hash_ else None,
size=int(size_str) if size_str else None,
distro=self,
)
output.append(entry)
return output
else:
return None
[docs]class WheelDistribution(DistributionType, Tuple[str, Version, PathPlus, handy_archives.ZipFile]):
"""
Represents a Python distribution in :pep:`wheel <427>` form.
:param name: The name of the distribution.
A :class:`~.WheelDistribution` can be used as a contextmanager,
which will close the underlying :class:`zipfile.ZipFile` when exiting
the :keyword:`with` block.
""" # noqa: RST399
#: The name of the distribution. No normalization is performed.
name: str
#: The version number of the distribution.
version: Version
#: The path to the ``.whl`` file.
path: PathPlus
#: The opened zip file.
wheel_zip: handy_archives.ZipFile
__slots__ = ()
_fields = ("name", "version", "path", "wheel_zip")
def __new__(
cls: Type[_WD],
name: str,
version: Version,
path: PathPlus,
wheel_zip: handy_archives.ZipFile,
) -> _WD:
"""
Construct a new :class:`~.WheelDistribution` object.
:rtype: :class:`~.WheelDistribution`
"""
# If this is super().__new__ it breaks on PyPy
return tuple.__new__(cls, (name, version, path, wheel_zip))
[docs] @classmethod
def from_path(cls: Type[_WD], path: PathLike, **kwargs) -> _WD:
r"""
Construct a :class:`~.WheelDistribution` from a filesystem path to the ``.whl`` file.
:param path:
:param \*\*kwargs: Additional keyword arguments passed to :class:`zipfile.ZipFile`.
:rtype: :class:`~.WheelDistribution`
"""
path = PathPlus(path)
name, version, *_ = _parse_wheel_filename(path)
wheel_zip = handy_archives.ZipFile(path, 'r', **kwargs)
return cls(name, version, path, wheel_zip)
def __enter__(self: _WD) -> _WD:
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.wheel_zip.close()
[docs] def read_file(self, filename: str) -> str:
"""
Read a file from the ``*.dist-info`` directory and return its content.
:param filename:
"""
dist_info = f"{self.name}-{self.version}.dist-info"
try:
return self.wheel_zip.read_text(posixpath.join(dist_info, filename))
except FileNotFoundError as fnf_e:
try:
dist_info = _get_dist_info_path(self)
except _NoDistInfoFound:
raise fnf_e
return self.wheel_zip.read_text(posixpath.join(dist_info, filename))
[docs] def has_file(self, filename: str) -> bool:
"""
Returns whether the ``*.dist-info`` directory contains a file named ``filename``.
:param filename:
"""
dist_info = f"{self.name}-{self.version}.dist-info"
if posixpath.join(dist_info, filename) in self.wheel_zip.namelist():
return True
else:
try:
dist_info = _get_dist_info_path(self)
except _NoDistInfoFound:
return False
else:
return posixpath.join(dist_info, filename) in self.wheel_zip.namelist()
[docs] def get_wheel(self) -> MetadataMapping:
"""
Returns the content of the ``*.dist-info/WHEEL`` file.
:raises FileNotFoundError: if the file does not exist.
"""
return wheel.loads(self.read_file("WHEEL"))
[docs] def get_record(self) -> List[RecordEntry]:
"""
Returns the parsed content of the ``*.dist-info/RECORD`` file, or :py:obj:`None` if the file does not exist.
:returns: A :class:`dist_meta.record.RecordEntry` object for each line in the record
(i.e. each file in the distribution).
This includes files in the ``*.dist-info`` directory.
:raises FileNotFoundError: if the file does not exist.
"""
content = self.read_file("RECORD").splitlines()
output = []
for line in csv_reader(content):
name, hash_, size_str, *_ = line
entry = RecordEntry(
name,
hash=FileHash.from_string(hash_) if hash_ else None,
size=int(size_str) if size_str else None,
)
output.append(entry)
return output
[docs]def iter_distributions(path: Optional[Iterable[PathLike]] = None) -> Iterator[Distribution]:
"""
Returns an iterator over installed distributions on ``path``.
:param path: The directories entries to search for distributions in.
:default path: :py:data:`sys.path`
"""
if path is None: # pragma: no cover
path = sys.path
# Distributions found earlier in path will shadow those with the same name found later.
# If these distributions used different module names, it may actually be possible to import both,
# but in most cases this shadowing will be correct.
distro_names_seen = set()
for folder in map(PathPlus, path):
if not folder.is_dir():
continue
for subdir in _iter_dist_infos(folder):
if subdir.name[0] == '~':
# Temporary directory created by pip
continue
distro = Distribution.from_path(subdir)
normalized_name = _canonicalize(distro.name)
if normalized_name in distro_names_seen:
continue
distro_names_seen.add(normalized_name)
yield distro
[docs]def get_distribution(
name: str,
path: Optional[Iterable[PathLike]] = None,
) -> Distribution:
"""
Returns a :class:`~.Distribution` instance for the distribution with the given name.
:param name:
:param path: The directories entries to search for distributions in.
:default path: :py:data:`sys.path`
:rtype:
"""
for distro in iter_distributions(path=path):
if _canonicalize(distro.name) == _canonicalize(name):
return distro
raise DistributionNotFoundError(name)
[docs]class DistributionNotFoundError(ValueError):
"""
Raised when a distribution cannot be located.
"""
class _NoDistInfoFound(Exception):
pass
@functools.lru_cache()
def _get_dist_info_path(dist: WheelDistribution) -> str:
"""
Find the name of the dist-info directory, case insensitive and allowing unnormalised versions.
:param dist:
:raises _NoDistInfoFound: If no dist-info directory is found, or the version/name don't match.
"""
casefolded_dist_name = dist.name.casefold()
for filename in dist.wheel_zip.namelist():
if ".dist-info" in filename:
# Might be the directory we're looking for
with suppress(Exception):
# Ignore parsing errors
dist_info_dir = filename.split('/', 1)[0]
# pylint: disable=dotted-import-in-loop,loop-invariant-statement
distro_name_version, extension = posixpath.splitext(dist_info_dir)
if extension != ".dist-info":
continue
# pylint: enable=dotted-import-in-loop,loop-invariant-statement
name, version = divide(distro_name_version, '-')
if name.casefold() == casefolded_dist_name:
actual_version = _parse_version(version)
if actual_version == dist.version:
dist_info = f"{name}-{version}.dist-info"
return dist_info
# path not found
raise _NoDistInfoFound
[docs]def packages_distributions(path: Optional[Iterable[PathLike]] = None) -> Mapping[str, List[str]]:
"""
Returns a mapping of top-level packages to a list of distributions which provide them.
The same top-level package may be provided by multiple distributions,
especially in the case of namespace packages.
:param path: The directories entries to search for distributions in.
:default path: :py:data:`sys.path`
.. versionadded:: 0.7.0
:bold-title:`Example:`
.. code-block:: pycon
>>> import collections.abc
>>> pkgs = packages_distributions()
>>> all(isinstance(dist, collections.abc.Sequence) for dist in pkgs.values())
True
"""
if path is None: # pragma: no cover
path = sys.path
pkg_to_dist = collections.defaultdict(set)
for dist in iter_distributions(path):
dist_name = dist.get_metadata()["Name"]
assert dist_name is not None
record = dist.get_record() or ()
for file in record:
if file.suffix == ".py":
if ".." in file.parts:
# File outside of site-packages (e.g. in venv/bin)
continue
if len(file.parts) > 1:
# Package
pkg = file.parts[0]
else:
# Single file module
pkg = file.stem
pkg_to_dist[pkg].add(dist_name)
return {k: sorted(v) for k, v in pkg_to_dist.items()}