Source code for gwdetchar.conlog

# coding=utf-8
# Copyright (C) LIGO Scientific Collaboration (2019)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar.  If not, see <http://www.gnu.org/licenses/>.

"""Check whether state records have changed between two reference times
"""

import gwdatafind
import numpy
import os
import re
import sys

from gwpy.io import gwf as io_gwf
from gwpy.table import EventTable

from . import cli
from .io.datafind import get_data

__author__ = 'Alex Urban <alexander.urban@ligo.org>'
__credits__ = ('Andrew Lundgren <andrew.lundgren@ligo.org>, '
               'Joshua Smith <joshua.smith@ligo.org>, '
               'Duncan Macleod <duncan.macleod@ligo.org>')

# set up logger
PROG = ('python -m gwdetchar.conlog' if sys.argv[0].endswith('.py')
        else os.path.basename(sys.argv[0]))
LOGGER = cli.logger(name=PROG.split('python -m ').pop())


# -- utilities ----------------------------------------------------------------

[docs] def _discover_data_source(obs, frametype, start, end, preview): """Determine filepaths to local gravitational-wave frame files """ # get paths to frame files cache1 = gwdatafind.find_urls( obs, frametype, start - preview, start, ) cache2 = gwdatafind.find_urls( obs, frametype, end, end + 1, ) return (cache1, cache2)
[docs] def _discover_data(channels, caches, start, end, preview, nproc=1): """Load timeseries data from local gravitational-wave frame files """ (cache1, cache2) = caches # get data from frames data1 = get_data( channels, start=start - preview, end=start, source=cache1, nproc=nproc, verbose='Reading initial data:'.rjust(30), ) data2 = get_data( channels, start=end, end=end + 1, source=cache2, nproc=nproc, verbose='Reading final data:'.rjust(30), ) return (data1, data2)
[docs] def _get_available_channels(caches): """Determine available channels from local gravitational-wave frame files """ (cache1, cache2) = caches try: available = ( set(io_gwf.iter_channel_names(cache1[-1])) & set(io_gwf.iter_channel_names(cache2[0])) ) except (IndexError, TypeError): raise RuntimeError("Could not find data in the time range requested") return available
# -- parse command-line -------------------------------------------------------
[docs] def create_parser(): """Create a command-line parser for this entry point """ # initialize argument parser parser = cli.create_parser( prog=PROG, description=__doc__, ) # set arguments from the cli module cli.add_gps_start_stop_arguments(parser) cli.add_ifo_option(parser) cli.add_frametype_option(parser, help='the frametype name, defaults to second ' 'trends for the selected interferometer') cli.add_nproc_option(parser) # other optional arguments parser.add_argument( '-o', '--output', default='changes.csv', help='Path to output data file, default: %(default)s', ) parser.add_argument( '-c', '--channels', default=None, required=False, help='file containing columnar list of channels to ' 'process, default is to find all relevant channels ' 'from frames', ) parser.add_argument( '-s', '--search', nargs='*', default=[], help='process channels matching these regex patterns, ' 'can be given multiple times, default is to analyze ' 'all relevant channels from frames', ) parser.add_argument( '-p', '--preview', default=10, type=int, help='Time (seconds) over which to test that channel is ' 'normally kept constant, default: %(default)s', ) # return the argument parser return parser
# -- main code block ----------------------------------------------------------
[docs] def main(args=None): """Run the Conlog command-line tool """ parser = create_parser() args = parser.parse_args(args=args) # get IFO and frametype ifo = args.ifo.upper() obs = ifo[0] frametype = args.frametype or '{}_T'.format(ifo) preview_time = max(1, args.preview) # get paths to frame files (cache1, cache2) = _discover_data_source( obs, frametype, args.gpsstart, args.gpsend, preview_time, ) # get list of channels to analyze LOGGER.info('Determining channels to analyze') available = _get_available_channels((cache1, cache2)) if args.channels: reqchannels = set(numpy.loadtxt(args.channels, dtype=str, ndmin=1)) channels = list(reqchannels & available) else: channels = [x for x in available if x.endswith('.mean')] if args.search: # if requested, get channels matching regex patterns re_requested = re.compile('({})'.format('|'.join(args.search))) channels = [x for x in channels if re_requested.search(x)] LOGGER.debug('Found {} channels in frames'.format(len(channels))) # get data from frames (data1, data2) = _discover_data( channels, (cache1, cache2), args.gpsstart, args.gpsend, preview_time, nproc=args.nproc, ) # initialize columns LOGGER.debug('Analyzing {} channels'.format(len(data1))) changes = [] value1 = [] value2 = [] diff = [] # identify channels for channel in data1: xoft1 = data1[channel].value xoft2 = data2[channel].value if numpy.any(numpy.diff(xoft1) != 0): continue if xoft1[-1] == xoft2[0]: continue changes.append(channel) value1.append(xoft1[-1]) value2.append(xoft2[0]) diff.append(xoft2[0] - xoft1[-1]) # record output LOGGER.debug('Analysis complete') table = EventTable([changes, value1, value2, diff], names=('channel', 'initial_value', 'final_value', 'difference')) # log output LOGGER.info('The following {0} channels record a state ' 'change between {1} and {2}:\n\n'.format( len(changes), args.gpsstart, args.gpsend, )) print(table) print('\n\n') # save output table.write(args.output, overwrite=True) LOGGER.info('Output written to {}'.format(args.output))
# -- run code ----------------------------------------------------------------- if __name__ == "__main__": main()