Source code for noob.node.tube
import uuid
import warnings
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, Union
from noob.event import Event
from noob.exceptions import ExtraInputWarning
from noob.node.base import Node, Slot
from noob.node.spec import NodeSpecification
from noob.types import ConfigSource, Epoch, RunnerContext
if TYPE_CHECKING:
from noob.runner import TubeRunner
from noob.tube import Tube, TubeSpecification
[docs]
class TubeNode(Node):
"""
A node that contains another tube within it
"""
tube: ConfigSource
_tube: Union["Tube", None] = None
_tube_spec: Union["TubeSpecification", None] = None
_runner: Union["TubeRunner", None] = None
@property
def tube_spec(self) -> "TubeSpecification":
from noob.tube import TubeSpecification
if self._tube_spec is None:
self._tube_spec = TubeSpecification.from_any(self.tube)
return self._tube_spec
# TODO: support dependency injected inits in plugin
[docs]
def init(self, context: RunnerContext) -> None: # type: ignore[override]
from noob import SynchronousRunner, Tube
with warnings.catch_warnings(action="ignore", category=ExtraInputWarning):
self._tube = Tube.from_specification(
self.tube, input={**context["tube"].input_collection.chain}
)
self._runner = SynchronousRunner(tube=self._tube)
self._runner.init()
[docs]
def deinit(self) -> None:
if self._runner is not None:
self._runner.deinit()
[docs]
def process(self, epoch: Epoch, **kwargs: Any) -> Any:
if self._runner is None:
raise RuntimeError(
"TubeNode must be initialized within a Runner "
"to receive the outer runner's context. "
"It doesn't make sense to run a TubeNode on its own."
)
res = self._runner.process(**kwargs)
if isinstance(res, dict):
now = datetime.now(UTC)
return [
Event(
id=uuid.uuid4().int,
timestamp=now,
node_id=self.id,
signal=key,
epoch=epoch,
value=value,
)
for key, value in res.items()
]
else:
return res
[docs]
@classmethod
def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]:
if spec is None:
raise ValueError("Must pass a spec to get slots for a tube node")
from noob.input import InputScope
from noob.tube import TubeSpecification
if not spec.params or "tube" not in spec.params:
raise ValueError("Tube node specifications must have a `tube` in their params")
tube_spec = TubeSpecification.from_any(spec.params["tube"])
slots = {}
for in_key, in_val in tube_spec.input.items():
if in_val.scope == InputScope.process:
slots[in_key] = Slot(name=in_key, annotation=Any)
return slots