# Source code for gwdetchar.mct

# coding=utf-8
# Copyright (C) LIGO Scientific Collaboration (2015-)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar.  If not, see <http://www.gnu.org/licenses/>.

"""Find times when an input signal crosses a particular threshold
"""

import gwdatafind
import os
import sys

from astropy.table import vstack as vstack_tables

from gwpy.io.cache import (cache_segments, sieve as sieve_cache)
from gwpy.segments import (DataQualityFlag, Segment, SegmentList)
from gwpy.table import EventTable

from . import (const, cli)
from .daq import find_crossings
from .io.datafind import get_data
from .utils import table_from_times

__author__ = 'TJ Massinger <thomas.massinger@ligo.org>'
__credits__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

# set up logger: report the invoked program name in log records.
# When run as a script file (``*.py``) prefer the ``python -m`` form.
if sys.argv[0].endswith('.py'):
    PROG = 'python -m gwdetchar.mct'
else:
    PROG = os.path.basename(sys.argv[0])
LOGGER = cli.logger(name=PROG.split('python -m ').pop())


# -- parse command-line -------------------------------------------------------

def create_parser():
    """Create a command-line parser for this entry point
    """
    # base parser with standard gwdetchar options
    parser = cli.create_parser(
        prog=PROG,
        description=__doc__,
    )

    # positional arguments
    cli.add_gps_start_stop_arguments(parser)
    cli.add_ifo_option(parser)
    cli.add_nproc_option(parser)
    # frametype is only optional when the IFO can be detected from the host
    cli.add_frametype_option(
        parser,
        required=const.IFO is None,
        default=const.IFO is not None and '%s_R' % const.IFO,
    )

    # optional arguments
    parser.add_argument(
        '-a',
        '--state-flag',
        metavar='FLAG',
        help='restrict search to times when FLAG was active',
    )
    parser.add_argument(
        '-o',
        '--output-path',
        help='path to output HDF5 file, name will be '
             'automatically generated based on IFO and GPS times',
    )
    parser.add_argument(
        '-t',
        '--threshold',
        type=float,
        nargs='+',
        default=[0., 2.**16, -2.**16],
        help='threshold for marking input data crossings',
    )
    parser.add_argument(
        '-c',
        '--channel',
        required=True,
        type=str,
        help='channel to read for input data',
    )
    parser.add_argument(
        '-r',
        '--rate-thresh',
        default=16.,
        type=float,
        help='if the trigger rate (Hz) is above this value for a given '
             'segment, crossings for that segment will not be recorded',
    )

    # return the argument parser
    return parser
# -- main code block ----------------------------------------------------------

def main(args=None):
    """Run the zero-crossing counter tool

    Parameters
    ----------
    args : `list` of `str`, optional
        command-line arguments to parse, defaults to `sys.argv[1:]`
    """
    parser = create_parser()
    args = parser.parse_args(args=args)
    span = Segment(args.gpsstart, args.gpsend)
    LOGGER.info('-- Processing channel %s over span %d - %d'
                % (args.channel, args.gpsstart, args.gpsend))

    # restrict the analysis to active segments of the state flag, if given,
    # otherwise process the full requested span
    if args.state_flag:
        state = DataQualityFlag.query(
            args.state_flag,
            int(args.gpsstart),
            int(args.gpsend),
            url=const.DEFAULT_SEGMENT_SERVER,
        )
        statea = state.active
    else:
        statea = SegmentList([span])
    duration = abs(span)

    # initialize output files for each threshold and store them in a dict,
    # keyed by the string form of the threshold; negative thresholds are
    # written with an 'n' prefix to keep filenames hyphen-safe
    outfiles = {}
    for thresh in args.threshold:
        outfiles[str(thresh)] = (
            os.path.join(
                args.output_path,
                '%s_%s_DAC-%d-%d.h5'
                % (args.channel.replace('-', '_').replace(':', '-'),
                   str(int(thresh)).replace('-', 'n'),
                   int(args.gpsstart), duration)))

    # get frame cache, and restrict the active segments to times with data
    cache = gwdatafind.find_urls(args.ifo[0], args.frametype,
                                 int(args.gpsstart), int(args.gpsend))
    cachesegs = statea & cache_segments(cache)

    # create the output directory if needed; exist_ok avoids the
    # check-then-create race of an explicit os.path.exists() test
    os.makedirs(args.output_path, exist_ok=True)

    # initialize an EventTable for each threshold and store them in a dict
    names = ("time", "frequency", "snr")
    dtypes = ("f8",) * len(names)
    tables = {}
    for thresh in args.threshold:
        tables[str(thresh)] = EventTable(
            names=names,
            dtype=dtypes,
            meta={"channel": args.channel},
        )

    # for each science segment, read in the data from frames, check for
    # threshold crossings, and if the rate of crossings is less than
    # rate_thresh, append them to the table for that threshold
    for seg in cachesegs:
        LOGGER.debug("Processing {}:".format(seg))
        c = sieve_cache(cache, segment=seg)
        if not c:
            LOGGER.warning(" No {} data files for this segment, "
                           "skipping".format(args.frametype))
            continue
        data = get_data(args.channel, seg[0], seg[1], nproc=args.nproc,
                        source=c, verbose="Reading data:".rjust(30))
        for thresh in args.threshold:
            times = find_crossings(data, thresh)
            rate = float(times.size)/abs(seg) if times.size else 0
            LOGGER.info(" Found {0} crossings of {1}, rate: {2} Hz".format(
                times.size,
                thresh,
                rate,
            ))
            # skip segments whose crossing rate exceeds the user threshold
            if times.size and rate < args.rate_thresh:
                existing = tables[str(thresh)]
                tables[str(thresh)] = vstack_tables(
                    (existing,
                     table_from_times(times, snr=10., frequency=100.,
                                      names=existing.colnames),
                     ),
                    join_type="exact",
                )

    # write each table to its HDF5 output file; n only pads the event
    # counts in the log messages so they right-align
    n = max(map(len, tables.values()))
    for thresh, outfile in outfiles.items():
        tables[thresh].write(
            outfile,
            path="triggers",
            format="hdf5",
            overwrite=True,
        )
        LOGGER.info("{0} events written to {1}".format(
            str(len(tables[thresh])).rjust(len(str(n))),
            outfile,
        ))
# -- run from command-line ----------------------------------------------------

if __name__ == "__main__":
    main()