# coding=utf-8
# Copyright (C) Duncan Macleod (2015)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar. If not, see <http://www.gnu.org/licenses/>.
"""Utilies for analysing sensor saturations
"""
import re
from itertools import zip_longest
from multiprocessing import (cpu_count, Pool)
import numpy
from astropy.units import Quantity
from gwpy.io.cache import file_segment
from gwpy.timeseries import StateTimeSeries
from ..io.datafind import get_data
__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
# NOTE: separator added; implicit concatenation previously fused the names
__credits__ = 'Dan Hoak <daniel.hoak@ligo.org>, ' \
              'Alex Urban <alexander.urban@ligo.org>'

# default number of parallel processes for frame reading, capped at 8
DEFAULT_NPROC = min(8, cpu_count())

# regular expressions matching software-limit channel-name suffixes
re_limit = re.compile(r'_LIMIT\Z')
re_limen = re.compile(r'_LIMEN\Z')
re_swstat = re.compile(r'_SWSTAT\Z')
# combined pattern matching any of the three suffixes above
re_software = re.compile(
    '(%s)' % '|'.join([re_limit.pattern, re_limen.pattern, re_swstat.pattern]))
# -- utilities ----------------------------------------------------------------
def _find_saturations(data):
    """Multiprocessing-friendly wrapper around `find_saturations`

    Parameters
    ----------
    data : `tuple` of `~gwpy.timeseries.TimeSeries`
        the ``(OUTPUT, LIMIT)`` series pair for a single channel

    Returns
    -------
    flag : `~gwpy.segments.DataQualityFlag`
        the saturation segments for this channel, named after the
        OUTPUT series with its trailing suffix removed
    """
    out = find_saturations(
        data[0], data[1], precision=.99, segments=True)
    # strip the trailing '_OUTPUT' (7 characters) from the flag name
    out.name = out.name[:-7]
    return out
def find_saturations(timeseries, limit=2**16, precision=1, segments=False):
    """Find the times of software saturations of the given `TimeSeries`

    Parameters
    ----------
    timeseries : `~gwpy.timeseries.TimeSeries`
        the input data to search

    limit : `float`, `~gwpy.timeseries.TimeSeries`, optional
        the limit above which a saturation has occurred,
        default: ``2**16``

    precision : `float` in range (0, 1], optional
        the precision of the check for saturation, default: ``1``

    segments : `bool`, optional, default: `False`
        return saturation segments, otherwise return times when
        saturations occur

    Returns
    -------
    times : `numpy.ndarray`
        the array of times when this timeseries started saturating, OR

    segments : `~gwpy.segments.DataQualityFlag`
        the flag containing segments during which this timeseries
        was actively saturating
    """
    # unwrap a `~astropy.units.Quantity` (or anything quantity-like)
    # to its raw numerical value
    limit = getattr(limit, 'value', limit)
    # check saturated at minimum and maximum, scaled down by the
    # requested precision
    limit = limit * precision
    saturated = timeseries.value <= -limit
    saturated |= timeseries.value >= limit
    if segments:
        # view the boolean array as a state series, copying over the
        # input's metadata, then convert to segments
        saturation = saturated.view(StateTimeSeries)
        saturation.__metadata_finalize__(timeseries)
        flag = saturation.to_dqflag(
            description="Software saturation indicated by " + timeseries.name,
        )
        flag.isgood = False
        return flag
    # otherwise return the GPS times at which a saturation starts,
    # i.e. where the saturated state steps from 0 up to 1
    return timeseries.times[1:].value[
        numpy.diff(saturated.astype(int)) > 0]
