Source code for aeon.schema.social_02

import aeon.io.reader as _reader
from aeon.schema import core, foraging
from aeon.schema.streams import Stream, StreamGroup


class Environment(StreamGroup):
    """Group the environment-level streams of this schema.

    Members, in definition order: ``EnvironmentState`` (alias of
    ``core.EnvironmentState``), ``BlockState``, ``LightEvents`` and
    ``MessageLog`` (alias of ``core.MessageLog``).

    NOTE(review): the nesting of the members inside this group was
    reconstructed from a whitespace-collapsed listing -- confirm against the
    repository copy.
    """

    def __init__(self, path):
        """Forward ``path`` unchanged to ``StreamGroup``."""
        super().__init__(path)

    # Member order is kept exactly as in the original listing, in case
    # StreamGroup walks the class attributes in definition order.
    EnvironmentState = core.EnvironmentState

    class BlockState(Stream):
        """Stream backed by a CSV reader for ``{path}_BlockState_*``."""

        def __init__(self, path):
            """Create the CSV reader with the three block-state columns."""
            super().__init__(
                _reader.Csv(
                    f"{path}_BlockState_*",
                    columns=["pellet_ct", "pellet_ct_thresh", "due_time"],
                )
            )

    class LightEvents(Stream):
        """Stream backed by a CSV reader for ``{path}_LightEvents_*``."""

        def __init__(self, path):
            """Create the CSV reader with the ``channel`` and ``value`` columns."""
            super().__init__(
                _reader.Csv(f"{path}_LightEvents_*", columns=["channel", "value"])
            )

    MessageLog = core.MessageLog
class SubjectData(StreamGroup):
    """Group the subject-related CSV streams of this schema.

    Members, in definition order: ``SubjectState``, ``SubjectVisits`` and
    ``SubjectWeight``.

    NOTE(review): the nesting of the members inside this group was
    reconstructed from a whitespace-collapsed listing -- confirm against the
    repository copy.
    """

    def __init__(self, path):
        """Forward ``path`` unchanged to ``StreamGroup``."""
        super().__init__(path)

    class SubjectState(Stream):
        """Stream backed by a CSV reader for ``{path}_SubjectState_*``."""

        def __init__(self, path):
            """Create the CSV reader with columns ``id``, ``weight``, ``type``."""
            super().__init__(
                _reader.Csv(f"{path}_SubjectState_*", columns=["id", "weight", "type"])
            )

    class SubjectVisits(Stream):
        """Stream backed by a CSV reader for ``{path}_SubjectVisits_*``."""

        def __init__(self, path):
            """Create the CSV reader with columns ``id``, ``type``, ``region``."""
            super().__init__(
                _reader.Csv(f"{path}_SubjectVisits_*", columns=["id", "type", "region"])
            )

    class SubjectWeight(Stream):
        """Stream backed by a CSV reader for ``{path}_SubjectWeight_*``."""

        def __init__(self, path):
            """Create the CSV reader with the four subject-weight columns."""
            super().__init__(
                _reader.Csv(
                    f"{path}_SubjectWeight_*",
                    columns=["weight", "confidence", "subject_id", "int_id"],
                )
            )
class Pose(Stream):
    """Stream backed by ``_reader.Pose`` for files matching ``{path}_test-node1*``.

    NOTE(review): placed at module level; the source listing had lost its
    indentation -- confirm against the repository copy.
    """

    def __init__(self, path):
        """Create the pose reader from the ``path`` prefix."""
        super().__init__(_reader.Pose(f"{path}_test-node1*"))
class WeightRaw(Stream):
    """Stream backed by a Harp reader for files matching ``{path}_200_*``.

    NOTE(review): placed at module level; the source listing had lost its
    indentation -- confirm against the repository copy.
    """

    def __init__(self, path):
        """Create the Harp reader.

        ``["weight(g)", "stability"]`` is passed as the reader's second
        positional argument (presumably its column names -- confirm in
        ``aeon.io.reader``).
        """
        super().__init__(_reader.Harp(f"{path}_200_*", ["weight(g)", "stability"]))
class WeightFiltered(Stream):
    """Stream backed by a Harp reader for files matching ``{path}_202_*``.

    NOTE(review): placed at module level; the source listing had lost its
    indentation -- confirm against the repository copy.
    """

    def __init__(self, path):
        """Create the Harp reader.

        ``["weight(g)", "stability"]`` is passed as the reader's second
        positional argument (presumably its column names -- confirm in
        ``aeon.io.reader``).
        """
        super().__init__(_reader.Harp(f"{path}_202_*", ["weight(g)", "stability"]))
class Patch(StreamGroup):
    """Group the patch-related streams of this schema.

    Members, in definition order: ``DepletionState``, ``Encoder`` (alias of
    ``core.Encoder``), ``Feeder`` (alias of ``foraging.Feeder``),
    ``ManualDelivery``, ``MissedPellet`` and ``RetriedDelivery``.

    NOTE(review): the nesting of the members inside this group was
    reconstructed from a whitespace-collapsed listing -- confirm against the
    repository copy.
    """

    def __init__(self, path):
        """Forward ``path`` unchanged to ``StreamGroup``."""
        super().__init__(path)

    # Member order is kept exactly as in the original listing, in case
    # StreamGroup walks the class attributes in definition order.
    class DepletionState(Stream):
        """Stream backed by a CSV reader for ``{path}_State_*``."""

        def __init__(self, path):
            """Create the CSV reader with columns ``threshold``, ``offset``, ``rate``."""
            super().__init__(
                _reader.Csv(f"{path}_State_*", columns=["threshold", "offset", "rate"])
            )

    Encoder = core.Encoder
    Feeder = foraging.Feeder

    class ManualDelivery(Stream):
        """Stream backed by a Harp reader for ``{path}_201_*``."""

        def __init__(self, path):
            """Create the Harp reader, passing ``["manual_delivery"]`` as its second argument."""
            super().__init__(_reader.Harp(f"{path}_201_*", ["manual_delivery"]))

    class MissedPellet(Stream):
        """Stream backed by a Harp reader for ``{path}_202_*``."""

        def __init__(self, path):
            """Create the Harp reader, passing ``["missed_pellet"]`` as its second argument."""
            super().__init__(_reader.Harp(f"{path}_202_*", ["missed_pellet"]))

    class RetriedDelivery(Stream):
        """Stream backed by a Harp reader for ``{path}_203_*``."""

        def __init__(self, path):
            """Create the Harp reader, passing ``["retried_delivery"]`` as its second argument."""
            super().__init__(_reader.Harp(f"{path}_203_*", ["retried_delivery"]))
class RfidEvents(Stream):
    """Stream backed by a Harp reader for files matching ``{path}_32*``.

    NOTE(review): placed at module level rather than inside ``Patch``; the
    source listing had lost its indentation -- confirm against the
    repository copy.
    """

    def __init__(self, path):
        """Create the Harp reader, passing ``["rfid"]`` as its second argument."""
        super().__init__(_reader.Harp(f"{path}_32*", ["rfid"]))