Source code for pymt.framework.timeinterp

import bisect

from scipy.interpolate import interp1d

_MINIMUM_SIZE_FOR_METHOD = {
    "linear": 2,
    "nearest": 2,
    "zero": 2,
    "slinear": 2,
    "quadratic": 3,
    "cubic": 4,
    "previous": 2,
    "next": 2,
}


class TimeInterpolator:
    """Interpolate values from a time-sorted buffer of (*time*, *data*) points.

    Points are kept sorted by time. The ``scipy.interpolate.interp1d``
    object used for evaluation is built lazily on the first call to
    :meth:`interpolate` and is rebuilt whenever the stored points or the
    interpolation settings change.
    """

    # Names accepted for the ``method`` property; the chosen name is passed
    # to ``interp1d`` as its ``kind`` argument.
    METHODS = (
        "linear",
        "nearest",
        "zero",
        "slinear",
        "quadratic",
        "cubic",
        "previous",
        "next",
    )

    def __init__(
        self, data=(), method="linear", fill_value="extrapolate", maxsize=None
    ):
        """Interpolate data based on an evolving time series of data values.

        Parameters
        ----------
        data : iterable of (*time*, *data*), optional
            The data to use for the interpolation as an iterable of
            (*time*, *data*). *data* can either be scalars or numpy arrays.
        method : str, optional
            The interpolation method to use. Must be one of
            ``TimeInterpolator.METHODS``.
        fill_value : str or float, optional
            The value to use if trying to interpolate values that
            are outside the stored times. The default is to extrapolate
            to the given time.
        maxsize : int, optional
            The maximum size of the buffer that holds the interpolation
            times. When the buffer reaches this size, the oldest times
            will be popped off the stack.

        Raises
        ------
        ValueError
            If *method*, *fill_value* or *maxsize* is not acceptable.
        """
        self._data = []
        self._time = []
        self._func = None
        self._method = None
        self._fill_value = None
        self._maxsize = None

        # Go through the property setters so every argument is validated.
        # ``method`` must be set before ``maxsize``, whose check reads it.
        self.method = method
        self.fill_value = fill_value
        self.maxsize = maxsize

        self.add_data(data)

    @property
    def method(self):
        """The method used for interpolating time."""
        return self._method

    @method.setter
    def method(self, val):
        if val not in TimeInterpolator.METHODS:
            raise ValueError("method not understood")
        self._method = val
        # A cached interpolant was built with the previous method.
        self._func = None

    @property
    def fill_value(self):
        """Value to use when interpolating values outside of the data."""
        return self._fill_value

    @fill_value.setter
    def fill_value(self, val):
        # Plain ints (e.g. ``0``) are as valid a constant fill as floats;
        # bools are still rejected.
        is_number = isinstance(val, (int, float)) and not isinstance(val, bool)
        if not is_number and val != "extrapolate":
            raise ValueError(
                "fill_value not understood, must be either float or 'extrapolate'"
            )
        self._fill_value = val
        # A cached interpolant was built with the previous fill value.
        self._func = None

    @property
    def maxsize(self):
        """The maximum number of times to be stored."""
        return self._maxsize

    @maxsize.setter
    def maxsize(self, val):
        # A buffer exactly as large as the method's minimum is acceptable;
        # only a smaller one is rejected.
        if val is not None and val < _MINIMUM_SIZE_FOR_METHOD[self.method]:
            raise ValueError(f"maxsize too small for method ({self.method})")
        self._maxsize = val
        self._trim_data_to_maxsize()

    def add_data(self, time_and_data):
        """Add new data points to the interpolator.

        Parameters
        ----------
        time_and_data : iterable of (*time*, *data*) or a single (*time*, *data*)
            The points to add. A two-element tuple or list whose first
            element is an ``int`` or ``float`` is treated as one
            (*time*, *data*) pair; anything else is iterated over and each
            item is unpacked as a (*time*, *data*) pair.

        Raises
        ------
        TypeError
            If *time_and_data* is a bare number.
        """
        if isinstance(time_and_data, (int, float)):
            raise TypeError(
                "time_and_data must be a (time, data) pair or an iterable of "
                "(time, data) pairs"
            )

        is_single_pair = (
            isinstance(time_and_data, (tuple, list))
            and len(time_and_data) == 2
            and isinstance(time_and_data[0], (int, float))
        )
        if is_single_pair:
            time_and_data = (time_and_data,)

        for time, data in time_and_data:
            self._insert_data(time, data)
        self._trim_data_to_maxsize()

    def _trim_data_to_maxsize(self):
        """Drop the oldest stored points until at most ``maxsize`` remain."""
        if self._maxsize is None:
            return
        n_extra = len(self._time) - self._maxsize
        if n_extra > 0:
            del self._time[:n_extra]
            del self._data[:n_extra]
            # The cached interpolant still refers to the dropped points.
            self._func = None

    def _insert_data(self, time, data):
        """Insert data so that it is stored sorted by time."""
        ind = bisect.bisect_right(self._time, time)
        self._time.insert(ind, time)
        self._data.insert(ind, data)
        # Force the interpolant to be rebuilt on the next evaluation.
        self._func = None

    def interpolate(self, time):
        """Interpolate the data at a given time.

        Parameters
        ----------
        time : float or array-like
            Time(s) at which to evaluate; passed straight to the
            ``interp1d`` callable.

        Returns
        -------
        The value returned by the ``interp1d`` callable for *time*.
        """
        if self._func is None:
            # Build lazily and cache; the setters and the data-mutating
            # methods reset ``_func`` to ``None`` to invalidate it.
            self._func = interp1d(
                self._time,
                self._data,
                axis=0,
                kind=self._method,
                fill_value=self._fill_value,
                copy=False,
                assume_sorted=True,
                bounds_error=False,
            )
        return self._func(time)

    def __call__(self, time):
        """Interpolate the data at a given time."""
        return self.interpolate(time)