Source code for si_unit_pandas.base

#!/usr/bin/env python3
#
#  base.py
"""
Base functionality.
"""
#
#  Copyright (c) 2020 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  Based on cyberpandas
#  https://github.com/ContinuumIO/cyberpandas
#  Copyright (c) 2018, Anaconda, Inc.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#
#  * Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
#  * Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
#  * Neither the name of the copyright holder nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
#  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
#  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
#  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
#  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
#  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#  _isstringslice based on awkward-array
#  https://github.com/scikit-hep/awkward-array
#  Copyright (c) 2018-2019, Jim Pivarski
#  Licensed under the BSD 3-Clause License
#

# stdlib
from abc import abstractmethod
from numbers import Real
from typing import Dict, Iterable, List, Optional, Sequence, SupportsFloat, Tuple, Type, TypeVar, Union, overload

# 3rd party
import numpy  # type: ignore
from domdf_python_tools.doctools import prettify_docstrings
from pandas.core.arrays import ExtensionArray  # type: ignore
from pandas.core.dtypes.base import ExtensionDtype  # type: ignore
from pandas.core.dtypes.generic import ABCExtensionArray  # type: ignore
from typing_extensions import Literal, Protocol

__all__ = ["NumPyBackedExtensionArrayMixin"]


class NumPyBackedExtensionArrayMixin(ExtensionArray):
    """
    Mixin for pandas extension backed by a numpy array.

    The methods below read the backing array from ``self.data`` and rely on
    the concrete class to supply ``_dtype``, ``_itemsize``, ``_format_values``
    and ``_from_ndarray``.
    """

    _dtype: Type[ExtensionDtype]

    @property
    def dtype(self):
        """
        The dtype for this extension array, :class:`~.CelsiusType`.
        """

        return self._dtype

    @classmethod
    def _from_sequence(cls, scalars: Iterable, dtype=None, copy: bool = False):
        """
        Construct a new ExtensionArray from a sequence of scalars.

        :param scalars: Each element will be an instance of the scalar type for this
            array, ``cls.dtype.type``.
        :param dtype: Construct for this particular dtype. This should be a Dtype
            compatible with the ExtensionArray.
        :type dtype: dtype, optional
        :param copy: If True, copy the underlying data.
        """

        # NOTE(review): ``copy`` is accepted for API compatibility but is not
        # forwarded to the constructor here.
        return cls(scalars, dtype=dtype)

    @classmethod
    def _from_factorized(cls, values: numpy.ndarray, original: ExtensionArray):
        """
        Reconstruct an ExtensionArray after factorization.

        :param values: An integer ndarray with the factorized values.
        :param original: The original ExtensionArray that factorize was called on.

        .. seealso:: :meth:`pandas.api.extensions.ExtensionArray.factorize`
        """

        return cls(values)

    @property
    def shape(self) -> Tuple[int]:
        """
        Return a tuple of the array dimensions.
        """

        return (len(self.data), )

    def __len__(self) -> int:
        """
        Returns the length of this array.
        """

        return len(self.data)

    def setitem(self, indexer, value):
        """
        Set the 'value' inplace.

        :param indexer: The key passed on to ``__setitem__``.
        :param value: The value(s) to store.

        :return: The array itself.
        """

        # Kept separate from __setitem__ because this method has to return
        # the array, whereas __setitem__ does not.
        self[indexer] = value
        return self

    @property
    def nbytes(self) -> int:
        """
        The number of bytes needed to store this object in memory.
        """

        return self._itemsize * len(self)

    def _formatting_values(self):
        """
        Return the formatted values as an object-dtype :class:`numpy.ndarray`.
        """

        return numpy.array(self._format_values(), dtype="object")

    def copy(self, deep: bool = False) -> ABCExtensionArray:
        """
        Return a copy of the array.

        :param deep: Currently unused; the backing data is always copied.

        :return: A new array of the same type wrapping a copy of the data.
        """

        return type(self)(self.data.copy())

    @classmethod
    def _concat_same_type(cls, to_concat: Sequence[ABCExtensionArray]) -> ABCExtensionArray:
        """
        Concatenate multiple arrays.

        :param to_concat: sequence of this type
        """

        return cls(numpy.concatenate([array.data for array in to_concat]))

    def tolist(self) -> List:
        """
        Convert the array to a Python list.
        """

        return self.data.tolist()

    def argsort(
            self,
            ascending: bool = True,
            kind: Union[Literal["quicksort"], Literal["mergesort"], Literal["heapsort"]] = "quicksort",
            *args,
            **kwargs,
            ) -> numpy.ndarray:
        r"""
        Return the indices that would sort this array.

        :param ascending: Whether the indices should result in an ascending
            or descending sort.
        :param kind: {'quicksort', 'mergesort', 'heapsort'}, optional
            Sorting algorithm.

        \*args and \*\*kwargs are passed through to the ``argsort`` method
        of the backing :class:`numpy.ndarray`.

        :return: Array of indices that sort ``self``. For an ascending sort,
            NaN values are placed at the end. A descending sort is the exact
            reverse of the ascending order.

        .. seealso:: :func:`numpy.argsort`: Sorting implementation used internally.
        """

        order = self.data.argsort(*args, kind=kind, **kwargs)

        if not ascending:
            # numpy only sorts ascending; reverse the index order for descending.
            order = order[::-1]

        return order

    def unique(self) -> ExtensionArray:  # noqa: D102
        """
        Return the unique values, in order of first appearance.
        """

        # https://github.com/pandas-dev/pandas/pull/19869
        # numpy.unique returns the first index of each distinct value; sorting
        # those indices restores the original order of appearance.
        _, indices = numpy.unique(self.data, return_index=True)
        data = self.data.take(numpy.sort(indices))
        return self._from_ndarray(data)
