Source code for swc.aeon.io.api

"""API for reading Aeon data from disk."""

import bisect
import datetime
import warnings
from os import PathLike
from pathlib import Path
from typing import Literal, overload

import pandas as pd
from pandas._typing import SequenceNotStr
from typing_extensions import deprecated

CHUNK_DURATION = 1
"""The duration of each acquisition chunk, in whole hours."""

REFERENCE_EPOCH = datetime.datetime(1904, 1, 1, tzinfo=datetime.UTC)
"""The reference epoch for UTC harp time."""


[docs] @deprecated("Please use the to_datetime function instead.") def aeon(seconds: float | pd.Index | pd.Series) -> datetime.datetime | pd.DatetimeIndex | pd.Series: """Converts a Harp timestamp, in seconds, to a datetime object. .. deprecated:: 0.2.0 This function is deprecated and will be removed in a future release. Use :func:`to_datetime` instead. """ return to_datetime(seconds) # pragma: no cover
@overload def to_datetime(seconds: float) -> datetime.datetime: ... @overload def to_datetime(seconds: pd.Index) -> pd.DatetimeIndex: ... @overload def to_datetime(seconds: pd.Series) -> pd.Series: ...
[docs] def to_datetime(seconds: float | pd.Index | pd.Series) -> datetime.datetime | pd.DatetimeIndex | pd.Series: """Converts a UTC Harp timestamp to datetime. This function converts a scalar, Index or Series into a datetime type. Args: seconds: The Harp timestamp data, in fractional seconds, to convert to datetime. Returns: The decoded UTC Harp timestamp data. Return type depends on input. - scalar: datetime object - Index: DatetimeIndex of datetime64 dtype - Series: Series of datetime64 dtype """ return REFERENCE_EPOCH + pd.to_timedelta(seconds, "s")
@overload def to_seconds(time: datetime.datetime) -> float: ... @overload def to_seconds(time: pd.DatetimeIndex) -> pd.Index: ... @overload def to_seconds(time: pd.Series) -> pd.Series: ...
[docs] def to_seconds( time: datetime.datetime | pd.DatetimeIndex | pd.Series, ) -> float | pd.Index | pd.Series: """Converts datetime to UTC Harp timestamp. This function converts a datetime object, DatetimeIndex or Series into a UTC Harp timestamp type. Args: time: The object to convert to a UTC Harp timestamp type. Returns: The UTC Harp timestamp data, in fractional seconds. Return type depends on input. - datetime: scalar - DatetimeIndex: Index - Series: Series """ if isinstance(time, pd.Series): return (pd.to_datetime(time, utc=True) - REFERENCE_EPOCH).dt.total_seconds() else: return (pd.to_datetime(time, utc=True) - REFERENCE_EPOCH).total_seconds()
@overload def chunk(time: datetime.datetime) -> pd.Timestamp: ... @overload def chunk(time: pd.DatetimeIndex) -> pd.DatetimeIndex: ... @overload def chunk(time: "pd.Series[pd.Timestamp]") -> pd.Series: ...
[docs] def chunk( time: "datetime.datetime | pd.DatetimeIndex | pd.Series[pd.Timestamp]", ) -> pd.Timestamp | pd.DatetimeIndex | pd.Series: """Returns the whole hour acquisition chunk for a measurement timestamp. Args: time: The object for which to retrieve the acquisition chunk. Returns: Return type depends on input. - datetime: Timestamp representing the acquisition chunk - DatetimeIndex: DatetimeIndex of decoded acquisition chunks - Series: Series of decoded acquisition chunks """ if isinstance(time, pd.Series): hour = CHUNK_DURATION * (time.dt.hour // CHUNK_DURATION) return pd.to_datetime(time.dt.date, utc=True) + pd.to_timedelta(hour, "h") elif isinstance(time, pd.DatetimeIndex): hour = CHUNK_DURATION * (time.hour // CHUNK_DURATION) return pd.DatetimeIndex(time.date, tz=datetime.UTC) + pd.to_timedelta(hour, "h") else: hour = CHUNK_DURATION * (time.hour // CHUNK_DURATION) return pd.to_datetime(datetime.datetime.combine(time.date(), datetime.time(hour=hour)), utc=True)
[docs] def chunk_range(start: datetime.datetime, end: datetime.datetime) -> pd.DatetimeIndex: """Returns a fixed frequency DatetimeIndex of acquisition chunk dates. Args: start: The left bound for generating chunk dates. end: The right bound for generating chunk dates. Returns: The acquisition chunk range. """ return pd.date_range(chunk(start), chunk(end), freq=pd.DateOffset(hours=CHUNK_DURATION))
[docs] def chunk_key(path: Path) -> tuple[str, datetime.datetime]: """Returns the acquisition chunk key for the specified file. Args: path: The path to the file for which to retrieve the acquisition chunk key. Returns: A tuple containing the epoch string and the acquisition chunk datetime. """ epoch = path.parts[-3] chunk_str = path.stem.split("_")[-1] try: date_str, time_str = chunk_str.split("T") except ValueError: epoch = path.parts[-2] date_str, time_str = epoch.split("T") return epoch, pd.to_datetime( datetime.datetime.fromisoformat(date_str + "T" + time_str.replace("-", ":")), utc=True )
def _set_index(data: pd.DataFrame) -> None: if not isinstance(data.index, pd.DatetimeIndex): data.index = to_datetime(data.index) else: data.index = pd.to_datetime(data.index, utc=True) data.index.name = "time" def _empty(columns: SequenceNotStr[str]) -> pd.DataFrame: return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], name="time", tz=datetime.UTC)) def _filter_time_range( frame: pd.DataFrame, start: datetime.datetime | None, end: datetime.datetime | None, inclusive: Literal["both", "neither", "left", "right"] = "both", ) -> pd.DataFrame: """Access rows between the specified time range. Args: frame: The DataFrame to filter. start: The left bound of the time range. If not specified, the start of the sequence is included. end: The right bound of the time range. If not specified, the end of the sequence is included. inclusive: Specifies whether the `start` and `end` bounds are inclusive or exclusive. Options are "both", "left", "right", or "neither". Returns: The filtered DataFrame. """ result = frame.loc[start:end] if inclusive == "both" or len(result) == 0: return result first_idx_equals_start = result.index[0] == start last_idx_equals_end = result.index[-1] == end if inclusive == "left": # drop final row if the index is equal to end return result.iloc[:-1] if last_idx_equals_end else result elif inclusive == "right": # drop first row if the index is equal to start return result.iloc[1:] if first_idx_equals_start else result else: result = result.iloc[1:] if first_idx_equals_start else result result = result.iloc[:-1] if last_idx_equals_end else result return result
[docs] class Reader: """Extracts data from raw files in an Aeon dataset.""" pattern: str """Pattern used to find raw files, usually in the format `<Device>_<DataStream>`.""" columns: SequenceNotStr[str] """Column labels to use for the data.""" extension: str """Extension of data file pathnames.""" def __init__(self, pattern: str, columns: SequenceNotStr[str], extension: str): """Initialize the object with specified pattern, columns, and file extension.""" self.pattern = pattern self.columns = columns self.extension = extension
[docs] def read(self, path: Path) -> pd.DataFrame: """Reads data from the specified file. Args: path: Path to the data file. Returns: A DataFrame representing the data extracted from the specified file. """ return pd.DataFrame(columns=self.columns, index=pd.DatetimeIndex([]))
[docs] def load( root: str | PathLike | list[str] | list[PathLike], reader: Reader, start: datetime.datetime | None = None, end: datetime.datetime | None = None, inclusive: Literal["both", "neither", "left", "right"] = "both", time: datetime.datetime | list[datetime.datetime] | pd.DatetimeIndex | pd.DataFrame | None = None, tolerance: pd.Timedelta | None = None, epoch: str | None = None, **kwargs, ) -> pd.DataFrame: """Extracts chunk data from the root path of an Aeon dataset. Reads all chunk data using the specified data stream reader. A subset of the data can be loaded by specifying an optional time range, or a list of timestamps used to index the data on file. Returned data will be sorted chronologically. Note: Any timezone-naive values in `start`, `end`, and `time` will be treated as UTC. Args: root: The root path, or prioritised sequence of paths, where data is stored. reader: A data stream reader object used to read chunk data from the dataset. start: The left bound of the time range to extract. end: The right bound of the time range to extract. inclusive: Specifies whether the `start` and `end` bounds are inclusive or exclusive. This argument is only applicable when `start` and/or `end` bounds are provided. Options are "both", "left", "right", or "neither". time: A single timestamp, list of timestamps, DatetimeIndex, or a DataFrame with DatetimeIndex specifying the timestamps to extract. tolerance: The maximum distance between original and new timestamps for inexact matches. epoch: A wildcard pattern to use when searching epoch data. **kwargs: Optional keyword arguments to forward to `reader` when reading chunk data. Returns: A DataFrame containing extracted chunk data, sorted by time. """ if isinstance(root, str): root = Path(root) if isinstance(root, PathLike): root = [root] if start is not None: start = pd.to_datetime(start, utc=True) if end is not None: end = pd.to_datetime(end, utc=True) epoch_pattern = "**" if epoch is None else epoch fileset = { chunk_key(fname): fname for path in root for fname in Path(path).glob(f"{epoch_pattern}/**/{reader.pattern}.{reader.extension}") } files = sorted(fileset.items()) if time is not None: # ensure input is converted to timestamp series timestamps: pd.Series if isinstance(time, pd.DataFrame): timestamps = time.index.to_series() else: timestamps = pd.Series(time) timestamps.index = pd.DatetimeIndex(timestamps) timestamps = pd.to_datetime(timestamps, utc=True) dataframes = [] filetimes = [chunk for (_, chunk), _ in files] files = [file for _, file in files] for key, values in timestamps.groupby(by=chunk): i = bisect.bisect_left(filetimes, key) # type: ignore if i < len(filetimes): frame = reader.read(files[i], **kwargs) _set_index(frame) else: frame = _empty(reader.columns) data = frame.reset_index() data.set_index("time", drop=False, inplace=True) data = data.reindex(values, method="pad", tolerance=tolerance) missing = len(data.time) - data.time.count() if missing > 0 and i > 0: # expand reindex to allow adjacent chunks # to fill missing values previous = reader.read(files[i - 1], **kwargs) _set_index(previous) data = pd.concat([previous, frame]) data = data.reindex(values, method="pad", tolerance=tolerance) else: data.drop(columns="time", inplace=True) dataframes.append(data) if len(dataframes) == 0: return _empty(reader.columns) return pd.concat(dataframes) if start is not None or end is not None: chunk_start = chunk(start) if start is not None else pd.to_datetime(pd.Timestamp.min, utc=True) chunk_end = chunk(end) if end is not None else pd.to_datetime(pd.Timestamp.max, utc=True) files = list(filter(lambda item: chunk_start <= chunk(item[0][1]) <= chunk_end, files)) if len(files) == 0: return _empty(reader.columns) data = pd.concat([reader.read(file, **kwargs) for _, file in files]) _set_index(data) if start is not None or end is not None: try: return _filter_time_range(data, start, end, inclusive) except KeyError: if not data.index.is_monotonic_increasing: warnings.warn( f"data index for {reader.pattern} contains out-of-order timestamps!", stacklevel=2 ) data = data.sort_index() else: # pragma: no cover raise return data