Source code for aeon.schema.core

import aeon.io.reader as _reader
from aeon.schema.streams import Stream, StreamGroup


class Heartbeat(Stream):
    """Stream of heartbeat events from Harp devices.

    Args:
        pattern: Prefix to which ``_8_*`` is appended before it is handed
            to ``_reader.Heartbeat``.
    """

    def __init__(self, pattern):
        # Build the reader first, then register it with the base Stream.
        heartbeat_reader = _reader.Heartbeat(f"{pattern}_8_*")
        super().__init__(heartbeat_reader)
class Video(Stream):
    """Stream of metadata describing video frames.

    Args:
        pattern: Prefix to which ``_*`` is appended before it is handed
            to ``_reader.Video``.
    """

    def __init__(self, pattern):
        # Build the reader first, then register it with the base Stream.
        video_reader = _reader.Video(f"{pattern}_*")
        super().__init__(video_reader)
class Position(Stream):
    """Stream of position tracking data for a given camera.

    Args:
        pattern: Prefix to which ``_200_*`` is appended before it is handed
            to ``_reader.Position``.
    """

    def __init__(self, pattern):
        # Build the reader first, then register it with the base Stream.
        position_reader = _reader.Position(f"{pattern}_200_*")
        super().__init__(position_reader)
class Encoder(Stream):
    """Stream of magnetic encoder data for the wheel.

    Args:
        pattern: Prefix to which ``_90_*`` is appended before it is handed
            to ``_reader.Encoder``.
    """

    def __init__(self, pattern):
        # Build the reader first, then register it with the base Stream.
        encoder_reader = _reader.Encoder(f"{pattern}_90_*")
        super().__init__(encoder_reader)
class Environment(StreamGroup):
    """Stream group carrying environment mode and subject metadata.

    Args:
        pattern: Pattern forwarded unchanged to ``StreamGroup`` together
            with the member stream classes.
    """

    def __init__(self, pattern):
        # Member classes are looked up when the constructor runs, so they
        # may be defined later in the module than this class.
        member_streams = (EnvironmentState, SubjectState)
        super().__init__(pattern, *member_streams)
class EnvironmentState(Stream):
    """Stream of the environment state log.

    Args:
        pattern: Prefix to which ``_EnvironmentState_*`` is appended before
            it is handed to ``_reader.Csv``.
    """

    def __init__(self, pattern):
        # The CSV reader is given a single column name, "state".
        column_names = ["state"]
        state_reader = _reader.Csv(f"{pattern}_EnvironmentState_*", column_names)
        super().__init__(state_reader)
class SubjectState(Stream):
    """Stream of the subject state log.

    Args:
        pattern: Prefix to which ``_SubjectState_*`` is appended before it
            is handed to ``_reader.Subject``.
    """

    def __init__(self, pattern):
        # Build the reader first, then register it with the base Stream.
        subject_reader = _reader.Subject(f"{pattern}_SubjectState_*")
        super().__init__(subject_reader)
class MessageLog(Stream):
    """Stream of message log data.

    Args:
        pattern: Prefix to which ``_MessageLog_*`` is appended before it is
            handed to ``_reader.Log``.
    """

    def __init__(self, pattern):
        # Build the reader first, then register it with the base Stream.
        log_reader = _reader.Log(f"{pattern}_MessageLog_*")
        super().__init__(log_reader)
class Metadata(Stream):
    """Stream of metadata for acquisition epochs.

    Args:
        pattern: Pattern passed unchanged (no suffix added) to
            ``_reader.Metadata``.
    """

    def __init__(self, pattern):
        # Unlike the sibling streams, the pattern is used as-is.
        metadata_reader = _reader.Metadata(pattern)
        super().__init__(metadata_reader)