def grouper(iterable, n, fillvalue=None):
    """Separate an iterable into sub-sets of `n` elements

    Parameters
    ----------
    iterable : iterable
        the iterable to separate

    n : `int`
        the number of elements in each sub-set

    fillvalue : `object`, optional
        value with which to pad the final sub-set, default: `None`

    Returns
    -------
    grouped : iterator of `tuple`
        an iterator over `n`-element tuples drawn from ``iterable``
    """
    # n references to the _same_ iterator, so zip_longest consumes
    # n consecutive elements for each output tuple
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
def find_limit_channels(channels, skip=None):
    """Find all 'LIMIT' channels that have a matching 'LIMEN' or 'SWSTAT'

    Parameters
    ----------
    channels : `list` of `str`
        the list of channel names to search

    skip : `list` of `str`, optional
        regular expression patterns; channels matching any of these
        are excluded from the search, default: `None`

    Returns
    -------
    (limens, swstats) : `tuple` of `list` of `str`
        two sorted lists of channel-name prefixes (with the '_LIMIT'
        suffix removed) for which a matching channel ending in
        '_LIMEN' or '_SWSTAT', respectively, was found
    """
    # find relevant channels and sort them
    if skip:
        re_skip = re.compile('(%s)' % '|'.join(skip))
        useful = sorted(x for x in channels if re_software.search(x) and
                        not re_skip.search(x))
    else:
        useful = sorted(x for x in channels if re_software.search(x))
    # map limits to limen or swstat (set gives O(1) membership checks)
    limits = set(x[:-6] for x in useful if re_limit.search(x))
    limens = sorted(x[:-6] for x in useful if re_limen.search(x)
                    and x[:-6] in limits)
    swstats = sorted(x[:-7] for x in useful if re_swstat.search(x)
                     and x[:-7] in limits)
    return (limens, swstats)
def is_saturated(channel, cache, start=None, end=None, indicator='LIMEN',
                 nproc=DEFAULT_NPROC):
    """Check whether a channel has saturated its software limit

    Parameters
    ----------
    channel : `str`, or `list` of `str`
        either a single channel name, or a list of channel names

    cache : `list`
        a `list` of file paths, the cache must be contiguous

    start : `~gwpy.time.LIGOTimeGPS`, `int`
        the GPS start time of the check

    end : `~gwpy.time.LIGOTimeGPS`, `int`
        the GPS end time of the check

    indicator : `str`
        the suffix of the indicator channel, either `'LIMEN'` or `'SWSTAT'`

    nproc : `int`
        the number of parallel processes to use for frame reading

    Returns
    -------
    saturated : `bool`, `None`, or `DataQualityFlag`, or `list` of the same
        one of the following given the conditions

        - `None` : if the channel doesn't have a software limit
        - `False` : if the channel didn't saturate
        - `~gwpy.segments.DataQualityFlag` : otherwise

        OR, a `list` of the above if a `list` of channels was given in the
        first place

    Raises
    ------
    ValueError
        if ``indicator`` is not one of `'LIMEN'` or `'SWSTAT'`
    """
    # copy the input so that stripping suffixes below cannot mutate a
    # list passed in by the caller (and cannot fail on a tuple)
    if isinstance(channel, (list, tuple)):
        channels = list(channel)
    else:
        channels = [channel]
    # strip the '_LIMIT' suffix to get each channel-name prefix
    for i, c in enumerate(channels):
        if c.endswith('_LIMIT'):
            channels[i] = c[:-6]
    # read a single second of indicator data from the start of the cache
    # to check whether each limit is enabled
    indicators = ['{}_{}'.format(c, indicator) for c in channels]
    gps = file_segment(cache[0])[0]
    data = get_data(indicators, gps, gps+1, source=cache, nproc=nproc)
    # check limits for returned channels
    if len(data) < len(channels):  # exclude nonexistent channels
        channels = [
            c for c in channels if '{}_{}'.format(c, indicator) in data]
        indicators = ['{}_{}'.format(c, indicator) for c in channels]
    if indicator.upper() == 'LIMEN':
        # LIMEN is read directly as the enabled flag
        active = dict((c, data[indicators[i]].value[0]) for
                      i, c in enumerate(channels))
    elif indicator.upper() == 'SWSTAT':
        # bit 13 of the SWSTAT bitmask indicates an enabled limit
        active = dict(
            (c, data[indicators[i]].astype('uint32').value[0] >> 13 & 1) for
            i, c in enumerate(channels))
    else:
        raise ValueError("Don't know how to determine if limit is set for "
                         "indicator %r" % indicator)
    # get output/limit data for all with active limits
    activechans = [c for c in channels if active[c]]
    datachans = ['%s_%s' % (c, s) for c in activechans for
                 s in ('LIMIT', 'OUTPUT')]
    data = get_data(datachans, start, end, source=cache, nproc=nproc)
    # find saturations of the limit for each channel, in parallel
    # when more than one process was requested
    dataiter = ((data['%s_OUTPUT' % c], data['%s_LIMIT' % c])
                for c in activechans)
    if nproc > 1:
        with Pool(processes=nproc) as pool:
            saturations = list(pool.map(_find_saturations, dataiter))
    else:
        saturations = list(map(_find_saturations, dataiter))
    # return many or one (based on input)
    if isinstance(channel, (list, tuple)):
        return saturations
    return saturations[0]