# Source code for pymt.events.port
"""Wrap a port as a :class:`Timeline` event.
"""
import os
import sys
import yaml
from ..component.grid import GridMixIn
from ..framework import services
from ..mappers import NearestVal
from ..utils import as_cwd
class PortEvent(GridMixIn):
    """Wrap a port as an event.

    All parameters are read from keywords; positional arguments are
    accepted but ignored. On construction a status file, ``_time.txt``,
    is opened for writing inside *run_dir*; it is closed by
    :meth:`finalize`.

    Parameters
    ----------
    port : str or object
        Name of port, which is looked up with
        ``services.get_component_instance``, or the port object itself.
        Required (a missing keyword raises ``KeyError``).
    init_args : list or str, optional
        List of arguments to initialize port. A single string is treated
        as a one-element list.
    run_dir : str, optional
        Path to directory to execute port. Defaults to ``"."``.
    """

    def __init__(self, *args, **kwds):
        if isinstance(kwds["port"], str):
            self._port = services.get_component_instance(kwds["port"])
        else:
            self._port = kwds["port"]

        self._init_args = kwds.get("init_args", [])
        self._run_dir = kwds.get("run_dir", ".")

        # A bare string would otherwise be unpacked character by character
        # when passed on as ``*self._init_args``.
        if isinstance(self._init_args, str):
            self._init_args = [self._init_args]

        # The path is made absolute so that later changes of the working
        # directory do not affect where the status file lives.
        self._status_fp = open(
            os.path.abspath(os.path.join(self._run_dir, "_time.txt")), "w"
        )

        GridMixIn.__init__(self)

    def initialize(self):
        """Initialize the event.

        Run the underlying port's initialization method in its *run_dir*.
        The event's *init_args* are passed to the initialize method as
        arguments. An "initializing" record is written to the status file
        first.
        """
        with as_cwd(self._run_dir):
            # NOTE(review): ``self.name`` is not defined in this class;
            # presumably it is provided by GridMixIn -- confirm.
            status = {"name": self.name, "time": 0.0, "status": "initializing"}
            self._status_fp.write(yaml.dump(status))
            # Flush so the record is on disk while the port initializes,
            # matching what run() does for its own status record.
            self._status_fp.flush()

            self._port.initialize(*self._init_args)

    def run(self, time):
        """Run the event.

        Call the `run` method of the underlying port using *time* as the
        run time. A "running" record is written to the status file and
        echoed to standard output before the port is run.

        Parameters
        ----------
        time : float
            Time to run the event to.
        """
        with as_cwd(self._run_dir):
            status = {"name": self.name, "time": time, "status": "running"}
            print(yaml.dump(status), file=self._status_fp)
            self._status_fp.flush()

            # Flush the standard streams before and after running the port.
            sys.stdout.flush()
            sys.stderr.flush()

            print(yaml.dump(status, default_flow_style=True))

            self._port.run(time)

            sys.stdout.flush()
            sys.stderr.flush()

    def update(self, time):
        """Update the event.

        Call the `update_until` method of the underlying port, from within
        its *run_dir*, using *time* as the argument.

        Parameters
        ----------
        time : float
            Time to update the event to.
        """
        with as_cwd(self._run_dir):
            self._port.update_until(time)

    def finalize(self):
        """Finalize the event.

        Run the `finalize` method of the underlying port. "finishing" and
        "finished" records are written to the status file around the call.
        The status file is closed afterwards, even if finalizing the port
        raises; in that case the exception propagates and no "finished"
        record is written.
        """
        with as_cwd(self._run_dir):
            try:
                status = {"name": self.name, "time": None, "status": "finishing"}
                self._status_fp.write(yaml.dump(status))
                self._status_fp.flush()

                self._port.finalize()

                status = {"name": self.name, "time": None, "status": "finished"}
                self._status_fp.write(yaml.dump(status))
            finally:
                # Always release the file handle opened in __init__.
                self._status_fp.close()
class PortMapEvent:
    """An event that maps values between ports.

    All parameters are read from keywords; positional arguments are
    accepted but ignored.

    Parameters
    ----------
    src_port : str or object
        Port name that is the data source, or the port object itself.
    dst_port : str or object
        Port name that is the destination, or the port object itself.
    vars_to_map : list, optional
        Names of variable to map, iterated as
        ``(destination_name, source_name)`` pairs.
    method : {'direct', 'nearest'}, optional
        Method used to map values.

    Raises
    ------
    ValueError
        If *method* is neither ``'direct'`` nor ``'nearest'``.
    """

    def __init__(self, *args, **kwds):
        # Ports given by name are resolved through the framework services;
        # anything else is used as the port itself.
        source = kwds["src_port"]
        if isinstance(source, str):
            source = services.get_component_instance(source)
        self._src = source

        destination = kwds["dst_port"]
        if isinstance(destination, str):
            destination = services.get_component_instance(destination)
        self._dst = destination

        self._vars_to_map = kwds.get("vars_to_map", [])
        self._method = kwds.get("method", "direct")

        if self._method == "nearest":
            self._mapper = NearestVal()
        elif self._method == "direct":
            # Direct mapping needs no mapper: values are copied as they are.
            self._mapper = None
        else:
            raise ValueError("method %s not understood" % self._method)

    def initialize(self):
        """Initialize the data mappers."""
        if self._mapper is None:
            return
        self._mapper.initialize(self._dst, self._src, vars=self._vars_to_map)

    def run(self, stop_time):
        """Map values from one port to another.

        Parameters
        ----------
        stop_time : float
            Not used by this method.
        """
        for target_name, origin_name in self._vars_to_map:
            # Request the source values in the destination variable's units.
            target_units = self._dst.get_var_units(target_name)
            values = self._src.get_value(origin_name, units=target_units)

            if self._mapper is not None:
                values = self._mapper.run(values)

            self._dst.set_value(target_name, values)

    def finalize(self):
        """Finalize the event (nothing to do)."""
        pass