Skip to content

frame

Read timeseries data from gravitational wave frame files.

CacheEntry dataclass

CacheEntry(observatory, description, gps_start, duration, path)

Simple cache entry parser for frame cache files.

Cache file format: observatory description gps_start duration path Example: L L1_GWOSC_16KHZ_R1 1240215487 32 ./path/to/file.gwf

segment property

segment

Return the time segment for this cache entry.

from_line classmethod

from_line(line)

Parse a cache file line into a CacheEntry.

Source code in src/sgn_gwframe/sources/frame.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@classmethod
def from_line(cls, line: str) -> CacheEntry:
    """Parse a cache file line into a CacheEntry."""
    parts = line.strip().split()
    if len(parts) != 5:
        msg = f"Invalid cache line format: {line}"
        raise ValueError(msg)
    return cls(
        observatory=parts[0],
        description=parts[1],
        gps_start=float(parts[2]),
        duration=float(parts[3]),
        path=parts[4],
    )

FrameSource dataclass

FrameSource(*, channels, frames, instrument=None)

Bases: TSSource

Read timeseries data from gravitational wave frame files

Parameters:

Name Type Description Default
channels list[str]

list[str], a list of channel names of the data, e.g., ["L1:GWOSC-16KHZ_R1_STRAIN", "L1:GWOSC-16KHZ_R1_DQMASK"]. Source pads will be automatically generated for each channel, with channel name as pad name.

required
frames str | list[str]

str | list[str], one of: - Path to a .cache file containing frame file locations - Path to a single .gwf frame file - List of paths to .gwf frame files

required
instrument Optional[str]

str, optional, only read gwf files from this instrument (e.g., "H1", "L1"). Default: None

None

static_source_pads property

static_source_pads

Define source pads from channels.

internal

internal()

Check if we need to read the next gw frame file in the cache. All channels are read at once.

Source code in src/sgn_gwframe/sources/frame.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def internal(self) -> None:
    """Check if we need to read the next gw frame file in the cache. All channels
    are read at once.
    """

    # load next frame of data from disk when we have less than
    # one buffer length of data left
    read_new = False
    for channel, adapter in self.A.items():
        if adapter.size < self.num_samples(self.rates[channel]):
            read_new = True
            break

    if read_new and self.cache:
        # Read multiple channels at once
        self.load_gwf_data(self.cache[0])

        # now that we have loaded data from this frame,
        # remove it from the cache
        self.cache.pop(0)

load_gwf_data

load_gwf_data(frame_file)

Load timeseries data from a gwf frame file.

Parameters:

Name Type Description Default
frame_file CacheEntry

CacheEntry, the gwf frame file to read timeseries data from

required

Returns:

Type Description
None

dict[str, np.ndarray], a dictionary with channel names as keys and

None

numpy arrays of timeseries data as values

Source code in src/sgn_gwframe/sources/frame.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def load_gwf_data(self, frame_file: CacheEntry) -> None:
    """Load timeseries data from a gwf frame file.

    Args:
        frame_file:
            CacheEntry, the gwf frame file to read timeseries data from

    Returns:
        dict[str, np.ndarray], a dictionary with channel names as keys and
        numpy arrays of timeseries data as values
    """

    # get first cache entry
    segment = frame_file.segment

    intersection = self.analysis_seg & segment
    start = intersection[0]
    end = intersection[1]

    data_dict = gwframe.read(
        frame_file.path, channel=self.channels, start=start, end=end
    )

    if len(self.rates) == 0:
        for channel, data in data_dict.items():
            self.rates[channel] = int(data.sample_rate)

    for channel, data in data_dict.items():
        if self.last_epoch < start:
            self.logger.warning(
                "Unexpected epoch: %f, expected: %f, sending gap buffer",
                start,
                self.last_epoch,
            )
            self.A[channel].push(
                SeriesBuffer(
                    offset=Offset.fromsec(self.last_epoch),
                    sample_rate=self.rates[channel],
                    data=None,
                    shape=(int((start - self.last_epoch) * self.rates[channel]),),
                )
            )
        elif self.last_epoch > start:
            msg = (
                f"Unepected epoch: {start}, expected: {self.last_epoch}, sending "
                "gap buffer"
            )
            raise ValueError(msg)
        self.A[channel].push(
            SeriesBuffer(
                offset=Offset.fromsec(float(start)),
                sample_rate=self.rates[channel],
                data=data.array,
            )
        )

    self.last_epoch = end

new

new(pad)

New frames are created on "pad" with an instance specific count and a name derived from the channel name. "EOS" is set once we have procssed all data in the cache within the analysis segment.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad for which to produce a new TSFrame

required

Returns:

Type Description
TSFrame

TSFrame, the TSFrame that carries a list of SeriesBuffers

Source code in src/sgn_gwframe/sources/frame.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def new(self, pad: SourcePad) -> TSFrame:
    """New frames are created on "pad" with an instance specific count and a name
    derived from the channel name. "EOS" is set once we have procssed all data in
    the cache within the analysis segment.

    Args:
        pad:
            SourcePad, the pad for which to produce a new TSFrame

    Returns:
        TSFrame, the TSFrame that carries a list of SeriesBuffers
    """

    self.cnt[pad] += 1

    channel = self.rsrcs[pad]

    metadata = {"cnt": self.cnt[pad], "name": "'%s'" % pad.name}

    frame = self.prepare_frame(pad, metadata=metadata)

    if self.A[channel].end_offset >= frame.end_offset:
        bufs = self.A[channel].get_sliced_buffers((frame.offset, frame.end_offset))

        frame.set_buffers(list(bufs))

        self.A[channel].flush_samples_by_end_offset(frame.end_offset)

    return frame