Source code for pymt.mappers.pointtocell
from collections import defaultdict
import numpy as np
from scipy.spatial import KDTree
from .imapper import IGridMapper, IncompatibleGridError
# from .mapper import IncompatibleGridError
[docs]def map_cells_to_points(coords, dst_grid, dst_point_ids, bad_val=-1):
src_x, src_y = coords
cell_to_point_id = defaultdict(list)
for j, point_id in enumerate(dst_point_ids):
for cell_id in dst_grid.get_shared_cells(point_id):
if dst_grid.is_in_cell(src_x[j], src_y[j], cell_id):
cell_to_point_id[cell_id].append(j)
return cell_to_point_id
[docs]class PointToCell(IGridMapper):
_name = "PointToCell"
[docs] def initialize(self, dest_grid, src_grid, **kwds):
if not self.test(dest_grid, src_grid):
raise IncompatibleGridError(dest_grid.name, src_grid.name)
src_x = src_grid.get_x()
src_y = src_grid.get_y()
tree = KDTree(list(zip(dest_grid.get_x(), dest_grid.get_y())))
(_, nearest_dest_id) = tree.query(list(zip(src_x, src_y)))
self._map = map_cells_to_points(
(src_x, src_y), dest_grid, nearest_dest_id, bad_val=-1
)
self._dst_cell_count = dest_grid.get_cell_count()
self._src_point_count = src_grid.get_point_count()
[docs] def run(self, src_values, **kwds):
dst_vals = kwds.get("dst_vals", None)
bad_val = kwds.get("bad_val", -999)
method = kwds.get("method", np.mean)
if src_values.size != self._src_point_count:
raise ValueError("size mismatch between source and point count")
if dst_vals is None:
dst_vals = np.array([bad_val] * self._dst_cell_count, dtype=float)
if dst_vals.size != self._dst_cell_count:
raise ValueError("size mismatch between destination and cell count")
for cell_id, point_ids in self._map.items():
if all(src_values[point_ids] > bad_val):
dst_vals[cell_id] = method(src_values[point_ids])
return dst_vals
[docs] @staticmethod
def test(dst_grid, src_grid):
return all(np.diff(dst_grid.get_offset()) > 2) and src_grid is not None
@property
def name(self):
return self._name