
Usage

sgn-gwframe provides three SGN pipeline elements for working with gravitational wave frame files: two sources for reading data, and a sink for writing it.

All elements integrate with SGN pipelines via the sgn-ts timeseries framework.

FrameSource

FrameSource reads timeseries data from .gwf frame files. It accepts data from a frame cache file, a single .gwf file, or a list of .gwf file paths.

Key parameters

  • channels -- list of channel names to read (e.g., ["L1:GWOSC-16KHZ_R1_STRAIN"])
  • frames -- path to a .cache file, a single .gwf file, or a list of .gwf paths
  • start -- GPS start time (required)
  • end or duration -- GPS end time, or total duration in seconds (one is required to define when the stream ends)
  • instrument -- optional instrument filter (e.g., "H1") so that only matching files are read

Reading from a frame cache

A frame cache is a text file where each line describes a frame file:

L L1_GWOSC_16KHZ_R1 1187008882 32 /data/frames/L-L1_GWOSC-1187008882-32.gwf
L L1_GWOSC_16KHZ_R1 1187008914 32 /data/frames/L-L1_GWOSC-1187008914-32.gwf

from sgn_gwframe.sources import FrameSource

src = FrameSource(
    name="gwosc",
    channels=["L1:GWOSC-16KHZ_R1_STRAIN"],
    frames="frames.cache",
    start=1187008882,
    end=1187008946,
)
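
To make the cache layout concrete, here is a minimal sketch of how one cache line could be split into its five whitespace-separated columns. The CacheEntry type, its field names, and parse_cache_line are hypothetical helpers written for illustration; they are not part of sgn-gwframe, and the column meanings (observatory, description, GPS start, duration, path) are inferred from the example lines above.

```python
from typing import NamedTuple


class CacheEntry(NamedTuple):
    # Field names are illustrative assumptions, not sgn-gwframe API.
    observatory: str
    description: str
    gps_start: int
    duration: int
    path: str


def parse_cache_line(line: str) -> CacheEntry:
    """Split one whitespace-separated cache line into its five columns."""
    observatory, description, gps_start, duration, path = line.split(maxsplit=4)
    return CacheEntry(
        observatory, description, int(gps_start), int(duration), path.strip()
    )


entry = parse_cache_line(
    "L L1_GWOSC_16KHZ_R1 1187008882 32 /data/frames/L-L1_GWOSC-1187008882-32.gwf"
)
# GPS time at which this frame file ends
print(entry.gps_start + entry.duration)
```

The two example cache lines cover 1187008882 to 1187008946, which matches the start and end values passed to FrameSource above.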

Reading from individual frame files

# Single file
src = FrameSource(
    name="single",
    channels=["H1:GDS-CALIB_STRAIN"],
    frames="/data/H-H1_GWOSC-1187008882-32.gwf",
    start=1187008882,
    duration=32,
)

# Multiple files
src = FrameSource(
    name="multi",
    channels=["H1:GDS-CALIB_STRAIN"],
    frames=[
        "/data/H-H1_GWOSC-1187008882-32.gwf",
        "/data/H-H1_GWOSC-1187008914-32.gwf",
    ],
    start=1187008882,
    end=1187008946,
)

Gap handling

If the provided frame files have gaps (missing time segments), FrameSource automatically pads those regions with gap buffers and logs a warning. Downstream elements can detect these gaps via the buffer's data_valid flag.
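
As a rough illustration of what counts as a gap, the sketch below computes the uncovered intervals between a requested start and end, given the time spans of the available files. The find_gaps function is a hypothetical helper, not the FrameSource implementation, and it assumes each file is described by a (gps_start, gps_end) pair.

```python
def find_gaps(segments, start, end):
    """Return the [gap_start, gap_end) intervals in [start, end) not covered by segments."""
    gaps = []
    cursor = start  # earliest time not yet known to be covered
    for seg_start, seg_end in sorted(segments):
        if seg_end <= cursor:
            continue  # segment lies entirely before the cursor
        if seg_start >= end:
            break  # segment lies entirely after the requested span
        if seg_start > cursor:
            gaps.append((cursor, seg_start))
        cursor = max(cursor, seg_end)
    if cursor < end:
        gaps.append((cursor, end))
    return gaps


# Two 32-second files with a missing 32-second file between them
files = [(1187008882, 1187008914), (1187008946, 1187008978)]
print(find_gaps(files, 1187008882, 1187008978))
```

In this example the missing interval is 1187008914 to 1187008946, which is the region FrameSource would fill with gap buffers.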

FrameWatchSource

FrameWatchSource monitors a directory for new .gwf frame files in real time. It is designed for live streaming pipelines where an upstream process (e.g., detector data acquisition) continuously writes frame files.

Frame files must follow the T050017 naming convention: {site}-{description}-{gpstime}-{duration}.gwf.
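
A filename in this form can be checked with a short regular expression. The sketch below is a simplified pattern written for illustration (parse_frame_name is a hypothetical helper, and the allowed character sets are an assumption); it is not necessarily the check that FrameWatchSource performs internally.

```python
import re

# Simplified pattern for {site}-{description}-{gpstime}-{duration}.gwf
FRAME_NAME = re.compile(
    r"^(?P<site>[A-Z0-9]+)-(?P<description>[A-Za-z0-9_]+)"
    r"-(?P<gpstime>\d+)-(?P<duration>\d+)\.gwf$"
)


def parse_frame_name(filename):
    """Return the filename fields as a dict, or None if the name does not match."""
    match = FRAME_NAME.match(filename)
    if match is None:
        return None
    fields = match.groupdict()
    fields["gpstime"] = int(fields["gpstime"])
    fields["duration"] = int(fields["duration"])
    return fields


print(parse_frame_name("L-L1_GWOSC-1187008882-32.gwf"))
print(parse_frame_name("notes.gwf"))
```

The first call returns the four fields with gpstime and duration converted to integers; the second returns None because the name lacks the hyphen-separated fields.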

Key parameters

  • channels -- list of channel names to read from arriving frame files
  • watch_dir -- directory path to monitor for new files
  • start -- GPS start time; when None, uses current GPS time (live mode)
  • discont_wait_time -- seconds to wait with no data before sending a gap buffer (default: 60)
  • queue_timeout -- seconds to wait for the next file from the internal queue (default: 1)
  • watch_suffix -- file suffix to watch for (default: ".gwf")

Live streaming

When start is not specified, FrameWatchSource begins from the current GPS time and streams data as new frame files appear:

from sgn_gwframe.sources import FrameWatchSource

src = FrameWatchSource(
    name="L1_live",
    channels=["L1:GDS-CALIB_STRAIN"],
    watch_dir="/data/frames/L1",
)

Replay from a specific time

Set start to replay historical data from frame files already on disk:

src = FrameWatchSource(
    name="L1_replay",
    channels=["L1:GDS-CALIB_STRAIN"],
    watch_dir="/data/frames/L1",
    start=1187008882,
)

Discontinuity handling

When a gap is detected (no data for longer than discont_wait_time seconds), FrameWatchSource sends gap buffers to keep the pipeline advancing. During shorter pauses, it sends zero-duration heartbeat buffers to signal liveness without advancing the data stream.
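
The rule described above can be summarized as a single comparison. This is only a sketch of the decision, with a hypothetical function name and string return values; the real element works with SGN buffers rather than labels.

```python
def buffer_kind_while_waiting(seconds_without_data, discont_wait_time=60.0):
    """Pick which buffer to emit while no new frame file has arrived.

    Hypothetical helper: returns a label instead of a real buffer object.
    """
    if seconds_without_data > discont_wait_time:
        # Waited longer than the threshold: declare a gap and advance the stream.
        return "gap"
    # Short pause: signal liveness without advancing the data stream.
    return "heartbeat"


print(buffer_kind_while_waiting(5))    # short pause
print(buffer_kind_while_waiting(120))  # longer than the 60 s default
```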

FrameSink

FrameSink writes timeseries data to .gwf frame files. It supports atomic writes, compression, multi-frame files, subdirectory organization, and automatic file retention.

Key parameters

  • channels -- list of channel names to write
  • duration -- duration of each output frame in seconds
  • path -- output path template with format placeholders: {instruments}, {description}, {gps_start_time}, {duration}
  • description -- description string for the filename (default: "SGN")
  • compression / compression_level -- compression scheme and level (0-9)

Basic usage

from sgn_gwframe.sinks import FrameSink

sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
    description="FILTERED",
)

Multi-frame files

Use frame_duration to combine multiple frames into a single file. The value must be an integer multiple of duration:

sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    frame_duration=16,  # 16 one-second frames per file
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
)

Subdirectory organization

Use subdir_seconds to organize output files into time-bucketed subdirectories:

sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    subdir_seconds=100000,  # e.g., GPS 1234567890 -> output/12345/
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
)
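
The bucket name in the comment above is consistent with integer division of the GPS start time by subdir_seconds. The following sketch reproduces that arithmetic; subdir_for is a hypothetical helper, and the way FrameSink actually joins the bucket with the path template is an assumption here.

```python
from pathlib import PurePosixPath


def subdir_for(gps_start_time, subdir_seconds, base="output"):
    """Return the time-bucketed subdirectory for a frame starting at gps_start_time."""
    bucket = gps_start_time // subdir_seconds  # integer bucket index
    return PurePosixPath(base) / str(bucket)


print(subdir_for(1234567890, 100000))  # output/12345
```

With subdir_seconds=100000, each subdirectory collects roughly 27.8 hours of frame files.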

Retention policies

FrameSink supports both count-based and time-based retention to manage disk usage. These can be used independently or together:

sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    max_files=1000,          # keep only the 1000 most recent files
    retention_time=86400,    # also delete files older than 24 hours
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
)
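
When both policies are set, a file is removed if either rule selects it. The sketch below shows one way to express that union; files_to_delete is a hypothetical helper, and it assumes file age is judged by a timestamp per file (whether FrameSink uses modification time or GPS time is not stated here).

```python
def files_to_delete(files, now, max_files=None, retention_time=None):
    """Select paths to remove under count-based and/or time-based retention.

    files: list of (path, timestamp) pairs; now: current time in the same units.
    """
    doomed = set()
    if max_files is not None:
        # Keep the max_files newest entries, drop the rest.
        newest_first = sorted(files, key=lambda item: item[1], reverse=True)
        doomed.update(path for path, _ in newest_first[max_files:])
    if retention_time is not None:
        # Drop anything older than retention_time.
        doomed.update(path for path, ts in files if now - ts > retention_time)
    return sorted(doomed)


files = [("a.gwf", 0), ("b.gwf", 50), ("c.gwf", 90), ("d.gwf", 100)]
print(files_to_delete(files, now=100, max_files=3, retention_time=40))
```

Here the count rule selects only a.gwf, while the age rule selects both a.gwf and b.gwf, so both are removed.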

Atomic writes

All frame files are written atomically: data is first written to a temporary file, then moved to the final path. This prevents downstream consumers from reading partially-written files. Use tmpdir to specify a custom temporary directory if desired.
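
The write-then-move pattern can be sketched with the Python standard library as follows. This is a generic illustration, not the FrameSink implementation: atomic_write is a hypothetical helper, and its tmpdir argument merely mirrors the parameter name mentioned above.

```python
import os
import tempfile


def atomic_write(final_path, payload, tmpdir=None):
    """Write payload to a temporary file, then move it to final_path."""
    directory = tmpdir or os.path.dirname(os.path.abspath(final_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
        # Readers see either no file or the complete file, never a partial one.
        os.replace(tmp_path, final_path)
    except BaseException:
        # Clean up the temporary file if anything went wrong.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "example.gwf")
    atomic_write(target, b"frame bytes")
    print(os.listdir(workdir))
```

Note that os.replace is only atomic when the temporary file and the final path are on the same filesystem, which is worth keeping in mind when choosing a custom temporary directory.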