framewatch

A source element that watches directories for new frame files in real time.

FrameWatchSource dataclass

FrameWatchSource(*, channels, watch_dir, discont_wait_time=60, queue_timeout=1, watch_suffix='.gwf')

Bases: TSResourceSource

Source element that watches a directory for new frame files in real time.

Monitors a directory for new frame files, automatically reading and sending data as files arrive. Handles discontinuities with gap detection, validates data integrity, and supports both live streaming (start=None) and replay from a specific GPS time.

For multi-IFO coherent analysis, use multiple FrameWatchSource elements (one per IFO) and let the pipeline handle synchronization.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| channels | list[str] | Channel names to read from frame files. A source pad is generated automatically for each channel, with the channel name as the pad name. | required |
| watch_dir | str | Directory path to watch for new frame files. | required |
| discont_wait_time | float | Time to wait, in seconds, before sending a gap buffer when no data arrives. | 60 |
| queue_timeout | float | Time to wait, in seconds, for the next file from the queue. | 1 |
| watch_suffix | str | Filename suffix to watch for. | '.gwf' |

Example

Basic usage:

from sgn_gwframe.sources import FrameWatchSource
from sgn.apps import Pipeline

src = FrameWatchSource(
    name="L1_data",
    channels=["L1:GDS-CALIB_STRAIN"],
    watch_dir="/data/frames/L1",
    duration=1,  # 1 second buffers
)

pipeline = Pipeline()
pipeline.insert(src, ...)
pipeline.run()
Note

Requires frame files named according to the LIGO-T050017 convention: {site}-{description}-{gpstime}-{duration}.gwf
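
To make the naming convention concrete, here is a minimal sketch of how such a filename could be split into its fields. The `FrameName` type, the `parse_frame_name` helper, and the regular expression are hypothetical and written only for illustration; they are not part of sgn_gwframe, and the sketch assumes integer GPS times and durations and the default `.gwf` suffix.

```python
import re
from typing import NamedTuple, Optional


class FrameName(NamedTuple):
    """Fields of a {site}-{description}-{gpstime}-{duration}.gwf filename."""

    site: str
    description: str
    gps_start: int
    duration: int


# Hypothetical pattern (an assumption, not the library's own parser):
# uppercase site letters, a hyphen-free description, integer GPS start
# time and integer duration, followed by the default ".gwf" suffix.
_FRAME_RE = re.compile(
    r"^(?P<site>[A-Z]+)-(?P<description>[A-Za-z0-9_]+)"
    r"-(?P<gps>\d+)-(?P<dur>\d+)\.gwf$"
)


def parse_frame_name(filename: str) -> Optional[FrameName]:
    """Return the parsed fields, or None if the name does not match."""
    match = _FRAME_RE.match(filename)
    if match is None:
        return None
    return FrameName(
        site=match["site"],
        description=match["description"],
        gps_start=int(match["gps"]),
        duration=int(match["dur"]),
    )


# A made-up filename that follows the convention, and one that does not.
print(parse_frame_name("L-L1_llhoft-1400000000-1.gwf"))
print(parse_frame_name("notes.txt"))
```

Returning `None` for non-matching names lets a caller skip unrelated files in the watched directory without raising.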

static_source_pads property

static_source_pads

Define source pads from channels.

worker_process

worker_process(context, channels, watch_dir, srcs, watch_suffix, queue_timeout, discont_wait_time, start, logger)

Worker process that watches directory and reads in frame data.

Source code in src/sgn_gwframe/sources/framewatch.py
def worker_process(  # type: ignore[override]
    self,
    context: WorkerContext,
    channels: list[str],
    watch_dir: str,
    srcs: dict[str, SourcePad],
    watch_suffix: str,
    queue_timeout: float,
    discont_wait_time: float,
    start: int | None,
    logger: logging.Logger,
) -> None:
    """Worker process that watches directory and reads in frame data."""
    # Initialize on first call
    if "initialized" not in context.state:
        # Get starting time
        if start is None:
            start_time = int(gpstime.gpsnow())
            logger.debug(
                "No start time specified, using current GPS time: %d", start_time
            )
        else:
            start_time = start
            logger.debug("Starting at GPS time: %d", start_time)

        # Setup watchdog observer and queue
        file_queue: queue.Queue[tuple[str, int]] = queue.Queue()
        event_handler = _FrameFileEventHandler(file_queue, watch_suffix, start_time)
        observer = Observer()
        observer.schedule(event_handler, path=watch_dir)
        observer.daemon = True
        observer.start()

        # Store state
        context.state["initialized"] = True
        context.state["file_queue"] = file_queue
        context.state["observer"] = observer
        context.state["next_buffer_time"] = start_time
        context.state["last_file_time"] = None
        context.state["rates"] = {}
        context.state["buffer_duration"] = None
        context.state["pending_frame"] = None
        context.state["pending_file_time"] = None

    # Check if we should stop
    if context.should_stop():
        observer = context.state.get("observer")  # type: ignore[assignment]
        if observer is not None:
            observer.stop()
            observer.join()
        return

    # Get state
    file_queue = context.state["file_queue"]
    next_buffer_time = context.state["next_buffer_time"]
    last_file_time = context.state["last_file_time"]
    rates = context.state["rates"]
    buffer_duration = context.state["buffer_duration"]
    pending_frame = context.state["pending_frame"]
    pending_file_time = context.state["pending_file_time"]

    # Determine what action to take based on current state
    action: WorkerAction | None = None
    frame_data = None
    gap_start_time = None
    gap_end_time = None
    file_time = None

    # Case 1: Handle pending discontinuity
    if pending_frame is not None:
        gap_start_time = next_buffer_time
        gap_end_time = gap_start_time + buffer_duration
        action = WorkerAction.GAP

        # Check if gap is filled - then also send the pending data
        if gap_end_time >= pending_file_time:
            frame_data = pending_frame

    # Case 2: Try to get a file from the queue
    else:
        try:
            filepath, file_time = _drain_and_get_file(
                file_queue, queue_timeout, next_buffer_time, last_file_time, logger
            )
        except queue.Empty:
            # Timeout case - need rates to send heartbeat or gap
            if not rates or buffer_duration is None:
                # Try to initialize rates from a sample file in the directory
                result = _initialize_rates_from_sample(
                    watch_dir, watch_suffix, channels, logger
                )
                if result is not None:
                    rates, buffer_duration = result
                    context.state["rates"] = rates
                    context.state["buffer_duration"] = buffer_duration
                else:
                    # Can't send anything without knowing rates
                    return

            # Determine if we should send gap or heartbeat
            time_waiting = gpstime.gpsnow() - next_buffer_time
            if time_waiting >= discont_wait_time:
                logger.warning(
                    "No data received for %.1f seconds (>= %.1f second timeout), "
                    "sending gap buffer",
                    time_waiting,
                    discont_wait_time,
                )
                gap_start_time = next_buffer_time
                gap_end_time = gap_start_time + buffer_duration
                action = WorkerAction.GAP
            else:
                action = WorkerAction.HEARTBEAT
        else:
            # Successfully got a file from queue - try to read it
            try:
                frame_data = _read_and_validate_frame(
                    filepath,
                    channels,
                    rates,
                    buffer_duration,
                    file_time,
                )
            except (FileNotFoundError, OSError, RuntimeError, ValueError):
                logger.exception("Failed to read or validate file %s", filepath)
                # Send gap if we know duration, otherwise heartbeat
                if buffer_duration is not None:
                    gap_start_time = next_buffer_time
                    gap_end_time = gap_start_time + buffer_duration
                    action = WorkerAction.GAP
                else:
                    action = WorkerAction.HEARTBEAT
            else:
                # Successfully read frame - process it
                # Initialize rates if this is the first file
                if not rates:
                    rates, buffer_duration = _initialize_rates_and_duration(
                        frame_data, channels
                    )
                    context.state["rates"] = rates
                    context.state["buffer_duration"] = buffer_duration

                # Check for discontinuity
                if last_file_time is not None and file_time > next_buffer_time:
                    logger.warning(
                        "Discontinuity detected: expected time %d, got %d "
                        "(gap of %.1f seconds)",
                        next_buffer_time,
                        file_time,
                        file_time - next_buffer_time,
                    )
                    # Store frame and send gap next iteration
                    context.state["pending_frame"] = frame_data
                    context.state["pending_file_time"] = file_time
                    gap_start_time = next_buffer_time
                    gap_end_time = gap_start_time + buffer_duration
                    action = WorkerAction.GAP
                    frame_data = None  # Don't send data yet
                else:
                    # Continuous data
                    action = WorkerAction.DATA

    # Execute action based on determined state
    # Initialize rates if needed for GAP or HEARTBEAT actions
    if action in (WorkerAction.GAP, WorkerAction.HEARTBEAT) and (
        not rates or buffer_duration is None
    ):
        result = _initialize_rates_from_sample(
            watch_dir, watch_suffix, channels, logger
        )
        if result is not None:
            rates, buffer_duration = result
            context.state["rates"] = rates
            context.state["buffer_duration"] = buffer_duration
        else:
            msg = (
                f"Cannot initialize FrameWatchSource: sample rates for "
                f"requested channels unknown and no existing files exist "
                f"in {watch_dir}"
            )
            raise RuntimeError(msg)

    if action == WorkerAction.DATA:
        assert frame_data is not None
        assert file_time is not None
        _send_data_buffers(context, frame_data, srcs)
        logger.debug(
            "Sent frame at time %d, delay=%.3f seconds",
            file_time,
            gpstime.gpsnow() - file_time,
        )
        context.state["next_buffer_time"] = int(file_time + buffer_duration)
        context.state["last_file_time"] = file_time

    elif action == WorkerAction.GAP:
        assert gap_start_time is not None
        assert gap_end_time is not None
        _send_gap_buffers(
            context, channels, srcs, rates, gap_start_time, gap_end_time
        )
        context.state["next_buffer_time"] = gap_end_time

        # If we also have pending data that's now ready, send it
        if frame_data is not None:
            _send_data_buffers(context, frame_data, srcs)
            context.state["next_buffer_time"] = int(
                pending_file_time + buffer_duration
            )
            context.state["last_file_time"] = pending_file_time
            context.state["pending_frame"] = None
            context.state["pending_file_time"] = None

    elif action == WorkerAction.HEARTBEAT:
        _send_heartbeat_buffers(context, channels, srcs, rates, next_buffer_time)
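
The discontinuity branch in the listing above stores the late frame and emits gap buffers of `buffer_duration` until the stream time reaches the frame's start time. One way to picture this is the simplified model below. `gap_spans` is a hypothetical helper, not a function in sgn_gwframe; it computes all gap intervals at once, whereas the worker emits one gap per call, and clamping the last interval to `file_time` is an assumption of this sketch.

```python
from __future__ import annotations


def gap_spans(
    next_buffer_time: int, file_time: int, buffer_duration: int
) -> list[tuple[int, int]]:
    """List the [start, end) gap intervals needed before file_time.

    Simplified model: each gap is at most buffer_duration long, and the
    final gap is clamped so that it never extends past file_time.
    """
    if buffer_duration <= 0:
        raise ValueError("buffer_duration must be positive")

    spans = []
    start = next_buffer_time
    while start < file_time:
        end = min(start + buffer_duration, file_time)
        spans.append((start, end))
        start = end
    return spans


# Expected next buffer at GPS 100, but the next file starts at GPS 103.
print(gap_spans(100, 103, 1))
# A file that arrives exactly on time needs no gap.
print(gap_spans(100, 100, 1))
```

When the list is empty, the data is continuous and can be sent directly, which corresponds to the `WorkerAction.DATA` branch.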

WorkerAction

Bases: Enum

Actions that the worker can take in a single iteration.
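
The source listing refers to three members: `WorkerAction.DATA`, `WorkerAction.GAP`, and `WorkerAction.HEARTBEAT`. The sketch below shows how the choice between a gap and a heartbeat on a queue timeout might look in isolation. The enum here is a stand-in whose member values are assumptions, and `choose_idle_action` is a hypothetical helper that takes the current GPS time as an argument instead of calling `gpstime.gpsnow()`.

```python
from enum import Enum, auto


class WorkerAction(Enum):
    """Stand-in enum; the member values are assumptions for this sketch."""

    DATA = auto()
    GAP = auto()
    HEARTBEAT = auto()


def choose_idle_action(
    now_gps: float, next_buffer_time: int, discont_wait_time: float = 60.0
) -> WorkerAction:
    """Pick the action to take when no file arrived before the timeout.

    A gap is chosen once the stream has waited at least
    discont_wait_time seconds past the expected buffer time;
    otherwise a heartbeat is chosen.
    """
    time_waiting = now_gps - next_buffer_time
    if time_waiting >= discont_wait_time:
        return WorkerAction.GAP
    return WorkerAction.HEARTBEAT


# 10 seconds behind: still within the wait time, so a heartbeat is chosen.
print(choose_idle_action(now_gps=1000.0, next_buffer_time=990))
# 60 seconds behind: the wait time is exhausted, so a gap is chosen.
print(choose_idle_action(now_gps=1060.0, next_buffer_time=1000))
```

Passing the current time in as a parameter keeps the decision easy to test without a GPS clock.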