Usage¶
sgn-gwframe provides three SGN pipeline elements for working with gravitational wave frame files: two sources for reading data, and a sink for writing it.
All elements integrate with SGN pipelines via the sgn-ts timeseries framework.
FrameSource¶
FrameSource reads timeseries data
from .gwf frame files. It accepts data from a frame cache file, a single
.gwf file, or a list of .gwf file paths.
Key parameters¶
- channels -- list of channel names to read (e.g., ["L1:GWOSC-16KHZ_R1_STRAIN"])
- frames -- path to a .cache file, a single .gwf file, or a list of .gwf paths
- start -- GPS start time (required)
- end or duration -- GPS end time, or total duration in seconds (one is required to define when the stream ends)
- instrument -- optional instrument filter (e.g., "H1") to only read matching files
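The relationship between start, end, and duration can be sketched as follows. This is only an illustration of the arithmetic, not FrameSource's own code; the helper name resolve_end is hypothetical, and rejecting inconsistent end/duration pairs is an assumption about sensible behavior rather than documented behavior.

```python
def resolve_end(start, end=None, duration=None):
    """Return the GPS end time implied by `end` or `duration`.

    Hypothetical helper: mirrors the rule that one of the two must be given.
    """
    if end is None and duration is None:
        raise ValueError("one of end or duration is required")
    # Assumption: if both are given, they should describe the same interval.
    if end is not None and duration is not None and end != start + duration:
        raise ValueError("end and duration disagree")
    return end if end is not None else start + duration


print(resolve_end(1187008882, duration=32))  # 1187008914
```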
Reading from a frame cache¶
A frame cache is a text file where each line describes a frame file:
L L1_GWOSC_16KHZ_R1 1187008882 32 /data/frames/L-L1_GWOSC-1187008882-32.gwf
L L1_GWOSC_16KHZ_R1 1187008914 32 /data/frames/L-L1_GWOSC-1187008914-32.gwf
from sgn_gwframe.sources import FrameSource

src = FrameSource(
    name="gwosc",
    channels=["L1:GWOSC-16KHZ_R1_STRAIN"],
    frames="frames.cache",
    start=1187008882,
    end=1187008946,
)
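To inspect a cache file before handing it to FrameSource, the five whitespace-separated columns of each line can be parsed with plain Python. The sketch below is independent of sgn-gwframe; the CacheEntry type, its field names, and parse_cache_line are illustrative labels chosen here, not part of the library.

```python
from typing import NamedTuple


class CacheEntry(NamedTuple):
    # Field names are labels chosen for this sketch.
    observatory: str
    description: str
    gps_start: int
    duration: int
    path: str


def parse_cache_line(line):
    """Split one cache line into its five columns."""
    # maxsplit=4 keeps any whitespace inside the path column intact.
    obs, desc, start, dur, path = line.split(maxsplit=4)
    return CacheEntry(obs, desc, int(start), int(dur), path.strip())


entry = parse_cache_line(
    "L L1_GWOSC_16KHZ_R1 1187008882 32 /data/frames/L-L1_GWOSC-1187008882-32.gwf"
)
print(entry.gps_start + entry.duration)  # 1187008914
```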
Reading from individual frame files¶
# Single file
src = FrameSource(
    name="single",
    channels=["H1:GDS-CALIB_STRAIN"],
    frames="/data/H-H1_GWOSC-1187008882-32.gwf",
    start=1187008882,
    duration=32,
)

# Multiple files
src = FrameSource(
    name="multi",
    channels=["H1:GDS-CALIB_STRAIN"],
    frames=[
        "/data/H-H1_GWOSC-1187008882-32.gwf",
        "/data/H-H1_GWOSC-1187008914-32.gwf",
    ],
    start=1187008882,
    end=1187008946,
)
Gap handling¶
If the provided frame files have gaps (missing time segments), FrameSource
automatically pads those regions with gap buffers and logs a warning. Downstream
elements can detect these gaps via the buffer's data_valid flag.
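To see ahead of time which regions would be padded, the missing segments can be computed from the time spans of the available files. The following is a minimal sketch of that bookkeeping, not FrameSource's internal algorithm; find_gaps and its half-open (start, end) tuples are assumptions made for illustration.

```python
def find_gaps(segments, start, end):
    """Return the parts of [start, end) not covered by any segment.

    `segments` is an iterable of (seg_start, seg_end) GPS pairs.
    """
    gaps = []
    cursor = start  # earliest time not yet known to be covered
    for seg_start, seg_end in sorted(segments):
        if seg_end <= cursor:
            continue  # segment lies entirely before the cursor
        if seg_start >= end:
            break  # segment lies entirely after the requested span
        if seg_start > cursor:
            gaps.append((cursor, seg_start))
        cursor = max(cursor, seg_end)
    if cursor < end:
        gaps.append((cursor, end))
    return gaps


# Two 32-second files with one 32-second file missing between them.
files = [(1187008882, 1187008914), (1187008946, 1187008978)]
print(find_gaps(files, 1187008882, 1187008978))  # [(1187008914, 1187008946)]
```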
FrameWatchSource¶
FrameWatchSource monitors a
directory for new .gwf frame files in real time. It is designed for live
streaming pipelines where frame files are continuously written by an upstream
process (e.g., detector data acquisition).
Frame files must follow the
T050017 naming convention:
{site}-{description}-{gpstime}-{duration}.gwf.
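A file name in this form can be checked and split with a regular expression. The pattern below is a simplified sketch that assumes an uppercase site code, an alphanumeric or underscore description, and integer GPS time and duration; it is not the matcher FrameWatchSource uses, and parse_frame_name is a hypothetical helper.

```python
import re
from pathlib import Path

# Simplified pattern for {site}-{description}-{gpstime}-{duration}.gwf.
# Assumption: gpstime and duration are integers.
FRAME_NAME_RE = re.compile(
    r"^(?P<site>[A-Z]+)"
    r"-(?P<description>[A-Za-z0-9_]+)"
    r"-(?P<gpstime>\d+)"
    r"-(?P<duration>\d+)\.gwf$"
)


def parse_frame_name(path):
    """Return (site, description, gpstime, duration) from a frame file path."""
    match = FRAME_NAME_RE.match(Path(path).name)
    if match is None:
        raise ValueError(f"not a recognized frame file name: {path}")
    return (
        match["site"],
        match["description"],
        int(match["gpstime"]),
        int(match["duration"]),
    )


print(parse_frame_name("/data/frames/L-L1_GWOSC-1187008882-32.gwf"))
# ('L', 'L1_GWOSC', 1187008882, 32)
```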
Key parameters¶
- channels -- list of channel names to read from arriving frame files
- watch_dir -- directory path to monitor for new files
- start -- GPS start time; when None, uses current GPS time (live mode)
- discont_wait_time -- seconds to wait with no data before sending a gap buffer (default: 60)
- queue_timeout -- seconds to wait for the next file from the internal queue (default: 1)
- watch_suffix -- file suffix to watch for (default: ".gwf")
Live streaming¶
When start is not specified, FrameWatchSource begins from the current GPS
time and streams data as new frame files appear:
from sgn_gwframe.sources import FrameWatchSource

src = FrameWatchSource(
    name="L1_live",
    channels=["L1:GDS-CALIB_STRAIN"],
    watch_dir="/data/frames/L1",
)
Replay from a specific time¶
Set start to replay historical data from frame files already on disk:
src = FrameWatchSource(
    name="L1_replay",
    channels=["L1:GDS-CALIB_STRAIN"],
    watch_dir="/data/frames/L1",
    start=1187008882,
)
Discontinuity handling¶
When a gap is detected (no data for longer than discont_wait_time seconds),
FrameWatchSource sends gap buffers to keep the pipeline advancing. During
shorter pauses, it sends zero-duration heartbeat buffers to signal liveness
without advancing the data stream.
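The rule described above reduces to a single threshold comparison. The sketch below only illustrates that decision; the function name, the string return values, and the way elapsed time is measured are assumptions, not the element's implementation.

```python
def buffer_kind(seconds_without_data, discont_wait_time=60.0):
    """Decide what to emit while no new frame file has arrived.

    Hypothetical helper: "gap" advances the stream past missing data,
    "heartbeat" stands for a zero-duration liveness buffer.
    """
    if seconds_without_data > discont_wait_time:
        return "gap"
    return "heartbeat"


print(buffer_kind(5))   # heartbeat
print(buffer_kind(90))  # gap
```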
FrameSink¶
FrameSink writes timeseries data to
.gwf frame files. It supports atomic writes, compression, multi-frame files,
subdirectory organization, and automatic file retention.
Key parameters¶
- channels -- list of channel names to write
- duration -- duration of each output frame in seconds
- path -- output path template with format placeholders: {instruments}, {description}, {gps_start_time}, {duration}
- description -- description string for the filename (default: "SGN")
- compression / compression_level -- compression scheme and level (0-9)
Basic usage¶
from sgn_gwframe.sinks import FrameSink

sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
    description="FILTERED",
)
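To get a feel for the file names this template produces, the placeholders can be filled in with Python's str.format. The values below are assumptions chosen for illustration; in particular, the exact string FrameSink substitutes for {instruments} (for example "H" versus "H1") is not specified here.

```python
template = "output/{instruments}-{description}-{gps_start_time}-{duration}.gwf"

# Assumed placeholder values, for illustration only.
path = template.format(
    instruments="H",
    description="FILTERED",
    gps_start_time=1187008882,
    duration=1,
)
print(path)  # output/H-FILTERED-1187008882-1.gwf
```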
Multi-frame files¶
Use frame_duration to combine multiple frames into a single file. The value
must be an integer multiple of duration:
sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    frame_duration=16,  # 16 one-second frames per file
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
)
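The integer-multiple constraint can be checked before constructing the sink. This is a small sketch of that check; frames_per_file is a hypothetical helper, and the error FrameSink itself raises for an invalid combination may differ.

```python
def frames_per_file(duration, frame_duration):
    """Return how many frames of `duration` seconds fit in one file."""
    if frame_duration % duration != 0:
        raise ValueError(
            f"frame_duration ({frame_duration}) must be an integer "
            f"multiple of duration ({duration})"
        )
    return frame_duration // duration


print(frames_per_file(1, 16))  # 16
```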
Subdirectory organization¶
Use subdir_seconds to organize output files into time-bucketed subdirectories:
sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    subdir_seconds=100000,  # e.g., GPS 1234567890 -> output/12345/
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
)
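The bucket name in the comment above is consistent with integer division of the GPS start time by subdir_seconds. The sketch below reproduces that arithmetic; treating the bucket as a floor division is an inference from the example, and subdir_for is a hypothetical helper.

```python
def subdir_for(gps_start_time, subdir_seconds):
    """Return the time-bucket directory name for a frame's GPS start time."""
    # Assumption: the bucket is the floor of gps_start_time / subdir_seconds.
    return str(gps_start_time // subdir_seconds)


print(subdir_for(1234567890, 100000))  # 12345
```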
Retention policies¶
FrameSink supports both count-based and time-based retention to manage disk
usage. These can be used independently or together:
sink = FrameSink(
    name="writer",
    channels=["H1:GDS-CALIB_STRAIN"],
    duration=1,
    max_files=1000,  # keep only the 1000 most recent files
    retention_time=86400,  # also delete files older than 24 hours
    path="output/{instruments}-{description}-{gps_start_time}-{duration}.gwf",
)
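How the two policies combine can be sketched as a pure function over (path, timestamp) pairs: a file is removed if either policy rejects it. This is an illustration under stated assumptions, not FrameSink's implementation; whether file age is measured from modification time or from GPS time is not specified here, and files_to_delete is a hypothetical helper.

```python
def files_to_delete(files, now, max_files=None, retention_time=None):
    """Return the paths that either retention policy would remove.

    `files` is a list of (path, timestamp) pairs; `now` uses the same clock.
    """
    newest_first = sorted(files, key=lambda item: item[1], reverse=True)
    doomed = set()
    if max_files is not None:
        # Count-based: everything beyond the newest `max_files` entries.
        doomed.update(path for path, _ in newest_first[max_files:])
    if retention_time is not None:
        # Time-based: everything older than `retention_time` seconds.
        doomed.update(
            path for path, stamp in newest_first if now - stamp > retention_time
        )
    return sorted(doomed)


files = [("a.gwf", 0), ("b.gwf", 50), ("c.gwf", 90), ("d.gwf", 100)]
print(files_to_delete(files, now=100, max_files=3, retention_time=40))
# ['a.gwf', 'b.gwf']
```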
Atomic writes¶
All frame files are written atomically: data is first written to a temporary
file, then moved to the final path. This prevents downstream consumers from
reading partially written files. Use tmpdir to specify a custom temporary
directory if desired.
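The write-then-move pattern can be sketched with the standard library alone. This shows the general technique rather than FrameSink's code; atomic_write and its tmpdir argument are hypothetical stand-ins, and the bytes written are placeholder data, not a valid frame.

```python
import os
import tempfile


def atomic_write(final_path, data, tmpdir=None):
    """Write `data` to a temporary file, then move it to `final_path`."""
    directory = tmpdir or os.path.dirname(os.path.abspath(final_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        # Readers see either no file or the complete file, never a partial one.
        os.replace(tmp_path, final_path)
    except BaseException:
        # Clean up the temporary file if the write or the move failed.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


out_dir = tempfile.mkdtemp()
atomic_write(os.path.join(out_dir, "H-SGN-1187008882-1.gwf"), b"placeholder")
```

Note that os.replace is atomic only when the temporary file and the final path are on the same filesystem, which is worth keeping in mind when choosing a custom temporary directory.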