Source code for aeon.schema.streams

import inspect
from itertools import chain
from warnings import warn


[docs] class Stream: """Represents a single data stream. Attributes: reader (Reader): The reader used to retrieve the stream data. """ def __init__(self, reader): self.reader = reader def __iter__(self): yield (self.__class__.__name__, self.reader)
[docs] class StreamGroup: """Represents a logical group of multiple data streams. Attributes: path (str): Path to the folder where stream chunks are located. args (Any): Data streams or data stream groups to be included in this stream group. """ def __init__(self, path, *args): self.path = path self._args = args self._nested = ( member for member in vars(self.__class__).values() if inspect.isclass(member) and issubclass(member, Stream | StreamGroup) ) def __iter__(self): for factory in chain(self._nested, self._args): yield from iter(factory(self.path))
[docs] class Device: """Groups multiple data streams into a logical device. If a device contains a single stream with the same pattern as the device `name`, it will be considered a singleton, and the stream reader will be paired directly with the device without nesting. Attributes: name (str): Name of the device. args (Any): Data streams collected from the device. path (str, optional): Path to the folder where stream chunks are located. """ def __init__(self, name, *args, path=None): if name is None: raise ValueError("name cannot be None.") self.name = name self._streams = Device._createStreams(name if path is None else path, args) @staticmethod def _createStreams(path, args): streams = {} for factory in args: if inspect.isclass(factory) and not hasattr(factory.__init__, "__code__"): warn( f"Stream group classes with default constructors are deprecated. {factory}", category=DeprecationWarning, stacklevel=2, ) for method in vars(factory).values(): if isinstance(method, staticmethod): streams.update(method.__func__(path)) else: streams.update(factory(path)) return streams def __iter__(self): if len(self._streams) == 1: singleton = self._streams.get(self.name, None) if singleton: return iter((self.name, singleton)) return iter((self.name, self._streams))