_A = TypeVar("_A")


class BaseArray(numpy.lib.mixins.NDArrayOperatorsMixin, NumPyBackedExtensionArrayMixin):
    """
    One-dimensional extension array whose values live in the ``data`` ndarray.

    Concrete subclasses must provide ``_parser``.
    """

    ndim: int = 1
    data: numpy.ndarray

    @classmethod
    def _from_ndarray(cls: Type[_A], data: numpy.ndarray, copy: bool = False) -> _A:
        """
        Zero-copy construction of a BaseArray from an ndarray.

        :param data: This should have CelsiusType._record_type dtype
        :param copy: Whether to copy the data.

        :return: A new instance of ``cls`` whose ``data`` attribute is ``data``
            (or a copy of it when ``copy`` is true).
        """

        if copy:
            data = data.copy()

        # Build an empty instance, then swap in the array to avoid re-parsing it.
        new = cls([])  # type: ignore
        new.data = data
        return new

    @property
    def na_value(self):
        """
        The missing value.

        **Example:**

        .. code-block::

            >>> BaseArray([]).na_value
            numpy.nan
        """

        return self.dtype.na_value

    def take(self, indices, allow_fill: bool = False, fill_value=None):
        """
        Take elements from the array.

        :param indices: Positions to take. With ``allow_fill`` a value of ``-1``
            marks a position to be filled with ``fill_value``.
        :param allow_fill: Whether ``-1`` in ``indices`` means "missing".
        :param fill_value: Value used for missing positions. Defaults to
            :attr:`na_value` when ``allow_fill`` is true.

        :raises IndexError: If ``allow_fill`` is true, the array is empty and
            ``indices`` contains anything other than ``-1``.
        :raises ValueError: If ``allow_fill`` is true and ``indices`` contains a
            value below ``-1``.
        """

        # Can't use pandas' take yet
        # 1. axis
        # 2. I don't know how to do the reshaping correctly.

        indices = numpy.asarray(indices, dtype="int")

        if allow_fill and fill_value is None:
            fill_value = self.na_value
        elif allow_fill and not isinstance(fill_value, tuple):
            # NOTE(review): a non-NaN fill value is truncated to an int here;
            # confirm that is intended for this array's element type.
            if not numpy.isnan(fill_value):
                fill_value = int(fill_value)

        if allow_fill:
            mask = (indices == -1)

            if not len(self):
                if not (indices == -1).all():
                    msg = "Invalid take for empty array. Must be all -1."
                    raise IndexError(msg)
                else:
                    # all NA take from and empty array
                    # NOTE(review): this builds 2 * len(indices) values via an
                    # unsigned 64-bit intermediate, which looks inherited from
                    # the two-field record layout of the upstream project;
                    # verify against ``dtype._record_type``.
                    took = (
                            numpy.full(
                                    (len(indices), 2),
                                    fill_value,
                                    dtype=">u8",
                                    ).reshape(-1).astype(self.dtype._record_type)
                            )
                    return self._from_ndarray(took)

            if (indices < -1).any():
                msg = "Invalid value in 'indicies'. Must be all >= -1 for 'allow_fill=True'"
                raise ValueError(msg)

        took = self.data.take(indices)

        if allow_fill:
            # Positions requested as -1 took the last element above; overwrite them.
            took[mask] = fill_value

        return self._from_ndarray(took)

    def __repr__(self) -> str:
        formatted = self._format_values()
        return f"{self.__class__.__name__}({formatted!r})"

    def isna(self):
        """
        Indicator for whether each element is missing.
        """

        # NaN never compares equal to itself, so it needs a dedicated check.
        if numpy.isnan(self.na_value):
            return numpy.isnan(self.data)
        else:
            return self.data == self.na_value

    # From https://github.com/scikit-hep/awkward-array/blob/2bbdb68d7a4fff2eeaed81eb76195e59232e8c13/awkward/array/base.py#L611
    def _isstringslice(self, where) -> bool:
        """
        Return whether ``where`` is a string or a non-empty collection of strings.

        :param where: The candidate index.

        :raises TypeError: If ``where`` is a :class:`bytes` object.
        """

        if isinstance(where, str):
            return True
        elif isinstance(where, bytes):
            raise TypeError("column selection must be str, not bytes, in Python 3")
        elif isinstance(where, tuple):
            return False
        elif isinstance(where, (numpy.ndarray, self.__class__)):
            # ``numpy.str``, ``numpy.object`` and ``numpy.bool`` were aliases of
            # the builtins and no longer exist in current NumPy releases, so only
            # the NumPy scalar types are checked.
            scalar_type = where.dtype.type

            if issubclass(scalar_type, numpy.str_):
                return True
            if issubclass(scalar_type, numpy.object_):
                return len(where) > 0 and all(isinstance(x, str) for x in where)
            return False

        try:
            # Plain boolean logic rather than ``assert``, which is stripped
            # when Python runs with ``-O``.
            return len(where) > 0 and all(isinstance(x, str) for x in where)
        except TypeError:
            # ``where`` has no length or is not iterable.
            return False

    def __delitem__(self, where):
        # NOTE(review): ``data`` is annotated as a numpy.ndarray, which does not
        # support ``del array[key]``; confirm whether this path is reachable.
        if isinstance(where, str):
            del self.data[where]
        elif self._isstringslice(where):
            for x in where:
                del self.data[x]
        else:
            raise TypeError(f"invalid index for removing column from Table: {where}")

    @property
    @abstractmethod
    def _parser(self):
        """
        Callable that converts a value into an object with a ``data`` attribute.
        """

        raise NotImplementedError

    def append(self, value) -> None:
        """
        Append a value to this BaseArray.

        :param value: Anything accepted by ``_parser``.
        """

        self.data = numpy.append(self.data, self._parser(value).data)

    def __setitem__(self, key, value):
        value = self._parser(value).data
        self.data[key] = value


