Skip to content

frame

This module contains the FrameSink class.

Writes time series data to .gwf files.

FrameSink dataclass

FrameSink(*, channels, duration, path='{instruments}-{description}-{gps_start_time}-{duration}.gwf', force=False, description='SGN', max_files=None, retention_time=None, subdir_seconds=0, frame_name=None, frame_history=dict(), frame_duration=0, tmpdir=None, compression=gwframe.Compression.ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level=6)

Bases: TSSink

A sink element that writes time series data to file

Parameters:

Name Type Description Default
channels Sequence[str]

Sequence[str], the channel names to write to the file (e.g. "H1:GDS-CALIB_STRAIN"); the instrument is taken from the prefix before the colon

required
duration int

int, the duration in seconds of the data in each frame written to the file. Must be a positive integer.

required
path str

str, the path to write the frame files to. The file name must contain the following format parameters (in curly braces): - {instruments}, the sorted, concatenated instrument prefixes inferred from the included channel names (e.g. "H1" for "H1:GDS-CAL...", or "H1L1" when both H1 and L1 channels are present) - {description}, the description string for the frame - {gps_start_time}, the start time of the data in GPS seconds - {duration}, the duration of the data in seconds. The extension on the path determines the output file type. Currently ".gwf" and ".hdf5" are supported. Default: "{instruments}-{description}-{gps_start_time}-{duration}.gwf"

'{instruments}-{description}-{gps_start_time}-{duration}.gwf'
force bool

bool, whether to overwrite existing files. Default: False

False
description str

str, description string to include in the filename. Default: "SGN"

'SGN'
max_files Optional[int]

int or None, when set to a positive value, enables circular buffer mode that keeps only the N most recent frame files. Older files are automatically deleted. Default: None (disabled)

None
retention_time Optional[float]

float or None, retention time in seconds. Files older than this will be automatically deleted. Can be combined with max_files. Default: None (keep forever)

None
subdir_seconds int

int, organize files into subdirectories based on time buckets. Files are grouped by GPS time divided by this value. For example, with subdir_seconds=100000, GPS time 1234567890 goes into subdirectory 12345/. When 0, files are written to a flat directory structure. Default: 0 (flat structure)

0
frame_name Optional[str]

str or None, name field for the frame metadata. If None, inferred from the instrument list (e.g., "H1L1"). Default: None

None
frame_history dict[str, str]

dict mapping history entry names to comments. These are added as frame history metadata. Default: empty dict

dict()
frame_duration int

int, total duration in seconds for multi-frame files. Must be an integer multiple of duration. When 0 or equal to duration, writes one frame per file. When greater than duration, multiple frames are written to a single file. For example, with duration=1 and frame_duration=16, the file contains 16 one-second frames and has a total duration of 16 seconds. Default: 0 (single frame per file)

0
tmpdir Optional[str]

str or None, directory for temporary files during atomic writes. If None, uses same directory as output. Default: None

None
compression int

int, compression scheme for frame files. Default: gwframe.Compression.ZERO_SUPPRESS_OTHERWISE_GZIP

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

int, compression level 0-9. Default: 6

6

static_sink_pads property

static_sink_pads

Return channels as static sink pads.

configure

configure()

Configure the FrameSink.

Source code in src/sgn_gwframe/sinks/frame.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def configure(self) -> None:
    """Configure the FrameSink.

    Sets the adapter alignment and derives the instrument string from
    the configured channel names. It also initialises the bookkeeping
    used for the created-file cache and for multi-frame file writing.
    """
    # Each buffer handed to ``process`` should span ``duration`` seconds and
    # begin on a whole-second boundary.
    self.adapter_config.alignment(
        stride=Offset.fromsec(self.duration),
        align_to=Offset.fromsec(1),
    )

    # Instrument prefixes ("H1" from "H1:GDS-...") are de-duplicated, sorted
    # and concatenated, e.g. {"L1", "H1"} -> "H1L1".
    prefixes = {name.partition(":")[0] for name in self.channels}
    self._instruments_str: str = "".join(sorted(prefixes))

    # Paths of files written so far, oldest first.
    self._file_cache: deque[str] = deque()

    # State of the multi-frame file currently being assembled (none yet).
    self._current_writer: gwframe.FrameWriter | None = None
    self._current_file_path: Path | None = None
    self._temp_file_path: Path | None = None
    self._current_file_start: int | None = None
    self._file_duration: int = 0

process

process(input_frames)

Process input frames and write to file.

Parameters:

Name Type Description Default
input_frames dict[SinkPad, TSFrame]

dict[SinkPad, TSFrame], mapping of sink pads to their input frames

