# Source code for aeon.schema.core
import aeon.io.reader as _reader
from aeon.schema.streams import Stream, StreamGroup
[docs]
class Heartbeat(Stream):
    """Stream of heartbeat events for Harp devices.

    Args:
        pattern: Prefix to which ``_8_*`` is appended to form the pattern
            handed to the ``Heartbeat`` reader.
    """

    def __init__(self, pattern):
        # Build the reader first, then pass it on to the Stream base class.
        heartbeat_reader = _reader.Heartbeat(f"{pattern}_8_*")
        super().__init__(heartbeat_reader)
[docs]
class Video(Stream):
    """Stream of video frame metadata.

    Args:
        pattern: Prefix to which ``_*`` is appended to form the pattern
            handed to the ``Video`` reader.
    """

    def __init__(self, pattern):
        # Build the reader first, then pass it on to the Stream base class.
        video_reader = _reader.Video(f"{pattern}_*")
        super().__init__(video_reader)
[docs]
class Position(Stream):
    """Stream of position tracking data for the specified camera.

    Args:
        pattern: Prefix to which ``_200_*`` is appended to form the pattern
            handed to the ``Position`` reader.
    """

    def __init__(self, pattern):
        # Build the reader first, then pass it on to the Stream base class.
        position_reader = _reader.Position(f"{pattern}_200_*")
        super().__init__(position_reader)
[docs]
class Encoder(Stream):
    """Stream of wheel magnetic encoder data.

    Args:
        pattern: Prefix to which ``_90_*`` is appended to form the pattern
            handed to the ``Encoder`` reader.
    """

    def __init__(self, pattern):
        # Build the reader first, then pass it on to the Stream base class.
        encoder_reader = _reader.Encoder(f"{pattern}_90_*")
        super().__init__(encoder_reader)
[docs]
class Environment(StreamGroup):
    """Group of metadata streams for environment mode and subjects.

    Bundles the ``EnvironmentState`` and ``SubjectState`` stream classes
    under one pattern.

    Args:
        pattern: Passed unchanged to ``StreamGroup`` together with the
            member stream classes.
    """

    def __init__(self, pattern):
        # The member classes are looked up when the constructor runs, so
        # they may be defined further down in the module.
        member_streams = (EnvironmentState, SubjectState)
        super().__init__(pattern, *member_streams)
[docs]
class EnvironmentState(Stream):
    """Stream for the environment state log.

    Args:
        pattern: Prefix to which ``_EnvironmentState_*`` is appended to form
            the pattern handed to the ``Csv`` reader.
    """

    def __init__(self, pattern):
        # The Csv reader receives the file pattern and a single-column list.
        file_pattern = f"{pattern}_EnvironmentState_*"
        column_names = ["state"]
        super().__init__(_reader.Csv(file_pattern, column_names))
[docs]
class SubjectState(Stream):
    """Stream for the subject state log.

    Args:
        pattern: Prefix to which ``_SubjectState_*`` is appended to form the
            pattern handed to the ``Subject`` reader.
    """

    def __init__(self, pattern):
        # Build the reader first, then pass it on to the Stream base class.
        subject_reader = _reader.Subject(f"{pattern}_SubjectState_*")
        super().__init__(subject_reader)
[docs]
class MessageLog(Stream):
    """Stream of message log data.

    Args:
        pattern: Prefix to which ``_MessageLog_*`` is appended to form the
            pattern handed to the ``Log`` reader.
    """

    def __init__(self, pattern):
        # Build the reader first, then pass it on to the Stream base class.
        log_reader = _reader.Log(f"{pattern}_MessageLog_*")
        super().__init__(log_reader)