class _SupportsIndex(Protocol):
    """
    Structural type for objects implementing ``__index__``.
    """

    def __index__(self) -> int:
        ...


_F = TypeVar("_F", bound="UserFloat")


@prettify_docstrings
class UserFloat(Real):
    """
    Class that simulates a float.

    :param value: Values to initialise the :class:`~domdf_python_tools.bases.UserFloat` with.

    .. versionadded:: 1.6.0
    """

    def __init__(self, value: Union[SupportsFloat, _SupportsIndex, str, bytes, bytearray] = 0.0):
        # Stored as a 1-tuple so it can be returned as-is from __getnewargs__.
        self._value = (float(value), )

    def _delegate(self, name: str, other, *extra):
        """
        Apply the float method ``name`` to ``float(self)`` and re-wrap the result.

        :param name: Name of the :class:`float` special method, e.g. ``'__add__'``.
        :param other: The other operand. A :class:`UserFloat` is converted to
            :class:`float` first, because ``float`` itself does not accept it.
        :param extra: Further positional arguments for the float method.

        :return: An instance (or tuple of instances) of ``self.__class__``,
            or ``NotImplemented`` if :class:`float` cannot handle ``other``.
        """

        if isinstance(other, UserFloat):
            other = float(other)

        outcome = getattr(float(self), name)(other, *extra)

        if outcome is NotImplemented:
            # Let Python try the reflected operation or raise its usual TypeError.
            return NotImplemented
        if isinstance(outcome, tuple):
            # divmod-style results.
            return tuple(self.__class__(part) for part in outcome)
        return self.__class__(outcome)

    def as_integer_ratio(self) -> Tuple[int, int]:
        return float(self).as_integer_ratio()

    def hex(self) -> str:  # noqa: A003  # pylint: disable=redefined-builtin
        return float(self).hex()

    def is_integer(self) -> bool:
        return float(self).is_integer()

    @classmethod
    def fromhex(cls: Type[_F], __s: str) -> _F:
        return cls(float.fromhex(__s))

    def __add__(self: _F, other: float) -> _F:
        return self._delegate("__add__", other)

    def __sub__(self: _F, other: float) -> _F:
        return self._delegate("__sub__", other)

    def __mul__(self: _F, other: float) -> _F:
        return self._delegate("__mul__", other)

    def __floordiv__(self: _F, other: float) -> _F:  # type: ignore
        return self._delegate("__floordiv__", other)

    def __truediv__(self: _F, other: float) -> _F:
        return self._delegate("__truediv__", other)

    def __mod__(self: _F, other: float) -> _F:
        return self._delegate("__mod__", other)

    def __divmod__(self: _F, other: float) -> Tuple[_F, _F]:
        return self._delegate("__divmod__", other)  # type: ignore

    def __pow__(self: _F, other: float, mod=None) -> _F:
        return self._delegate("__pow__", other, mod)

    def __radd__(self: _F, other: float) -> _F:
        return self._delegate("__radd__", other)

    def __rsub__(self: _F, other: float) -> _F:
        return self._delegate("__rsub__", other)

    def __rmul__(self: _F, other: float) -> _F:
        return self._delegate("__rmul__", other)

    def __rfloordiv__(self: _F, other: float) -> _F:  # type: ignore
        return self._delegate("__rfloordiv__", other)

    def __rtruediv__(self: _F, other: float) -> _F:
        return self._delegate("__rtruediv__", other)

    def __rmod__(self: _F, other: float) -> _F:
        return self._delegate("__rmod__", other)

    def __rdivmod__(self: _F, other: float) -> Tuple[_F, _F]:
        return self._delegate("__rdivmod__", other)  # type: ignore

    def __rpow__(self: _F, other: float, mod=None) -> _F:
        return self._delegate("__rpow__", other, mod)

    def __getnewargs__(self) -> Tuple[float]:
        return self._value

    def __trunc__(self) -> int:
        return float(self).__trunc__()

    @overload
    def __round__(self, ndigits: int) -> float:
        ...

    @overload
    def __round__(self, ndigits: None = ...) -> int:
        ...

    def __round__(self, ndigits: Optional[int] = None) -> Union[int, float]:
        return float(self).__round__(ndigits)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, UserFloat):
            return self._value == other._value
        else:
            return float(self).__eq__(other)

    def __ne__(self, other: object) -> bool:
        if isinstance(other, UserFloat):
            return self._value != other._value
        else:
            return float(self).__ne__(other)

    def __lt__(self, other: float) -> bool:
        if isinstance(other, UserFloat):
            return self._value < other._value
        else:
            return float(self).__lt__(other)

    def __le__(self, other: float) -> bool:
        if isinstance(other, UserFloat):
            return self._value <= other._value
        else:
            return float(self).__le__(other)

    def __gt__(self, other: float) -> bool:
        if isinstance(other, UserFloat):
            return self._value > other._value
        else:
            return float(self).__gt__(other)

    def __ge__(self, other: float) -> bool:
        if isinstance(other, UserFloat):
            return self._value >= other._value
        else:
            return float(self).__ge__(other)

    def __neg__(self: _F) -> _F:
        return self.__class__(float(self).__neg__())

    def __pos__(self: _F) -> _F:
        return self.__class__(float(self).__pos__())

    def __str__(self) -> str:
        return str(float(self))

    def __int__(self) -> int:
        return int(float(self))

    def __float__(self) -> float:
        return self._value[0]

    def __abs__(self: _F) -> _F:
        return self.__class__(float(self).__abs__())

    def __hash__(self) -> int:
        return float(self).__hash__()

    def __repr__(self) -> str:
        return str(self)

    def __ceil__(self):
        raise NotImplementedError

    def __floor__(self):
        raise NotImplementedError