required
Source code in src/sgn_gwframe/sinks/frame.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
    """Process input frames and write to file.

    The first buffer from every pad is combined into a single
    ``gwframe.Frame`` covering ``self.duration`` seconds. That frame is
    then passed to ``self._write_frame``. Nothing is written when any
    pad delivers a gap frame, or a buffer with fewer than
    ``duration * sample_rate`` samples.

    End-of-stream is recorded for every pad whose frame carries EOS
    before any of the early-return checks run. This means a gap or a
    short buffer arriving together with EOS cannot swallow the
    end-of-stream signal.

    Args:
        input_frames:
            dict[SinkPad, TSFrame], mapping of sink pads to their input frames
    """
    # Mark EOS first. Previously, an EOS carried on a gap frame was never
    # marked because the gap check returned before the EOS check. Any pad
    # iterated after a gap or short buffer was also skipped.
    for pad, frame in input_frames.items():
        if frame.EOS:
            self.mark_eos(pad)

    # Collect channel data
    channels = {}
    sample_rates = {}
    start: int | None = None

    for pad, frame in input_frames.items():
        channel = pad.pad_name
        if frame.is_gap:
            return

        # Load first buffer
        # TODO: fix this indexing to handle multiple buffers as multiple segments
        # This requires converting frame data to masked arrays using
        # the data_valid flag. Needs: sgn-ts MR 171 + gwframe masked array support
        data = frame.buffers[0]

        # TODO: check for above todo, For now, check if the buffer has enough data
        #  for the duration, later we'll need to cumulate check across multiple
        #  segments
        exp_samples = self.duration * data.sample_rate
        if data.samples < exp_samples:
            self.logger.warning(
                "Data does not contain enough samples for duration %d. Skipping",
                self.duration,
            )
            return

        # Compute GPS time from the frame offset. ``configure`` aligns data
        # to whole seconds. Round rather than truncate, so floating-point
        # error in the offset-to-seconds conversion cannot push the start
        # into the previous second.
        frame_start = round(Offset.offset_ref_start + Offset.tosec(frame.offset))

        # Store start from first channel
        if start is None:
            start = frame_start

        # Store channel data and sample rate
        channels[channel] = data.data
        sample_rates[channel] = data.sample_rate

    # No pads were supplied, so there is nothing to write. This is an
    # explicit return rather than an ``assert``, which is stripped under -O.
    if start is None:
        return

    # Create frame and add channels
    frame_name = self.frame_name or self._instruments_str
    gwf = gwframe.Frame(t0=start, duration=self.duration, name=frame_name)

    # Add history metadata
    for name, comment in self.frame_history.items():
        gwf.add_history(name, comment)

    for channel, data_array in channels.items():
        gwf.add_channel(
            channel=channel,
            data=data_array,
            sample_rate=sample_rates[channel],
        )

    self._write_frame(gwf, start, self.duration)

validate

validate()

Validate the FrameSink configuration.

Source code in src/sgn_gwframe/sinks/frame.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def validate(self) -> None:
    """Validate the FrameSink configuration.

    Raises:
        ValueError: if ``duration`` is not a positive integer, if
            ``frame_duration`` is not a non-negative integer multiple of
            ``duration``, or if ``path`` is missing one of the format
            placeholders listed in ``FILENAME_PARAMS``.
    """
    # Check valid duration. ``bool`` is a subclass of ``int``, so reject it
    # explicitly; otherwise ``duration=True`` is silently accepted as 1 s.
    if (
        isinstance(self.duration, bool)
        or not isinstance(self.duration, int)
        or self.duration <= 0
    ):
        msg = f"Duration must be a positive integer, got {self.duration}"
        raise ValueError(msg)

    # Check frame_duration is valid. It is documented as an int, and a float
    # such as 16.0 would pass the modulo check below and leak into durations.
    if isinstance(self.frame_duration, bool) or not isinstance(
        self.frame_duration, int
    ):
        msg = f"frame_duration must be an integer, got {self.frame_duration}"
        raise ValueError(msg)
    if self.frame_duration < 0:
        msg = f"frame_duration must be non-negative, got {self.frame_duration}"
        raise ValueError(msg)
    if self.frame_duration > 0 and self.frame_duration % self.duration != 0:
        msg = (
            f"frame_duration ({self.frame_duration}) must be an integer "
            f"multiple of duration ({self.duration})"
        )
        raise ValueError(msg)

    # Check that path contains every required format placeholder
    for param in FILENAME_PARAMS:
        if f"{{{param}}}" not in self.path:
            msg = f"Path must contain parameter {{{param}}}"
            raise ValueError(msg)