Source code for gwdetchar.io.ligolw
# coding=utf-8
# Copyright (C) Duncan Macleod (2015)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar. If not, see <http://www.gnu.org/licenses/>.
"""Utilties for LIGO_LW XML I/O
"""
from ligo.lw import (
ligolw,
table,
)
try:
from ligo.lw import lsctables
except ModuleNotFoundError as exc:
exc.msg = (
f"{exc.msg}, please install python-lal / python3-lal / lalsuite "
"to handle LIGO_LW files"
)
exc.args = (exc.msg,)
raise
from gwpy.segments import (Segment, DataQualityFlag, DataQualityDict)
__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
[docs]
def new_table(tab, *args, **kwargs):
"""Create a new `~ligo.lw.table.Table`
This is just a convenience wrapper around `~ligo.lw.lsctables.New`
Parameters
----------
tab : `type`, `str`
`~ligo.lw.table.Table` subclass, or name of table to create
*args, **kwargs
other parameters are passed directly to `~ligo.lw.lsctables.New`
Returns
-------
table : `~ligo.lw.table.Table`
a newly-created table with the relevant attributes and structure
"""
if isinstance(tab, str):
tab = lsctables.TableByName[table.Table.TableName(tab)]
return lsctables.New(tab, *args, **kwargs)
[docs]
def sngl_burst_from_times(times, **params):
"""Create a `SnglBurstTable` from an array of times
"""
columns = set(params.keys()) | {'peak_time', 'peak_time_ns', 'event_id'}
table = new_table('sngl_burst', columns=list(columns))
get_next_id = table.get_next_id
RowType = table.RowType
append = table.append
for t in times:
row = RowType()
row.event_id = get_next_id()
row.peak = t
for key, val in params.items():
setattr(row, key, val)
append(row)
return table
[docs]
def sngl_burst_from_segments(segs, **params):
"""Create a `SnglBurstTable from a `~ligo.segments.segmentlist`
"""
table = new_table('sngl_burst', columns=params.keys())
get_next_id = table.get_next_id
RowType = table.RowType
append = table.append
for seg in segs:
row = RowType()
row.event_id = get_next_id()
row.peak = seg[0] + abs(seg) / 2.
row.period = seg
for key, val in params.items():
setattr(row, key, val)
append(row)
return table
[docs]
def segments_from_sngl_burst(table, padding, known=None):
"""Create a `DataQualityDict` of segments from the given `SnglBurstTable`
This method creates a `~gwpy.segments.DataQualityFlag` for each unique
channel found in the given table by padding the peak time by the given
amount on each side
"""
out = DataQualityDict()
for row in table:
t = row.peak
seg = Segment(t-padding, t+padding)
try:
out[row.channel].active.append(seg)
except KeyError:
out[row.channel] = DataQualityFlag(row.channel, known=known,
active=[seg])
return out
[docs]
def table_to_document(table):
xmldoc = ligolw.Document()
xmldoc.appendChild(ligolw.LIGO_LW())
xmldoc.childNodes[0].appendChild(table)
return xmldoc