Source code for aeon.io.api

import bisect
import datetime
import warnings
from os import PathLike
from pathlib import Path

import pandas as pd

"""The duration of each acquisition chunk, in whole hours."""
CHUNK_DURATION = 1


def aeon(seconds):
    """Converts a Harp timestamp, in seconds, to a datetime object."""
    return datetime.datetime(1904, 1, 1) + pd.to_timedelta(seconds, "s")
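
# A minimal usage sketch (illustrative, not part of the module): the Harp
# clock counts seconds from the 1904-01-01 epoch, so zero maps to that date.
# >>> aeon(0)
# Timestamp('1904-01-01 00:00:00')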


def chunk(time):
    """Returns the whole hour acquisition chunk for a measurement timestamp.

    :param datetime or Series time: An object or series specifying the measurement timestamps.
    :return: A datetime object or series specifying the acquisition chunk for the measurement timestamp.
    """
    if isinstance(time, pd.Series):
        hour = CHUNK_DURATION * (time.dt.hour // CHUNK_DURATION)
        return pd.to_datetime(time.dt.date) + pd.to_timedelta(hour, "h")
    else:
        hour = CHUNK_DURATION * (time.hour // CHUNK_DURATION)
        return pd.to_datetime(datetime.datetime.combine(time.date(), datetime.time(hour=hour)))
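
# A usage sketch (illustrative value, not part of the module): with
# CHUNK_DURATION = 1, every timestamp maps to the start of its containing hour.
# >>> chunk(datetime.datetime(2022, 6, 21, 14, 35, 17))
# Timestamp('2022-06-21 14:00:00')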


def chunk_range(start, end):
    """Returns a range of whole hour acquisition chunks.

    :param datetime start: The left bound of the time range.
    :param datetime end: The right bound of the time range.
    :return: A DatetimeIndex representing the acquisition chunk range.
    """
    return pd.date_range(chunk(start), chunk(end), freq=pd.DateOffset(hours=CHUNK_DURATION))
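
# A usage sketch (illustrative values, not part of the module): both bounds
# are snapped to their chunks, so the range covers every chunk touched by the
# interval.
# >>> chunk_range(datetime.datetime(2022, 6, 21, 9, 30), datetime.datetime(2022, 6, 21, 12, 5))
# DatetimeIndex(['2022-06-21 09:00:00', '2022-06-21 10:00:00',
#                '2022-06-21 11:00:00', '2022-06-21 12:00:00'], ...)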


def chunk_key(file):
    """Returns the acquisition chunk key for the specified file name."""
    epoch = file.parts[-3]
    chunk_str = file.stem.split("_")[-1]
    try:
        date_str, time_str = chunk_str.split("T")
    except ValueError:
        # fall back to the parent folder name when the file stem carries no chunk timestamp
        epoch = file.parts[-2]
        date_str, time_str = epoch.split("T")
    return epoch, datetime.datetime.fromisoformat(date_str + "T" + time_str.replace("-", ":"))
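
# A usage sketch with a hypothetical chunk file path (not part of the module);
# the epoch folder sits two levels above the chunk file, so `parts[-3]` names
# the epoch.
# >>> chunk_key(Path("2022-06-21T09-00-00/CameraTop/CameraTop_2022-06-21T14-00-00.csv"))
# ('2022-06-21T09-00-00', datetime.datetime(2022, 6, 21, 14, 0))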


def _set_index(data):
    # convert raw Harp timestamps to a named datetime index
    if not isinstance(data.index, pd.DatetimeIndex):
        data.index = aeon(data.index)
    data.index.name = "time"


def _empty(columns):
    # canonical time-indexed empty frame returned when no chunk files match
    return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], name="time"))
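
# Illustrative behaviour of the helpers above (not part of the module):
# >>> _empty(["x", "y"]).index.name
# 'time'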


def load(root, reader, start=None, end=None, time=None, tolerance=None, epoch=None, **kwargs):
    """Extracts chunk data from the root path of an Aeon dataset.

    Reads all chunk data using the specified data stream reader. A subset of the
    data can be loaded by specifying an optional time range, or a list of
    timestamps used to index the data on file. Returned data will be sorted
    chronologically.

    :param str or PathLike root: The root path, or prioritised sequence of paths, where data is stored.
    :param Reader reader: A data stream reader object used to read chunk data from the dataset.
    :param datetime, optional start: The left bound of the time range to extract.
    :param datetime, optional end: The right bound of the time range to extract.
    :param datetime, optional time: An object or series specifying the timestamps to extract.
    :param datetime, optional tolerance: The maximum distance between original and new timestamps
        for inexact matches.
    :param str, optional epoch: A wildcard pattern to use when searching epoch data.
    :param optional kwargs: Optional keyword arguments to forward to the reader when reading chunk data.
    :return: A pandas data frame containing the chunk data, sorted by time.
    """
    if isinstance(root, str):
        root = Path(root)
    if isinstance(root, PathLike):
        root = [root]
    epoch_pattern = "**" if epoch is None else epoch
    fileset = {
        chunk_key(fname): fname
        for path in root
        for fname in Path(path).glob(f"{epoch_pattern}/**/{reader.pattern}.{reader.extension}")
    }
    files = sorted(fileset.items())

    if time is not None:
        # ensure input is converted to timestamp series
        if isinstance(time, pd.DataFrame):
            time = time.index
        if not isinstance(time, pd.Series):
            time = pd.Series(time)
            time.index = time

        dataframes = []
        filetimes = [chunk for (_, chunk), _ in files]
        files = [file for _, file in files]
        for key, values in time.groupby(by=chunk):
            i = bisect.bisect_left(filetimes, key)  # type: ignore
            if i < len(filetimes):
                frame = reader.read(files[i], **kwargs)
                _set_index(frame)
            else:
                frame = _empty(reader.columns)

            data = frame.reset_index()
            data.set_index("time", drop=False, inplace=True)
            data = data.reindex(values, method="pad", tolerance=tolerance)
            missing = len(data.time) - data.time.count()
            if missing > 0 and i > 0:
                # expand reindex to allow adjacent chunks to fill missing values
                previous = reader.read(files[i - 1], **kwargs)
                data = pd.concat([previous, frame])
                data = data.reindex(values, tolerance=tolerance)
                data.dropna(inplace=True)
            else:
                data.drop(columns="time", inplace=True)
            dataframes.append(data)

        if len(dataframes) == 0:
            return _empty(reader.columns)
        return pd.concat(dataframes)

    if start is not None or end is not None:
        chunk_start = chunk(start) if start is not None else pd.Timestamp.min
        chunk_end = chunk(end) if end is not None else pd.Timestamp.max
        files = list(filter(lambda item: chunk_start <= chunk(item[0][1]) <= chunk_end, files))

    if len(files) == 0:
        return _empty(reader.columns)

    data = pd.concat([reader.read(file, **kwargs) for _, file in files])
    _set_index(data)
    if start is not None or end is not None:
        try:
            return data.loc[start:end]
        except KeyError:
            if not data.index.has_duplicates:
                warnings.warn(
                    f"data index for {reader.pattern} contains out-of-order timestamps!", stacklevel=2
                )
                data = data.sort_index()
            else:
                warnings.warn(f"data index for {reader.pattern} contains duplicate keys!", stacklevel=2)
                data = data[~data.index.duplicated(keep="first")]
            return data.loc[start:end]
    return data
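
# A minimal usage sketch (illustrative, not part of the module). The dataset
# root, reader module, stream name, and columns below are assumptions; any
# object exposing `pattern`, `extension`, `columns`, and `read(file, **kwargs)`
# can serve as the reader.
# >>> import aeon.io.reader as io_reader                     # assumed reader module
# >>> reader = io_reader.Csv("Patch1_State", columns=["threshold"])  # hypothetical stream
# >>> data = load(
# ...     "/data/experiment0",                               # hypothetical dataset root
# ...     reader,
# ...     start=pd.Timestamp("2022-06-21 09:30:00"),
# ...     end=pd.Timestamp("2022-06-21 12:05:00"),
# ... )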