Source code for aeon.io.device
import inspect
from typing_extensions import deprecated
[docs]
@deprecated("Please use the StreamGroup class from the streams module instead.")
def compositeStream(pattern, *args):
"""Merges multiple data streams into a single composite stream."""
composite = {}
if args:
for binder_fn in args:
if inspect.isclass(binder_fn):
for method in vars(binder_fn).values():
if isinstance(method, staticmethod):
composite.update(method.__func__(pattern))
else:
composite.update(binder_fn(pattern))
return composite
[docs]
@deprecated("The Device class has been moved to the streams module.")
class Device:
"""Groups multiple Readers into a logical device.
If a device contains a single stream reader with the same pattern as the device `name`, it will be
considered a singleton, and the stream reader will be paired directly with the device without nesting.
Attributes:
name (str): Name of the device.
args (any): A binder function or class that returns a dictionary of Readers.
pattern (str, optional): Pattern used to find raw chunk files,
usually in the format `<Device>_<DataStream>`.
"""
def __init__(self, name, *args, pattern=None):
self.name = name
self.registry = compositeStream(name if pattern is None else pattern, *args)
def __iter__(self):
if len(self.registry) == 1:
singleton = self.registry.get(self.name, None)
if singleton:
return iter((self.name, singleton))
return iter((self.name, self.registry))