import os
from ...grids import utils as gutils
from .constants import _NP_TO_NC_TYPE, open_netcdf
_OPENED_FILES = {}
[docs]def close_all():
for path in _OPENED_FILES:
close(path)
[docs]def close(path):
try:
_OPENED_FILES[path].close()
except KeyError:
pass
else:
del _OPENED_FILES[path]
[docs]class NetcdfField:
def __init__(
self, path, field, fmt="NETCDF4", append=False, time=None, keep_open=False
):
path = os.path.abspath(path)
self._path = path
self._field = field
if path in _OPENED_FILES and not os.path.isfile(path):
close(path)
if path in _OPENED_FILES:
self._root = _OPENED_FILES[path]
else:
self._root = open_netcdf(path, mode="w", fmt=fmt, append=append)
self._set_mesh_topology()
self._set_node_variable_data()
self._set_face_variable_data()
self._set_time_variable(now=time)
if keep_open:
_OPENED_FILES[path] = self._root
else:
self.close()
def _set_mesh_dimensions(self):
raise NotImplementedError("_set_mesh_dimensions")
def _set_mesh_coordinate_data(self):
raise NotImplementedError("_set_mesh_coordinate_data")
@property
def node_data_dimensions(self):
raise NotImplementedError("node_data_dimensions")
@property
def face_data_dimensions(self):
raise NotImplementedError("face_data_dimensions")
[docs] def close(self):
if self._path in _OPENED_FILES:
del _OPENED_FILES[self._path]
self._root.close()
@property
def type(self):
return "unknown"
@property
def field(self):
return self._field
@property
def root(self):
return self._root
@property
def topology_dimension(self):
return len(gutils.non_singleton_shape(self.field))
@property
def field_axes(self):
return gutils.non_singleton_axes(self.field)
@property
def node_coordinates(self):
return ""
@property
def face_connectivity(self):
return "face_nodes_connectivity"
@property
def face_node_connectivity(self):
return "face_nodes"
@property
def node_count(self):
return self.field.get_point_count()
@property
def face_count(self):
return self.field.get_cell_count()
@property
def vertex_count(self):
return self.field.get_vertex_count()
@property
def time_count(self):
try:
return len(self._root.variables["time"])
except KeyError:
return 0
[docs] def has_dimension(self, name):
return name in self._root.dimensions
[docs] def has_variable(self, name):
return name in self._root.variables
[docs] def create_dimension(self, name, dim_len):
try:
if not self.has_dimension(name):
self._root.createDimension(name, dim_len)
except IndexError:
pass
[docs] def create_variable(self, name, *args, **kwds):
if not self.has_variable(name):
self._root.createVariable(name, *args, **kwds)
[docs] def set_variable(self, name, *args, **kwds):
if len(args) not in (0, 1):
raise ValueError("number of arguments must be 0 or 1")
attrs = kwds.pop("attrs", {})
variable = self.data_variable(name)
for attr, value in attrs.items():
variable.setncattr(attr, value)
if len(args) > 0:
array = args[0]
if "time" in variable.dimensions:
n_times = self.time_count
if array.size > 1:
variable[n_times, :] = array.flat
else:
variable[n_times] = array[0]
else:
variable[:] = array.reshape(variable.shape)
[docs] def data_variable(self, name):
return self.root.variables[name]
def _set_mesh_topology(self):
self._set_topology()
self._set_mesh_dimensions()
self._set_time_dimension()
self._set_mesh_coordinate_data()
self._set_face_node_connectivity_data()
def _set_topology(self):
self.create_variable("mesh", "i8")
self.set_variable(
"mesh",
attrs={
"cf_role": "mesh_topology",
"topology_dimension": self.topology_dimension,
"node_coordinates": " ".join(self.node_coordinates),
"face_connectivity": self.face_connectivity,
"face_node_connectivity": self.face_node_connectivity,
"type": self.type,
},
)
def _set_time_dimension(self):
if not self.has_dimension("time"):
self.create_dimension("time", None)
def _set_coordinate_data(self):
self._set_mesh_coordinate_data()
self._set_face_node_connectivity_data()
def _set_time_variable(
self, now=None, units="days", reference="0001-01-01 00:00:00 UTC"
):
self.create_variable("time", "f8", ("time",))
time = self.data_variable("time")
time.units = " ".join([units, "since", reference])
time.long_name = "time"
if now is not None:
time[self.time_count - 1] = now
else:
time[self.time_count - 1] = self.time_count - 1
def _set_variable_data(self):
self._set_node_variable_data()
self._set_face_variable_data()
def _set_node_variable_data(self):
point_fields = self.field.get_point_fields()
for var_name, array in point_fields.items():
self.create_variable(
var_name,
_NP_TO_NC_TYPE[str(array.dtype)],
["time"] + list(self.node_data_dimensions),
)
self.set_variable(
var_name,
array,
attrs={
"units": self.field.get_field_units(var_name),
"standard_name": var_name,
"long_name": var_name,
"location": "node",
"coordinates": " ".join(self.node_data_dimensions),
},
)
def _set_face_variable_data(self):
face_fields = self.field.get_cell_fields()
for var_name, array in face_fields.items():
self.create_variable(
var_name,
_NP_TO_NC_TYPE[str(array.dtype)],
["time"] + list(self.face_data_dimensions),
)
self.set_variable(
var_name,
array,
attrs={
"units": self.field.get_field_units(var_name),
"standard_name": var_name,
"long_name": var_name,
"location": "face",
"coordinates": " ".join(self.node_data_dimensions),
},
)
def _set_face_node_connectivity_data(self):
pass
[docs]class NetcdfRectilinearField(NetcdfField):
@property
def type(self):
return "rectilinear"
@property
def node_coordinates(self):
return gutils.non_singleton_dimension_names(self.field)
@property
def topology_dimension(self):
return len(gutils.non_singleton_shape(self.field))
@property
def node_data_dimensions(self):
return gutils.non_singleton_dimension_names(self.field)
@property
def axis_coordinates(self):
return gutils.non_singleton_dimension_names(self.field)
def _set_mesh_dimensions(self):
field_shape = self.field.get_shape()
for name, axis in zip(self.axis_coordinates, self.field_axes):
self.create_dimension(name, field_shape[axis])
def _set_mesh_coordinate_data(self):
for name, axis in zip(self.axis_coordinates, self.field_axes):
self.create_variable(name, "f8", (name,))
self.set_variable(
name,
self.field.get_axis_coordinates(axis=axis),
attrs={
"units": self.field.get_coordinate_units(axis),
"standard_name": self.field.get_coordinate_name(axis),
"long_name": self.field.get_coordinate_name(axis),
"name": self.field.get_coordinate_name(axis),
},
)
[docs]class NetcdfStructuredField(NetcdfRectilinearField):
@property
def type(self):
return "structured"
@property
def node_data_dimensions(self):
return gutils.non_singleton_dimension_names(self._field)
@property
def node_coordinates(self):
return gutils.non_singleton_dimension_names(self._field)
# names = gutils.non_singleton_dimension_names(self._field)
# return ['node_' + name for name in names]
def _set_mesh_dimensions(self):
NetcdfRectilinearField._set_mesh_dimensions(self)
for name in self.node_coordinates:
self.create_dimension(name, self.node_count)
def _set_mesh_coordinate_data(self):
dims = self.node_data_dimensions
# for (name, axis) in zip(self.node_coordinates, self.field_axes):
for name, axis in zip(self.node_data_dimensions, self.field_axes):
self.create_variable(name, "f8", dims)
self.set_variable(
name,
self.field.get_coordinate(axis),
attrs={
"units": self.field.get_coordinate_units(axis),
"standard_name": self.field.get_coordinate_name(axis),
"long_name": self.field.get_coordinate_name(axis),
},
)
[docs]class NetcdfUnstructuredField(NetcdfStructuredField):
@property
def type(self):
return "unstructured"
@property
def node_coordinates(self):
names = []
for axis in range(self.field.get_dim_count()):
names.append("node_" + self.field.get_coordinate_name(axis))
return names
@property
def topology_dimension(self):
return self.field.get_dim_count()
@property
def node_data_dimensions(self):
return ("n_node",)
# dimensions = []
# for name in self.node_coordinates:
# dimensions.append((name, ))
# return dimensions
@property
def face_data_dimensions(self):
return ("n_face",)
def _set_mesh_dimensions(self):
# for name in self.node_coordinates:
# self.create_dimension(name, self.node_count)
self.create_dimension("n_node", self.node_count)
self.create_dimension("n_face", self.face_count)
self.create_dimension("n_vertex", self.vertex_count)
self.create_dimension("n_max_face_nodes", self.field.get_max_vertices())
def _set_mesh_coordinate_data(self):
dims = self.node_data_dimensions
# for (name, axis) in zip(self.node_data_dimensions, self.field_axes):
# for (name, axis) in zip(self.node_coordinates, self.field_axes):
for axis, name in enumerate(self.node_coordinates):
self.create_variable(name, "f8", dims)
self.set_variable(
name,
self.field.get_coordinate(axis),
attrs={
"units": self.field.get_coordinate_units(axis),
"standard_name": self.field.get_coordinate_name(axis),
"long_name": self.field.get_coordinate_name(axis),
},
)
def _set_face_node_connectivity_data(self):
self.create_variable("face_nodes_connectivity", "i8", ("n_vertex",))
self.set_variable(
"face_nodes_connectivity",
self.field.get_connectivity(),
attrs={
"cf_role": "face_node_connectivity",
"long_name": "Maps every face to its corner nodes.",
"start_index": 0,
},
)
self.create_variable("face_nodes_offset", "i8", ("n_face",))
self.set_variable(
"face_nodes_offset",
self.field.get_offset(),
attrs={
"cf_role": "face_node_offset",
"long_name": "Maps face index into connectivity array",
},
)
(connectivity, fill_val) = self.field.get_connectivity_as_matrix()
self.create_variable(
"face_nodes", "i8", ("n_face", "n_max_face_nodes"), fill_value=fill_val
)
self.set_variable(
"face_nodes",
connectivity,
attrs={
"cf_role": "face_node_connectivity",
"long_name": "Maps every face to its corner nodes.",
"start_index": 0,
},
)