Source code for gwdetchar.omega.core

# coding=utf-8
# Copyright (C) Alex Urban (2019)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar.  If not, see <http://www.gnu.org/licenses/>.

"""Core utilities for implementing omega scans
"""

from scipy.signal import butter

from gwpy.segments import Segment
from gwpy.signal.qtransform import q_scan

__author__ = 'Alex Urban <alexander.urban@ligo.org>'
__credits__ = 'Duncan Macleod <duncan.macleod@ligo.org>'


# -- basic utilities ----------------------------------------------------------

[docs] def highpass(series, f_low, order=12, analog=False, ftype='sos'): """High-pass a `TimeSeries` with a Butterworth filter Parameters ---------- series : `~gwpy.timeseries.TimeSeries` the `TimeSeries` data to high-pass filter f_low : `float` lower cutoff frequency (Hz) of the filter order : `int`, optional number of taps in the filter, default: 12 analog : `bool`, optional when True, return an analog filter, otherwise a digital filter is returned, default: False ftype : `str`, optional type of filter: numerator/denominator (`'ba'`), pole-zero (`'zpk'`), or second-order sections (`'sos'`), default: `'sos'` Returns ------- hpseries : `~gwpy.timeseries.TimeSeries` the high-passed `TimeSeries` Notes ----- This utility designs a Butterworth filter of order `order` with corner frequency `f_low / 1.5`, then applies this filter to the input. See Also -------- scipy.signal.butter gwpy.timeseries.TimeSeries.filter """ corner = f_low / 1.5 fs = series.sample_rate.to('Hz').value hpfilt = butter(order, corner, btype='highpass', analog=analog, output=ftype, fs=fs) hpseries = series.filter(hpfilt, filtfilt=True) return hpseries
[docs] def whiten(series, fftlength, overlap=None, method='median', window='hann', detrend='linear'): """Whiten a `TimeSeries` against its own ASD Parameters ---------- series : `~gwpy.timeseries.TimeSeries` the `TimeSeries` data to whiten fftlength : `float` FFT integration length (in seconds) for ASD estimation overlap : `float`, optional seconds of overlap between FFTs, defaults to half the FFT length method : `str`, optional FFT-averaging method, default: ``'median'``, window : `str`, `numpy.ndarray`, optional window function to apply to timeseries prior to FFT, default: ``'hann'`` see :func:`scipy.signal.get_window` for details on acceptable formats detrend : `str`, optional type of detrending to do before FFT, default: ``'linear'`` Returns ------- wseries : `~gwpy.timeseries.TimeSeries` a whitened version of the input data with zero mean and unit variance See Also -------- gwpy.timeseries.TimeSeries.whiten """ # get overlap window and whiten if overlap is None: overlap = fftlength / 2 return series.whiten(fftlength=fftlength, overlap=overlap, window=window, detrend=detrend, method=method).detrend(detrend)
# -- omega scans --------------------------------------------------------------
[docs] def conditioner(xoft, fftlength, overlap=None, resample=None, f_low=None, **kwargs): """Condition some input data for an omega scan Parameters ---------- xoft : `~gwpy.timeseries.TimeSeries` the `TimeSeries` data to whiten fftlength : `float` FFT integration length (in seconds) for ASD estimation overlap : `float`, optional seconds of overlap between FFTs, defaults to half the FFT length resample : `int`, optional desired sampling rate (Hz) of the output if different from the input, default: no resampling f_low : `float`, optional lower cutoff frequency (Hz) of the filter, default: ``None`` **kwargs : `dict`, optional additional arguments to :func:`highpass` Returns ------- wxoft : `~gwpy.timeseries.TimeSeries` a whitened version of the input data with zero mean and unit variance hpxoft : `~gwpy.timeseries.TimeSeries` high-passed version of the input data (returned only if `f_low` is not ``None``) xoft : `~gwpy.timeseries.TimeSeries` original (possibly resampled) version of the input data """ if resample: xoft = xoft.resample(resample) # get whitened and high-passed data streams if f_low is None: wxoft = whiten(xoft, fftlength, overlap=overlap) return (wxoft, xoft) else: hpxoft = highpass(xoft, f_low, **kwargs) wxoft = whiten(hpxoft, fftlength, overlap=overlap) return (wxoft, hpxoft, xoft)
[docs] def primary(gps, length, hoft, fftlength, resample=None, f_low=None, **kwargs): """Condition the primary channel for use as a matched-filter Parameters ---------- gps : `float` GPS time (seconds) of suspected transient length : `float` length (seconds) of the desired matched-filter hoft : `~gwpy.timeseries.TimeSeries` the `TimeSeries` data to whiten fftlength : `float` FFT integration length (in seconds) for ASD estimation resample : `int`, optional desired sampling rate (Hz) of the output if different from the input, default: no resampling f_low : `float`, optional lower cutoff frequency (Hz) of the filter, default: `None` **kwargs : `dict` additional keyword arguments to `omega.conditioner` Returns ------- out : `~gwpy.timeseries.TimeSeries` the conditioned data stream """ if f_low is None: out, _ = conditioner(hoft, fftlength, resample=resample) else: out, _, _ = conditioner( hoft, fftlength, resample=resample, f_low=f_low, **kwargs) return out.crop(gps - length/2, gps + length/2).taper()
[docs] def cross_correlate(xoft, hoft): """Cross-correlate two `TimeSeries` by matched-filter Parameters ---------- xoft : `~gwpy.timeseries.TimeSeries` the `TimeSeries` data to analyze hoft : `~gwpy.timeseries.TimeSeries` a `TimeSeries` data to use as a matched-filter Returns ------- out : `~gwpy.timeseries.TimeSeries` the output of a single phase matched-filter """ # make sure series have consistent sample rates if hoft.sample_rate.value < xoft.sample_rate.value: xoft = xoft.resample(hoft.sample_rate.value) elif hoft.sample_rate.value > xoft.sample_rate.value: hoft = hoft.resample(xoft.sample_rate.value) out = xoft.correlate(hoft, window='hann') return out
[docs] def scan(gps, channel, xoft, fftlength, resample=None, fthresh=1e-10, search=0.5, nt=1400, nf=700, logf=True, **kwargs): """Scan a channel for evidence of transients Parameters ---------- gps : `float` the GPS time (seconds) to scan channel : `OmegaChannel` `OmegaChannel` object corresponding to this data stream xoft : `~gwpy.timeseries.TimeSeries` the `TimeSeries` data to analyze fftlength : `float` FFT integration length (in seconds) for ASD estimation resample : `int`, optional desired sampling rate (Hz) of the output if different from the input, default: no resampling fthresh : `float`, optional threshold on false alarm rate (Hz) for this channel to be considered interesting, default: 1e-10 search : `float`, optional time window (seconds) around `gps` in which to find peak energies, default: 0.5 nt : `int`, optional number of points on the time axis of the interpolated `Spectrogram`, default: 1400 nf : `int`, optional number of points on the frequency axis of the interpolated `Spectrogram`, default: 700 logf : `bool`, optional boolean switch to enable (`True`) or disable (`False`) use of log-sampled frequencies in the output `Spectrogram`, default: `True` **kwargs : `dict`, optional additional arguments to `omega.conditioner` Returns ------- series : `tuple` an ordered collection of intermediate data products from this scan, including: the resampled `TimeSeries`, high-passed `TimeSeries`, whitened `TimeSeries`, whitened `QGram`, high-passed `QGram`, interpolated whitened `Spectrogram`, and interpolated high-passed `Spectrogram` """ # condition data wxoft, hpxoft, xoft = conditioner( xoft, fftlength, resample=resample, f_low=channel.frange[0], **kwargs) # compute whitened Q-gram search = Segment(gps - search/2, gps + search/2) qgram, far = q_scan( wxoft, mismatch=channel.mismatch, qrange=channel.qrange, frange=channel.frange, search=search) if (far >= fthresh) and (not channel.always_plot): return None # series is insignificant # compute raw Q-gram Q = qgram.plane.q rqgram, _ = q_scan( hpxoft, mismatch=channel.mismatch, qrange=(Q, Q), frange=qgram.plane.frange, search=search) # compute interpolated spectrograms tres = min(channel.pranges) / nt fres = nf if logf else ( channel.frange[1] - channel.frange[0]) / nf outseg = Segment( gps - max(channel.pranges)/2, gps + max(channel.pranges)/2, ) qspec = qgram.interpolate( tres=tres, fres=fres, logf=logf, outseg=outseg, ) rqspec = rqgram.interpolate( tres=tres, fres=fres, logf=logf, outseg=outseg, ) return (xoft, hpxoft, wxoft, qgram, rqgram, qspec, rqspec)