Source code for gwdetchar.lasso.core

# coding=utf-8
# Copyright (C) Duncan Macleod (2018)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar.  If not, see <http://www.gnu.org/licenses/>.

"""Lasso regression utilities
"""

from math import log

import numpy
from scipy.interpolate import UnivariateSpline

from sklearn.linear_model import Lasso

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
__credits__ = 'Alex Macedo, Jeff Bidler, Oli Patane, Marissa Walker, ' \
              'Alex Urban, Josh Smith'


# -- utilities ----------------------------------------------------------------

[docs] def find_outliers(ts, N=5, method='s'): """Find outliers within a `TimeSeries` Parameters ---------- ts : `~gwpy.timeseries.TimeSeries` data to find outliers within N : `float`, optional if `method='s'`: number of standard deviations to consider an outlier if `method='pf'`: percentile range limit to consider an outlier default for both methods: 5 method : `str`, optional outlier identification method to be used, must be `'s'` (standard deviation method) or `'pf'` (percentil range method) default: `'s'` Returns ------- out : `ndarray` array indices of the input where outliers occur """ if method == 'pf': ts = ts.value # strip out Quantity extras quantile = numpy.quantile(ts, N) outliers = [] for i, x in enumerate(ts): if x < quantile: outliers.append(i) return numpy.array(outliers) else: ts = ts.value # strip out Quantity extras return numpy.nonzero(abs(ts - ts.mean()) > N*ts.std())[0]
[docs] def remove_outliers(ts, N=5, method='s'): """Find and remove outliers within a `TimeSeries` Parameters ---------- ts : `~gwpy.timeseries.TimeSeries` data to find outliers within N : `float`, optional if `method='s'`: number of standard deviations to consider an outlier if `method='pf'`: percentile range limit to consider an outlier default for both methods: 5 method : `str`, optional outlier identification method to be used, must be `'s'` (standard deviation method) or `'pf'` (percentil range method) default: `'s'` Notes ----- This action is done in-place, with no `return` statement. """ if method == 'pf': outliers = find_outliers(ts, N=N, method='pf') print("There are %d outliers in this data" % len(outliers)) unit = ts.unit mask = numpy.ones(ts.size, dtype=bool) mask[outliers] = False spline = UnivariateSpline(ts.times.value[mask], ts.value[mask], s=0, k=3) ts[outliers] = spline(ts.times.value[outliers]) * unit if outliers[-1] == (len(ts) - 1): ts = ts[:-1] if outliers[0] == 0: ts = ts[1:] print('Outlier removal complete') else: outliers = find_outliers(ts, N=N, method='s') c = 1 while outliers.any(): print("-- Pass %d: removing %d outliers in %s" % (c, outliers.size, ts.name)) unit = ts.unit cache = outliers mask = numpy.ones(ts.size, dtype=bool) mask[outliers] = False spline = UnivariateSpline(ts.times.value[mask], ts.value[mask], s=0, k=3) ts[outliers] = spline(ts.times.value[outliers]) * unit outliers = find_outliers(ts, N=N, method='s') print(" Completed %d removal passes" % c) if numpy.array_equal(outliers, cache): print(" Outliers did not change, breaking recursion") break print(" %d outliers remain" % len(outliers)) c += 1
[docs] def fit(data, target, alpha=None): """Fit some data to the target using a Lasso model Parameters ---------- data : `numpy.ndarray` the data target : `numpy.ndarray` the target data alpha : `float` the Lasso alpha parameter, if `None` one will be determined using :func:`find_alpha` Returns ------- model : `~sklearn.linear_model.Lasso` the fitted model """ if alpha is None: alpha = find_alpha(data, target) model = Lasso(alpha) return model.fit(data, target)
[docs] def find_alpha(data, target): """Find the best alpha value to use for the given data """ # build list of alphas num = 100 alphas = numpy.logspace(-1, 0, num, endpoint=True) nchans = numpy.zeros(num) coef_path = numpy.zeros((data.shape[1], num)) # fit each alpha for i, alpha in enumerate(alphas): model = fit(data, target, alpha=alpha) nchans[i] = numpy.nonzero(model.coef_)[0].size coef_path[:, i] = model.coef_ # prune zeros nonzero = nchans.nonzero()[0] alphas = alphas[nonzero] nchans = nchans[nonzero] coef_path = coef_path.take(nonzero, axis=1) # determine best alpha nsamps = data.shape[0] mean_squared_error = numpy.mean( (target[:, numpy.newaxis] - numpy.dot(data, coef_path)) ** 2, axis=0) sigma2 = numpy.var(target) eps64 = numpy.finfo('float64').eps criterion = (nsamps * mean_squared_error / (sigma2 + eps64) + log(nsamps) * nchans) # Eqns. 2.15--16 in (Zou et al, 2007) n_best = numpy.argmin(criterion) return alphas[n_best]
[docs] def remove_flat(tsdict): """Remove flat timeseries from a `TimeSeriesDict` """ outdict = tsdict.copy() for key in tsdict.keys(): series = tsdict[key].value if series.min() == series.max(): outdict.pop(key) return outdict
[docs] def remove_bad(tsdict): """Remove data that cannot be scaled from a `TimeSeriesDict` """ outdict = tsdict.copy() for key in tsdict.keys(): series = tsdict[key].value if numpy.isnan(series).any() or numpy.isinf(series).any(): outdict.pop(key) return outdict