# coding=utf-8
# Copyright (C) Alex Urban (2019)
#
# This file is part of the GW DetChar python package.
#
# GW DetChar is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GW DetChar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GW DetChar. If not, see <http://www.gnu.org/licenses/>.
"""Batch-mode utilities for omega scans
"""
import os
import subprocess
import sys
from pathlib import Path
from getpass import getuser
from pycondor import (Dagman, Job)
from .. import (cli, condor)
from ..cli import NOW_GPS
# -- module metadata
__author__ = "Alex Urban <alexander.urban@ligo.org>"
__credits__ = "Duncan Macleod <duncan.macleod@ligo.org>"

# description text handed to the command-line parser
CLI_DOCSTRING = """Batch-generate a series of Omega scans
GPS times can be given individually on the command-line, one after the other,
or can be bundled into one file formatted where the first column contains
the GPS times (other columns are ignored).
The output of this script is a condor workflow in the form of a DAG file,
with associated condor submit (`.sub`) file in the output directory.
Submitting the workflow to Condor will result in the scans being processed
in parallel.
"""

# default condor accounting information, each overridable through an
# environment variable; the '{epoch}' placeholder is left in the group
# template for later substitution
ACCOUNTING_GROUP = os.environ.get(
    '_CONDOR_ACCOUNTING_GROUP',
    'ligo.dev.{epoch}.detchar.user_req.omegascan',
)
ACCOUNTING_GROUP_USER = os.environ.get(
    '_CONDOR_ACCOUNTING_USER',
    getuser(),
)

# program name used by the parser: the `python -m` form when the process
# was started from a `.py` file, otherwise the name of the executable
if sys.argv[0].endswith('.py'):
    PROG = 'python -m gwdetchar.omega.batch'
else:
    PROG = os.path.basename(sys.argv[0])
# -- utilities ----------------------------------------------------------------
[docs]
def _monitor_dag_workflow(dagman):
    """Follow the progress of a batch of Omega scans in the Condor pool

    Prints a status line, then runs ``pycondor monitor`` on the DAG's
    submit file in a child process and waits for it to finish.

    Parameters
    ----------
    dagman : object
        any object with a ``submit_file`` attribute, e.g. the DAG
        returned by `generate_dag`

    Raises
    ------
    subprocess.CalledProcessError
        if ``pycondor monitor`` exits with a non-zero status
    """
    print(f"Monitoring progress of {dagman.submit_file}")
    try:
        subprocess.check_call(["pycondor", "monitor", dagman.submit_file])
    except KeyboardInterrupt:
        # an interrupt from the user simply ends the monitoring session
        pass
[docs]
def _parse_analysis_times(tlist):
    """Parse an iterable of GPS time data for a batch of Omega scans

    Parameters
    ----------
    tlist : `list` of `str`
        either one or more GPS times, or a single path to a text file
        whose first column holds the GPS times

    Returns
    -------
    times : `list` of `float`, or `numpy.ndarray`
        the GPS times as floats; a one-dimensional array when they were
        read from a file, a list otherwise

    Notes
    -----
    A lone argument is first tried as a number and is only treated as a
    file path when that conversion fails.
    """
    if len(tlist) != 1:
        return list(map(float, tlist))
    try:  # a single argument may be a GPS time...
        return [float(tlist[0])]
    except (TypeError, ValueError):  # ...otherwise read it as a file
        import numpy
        # read the first column only, as the command-line help promises;
        # without `usecols` a multi-column file gives a 2-D array (or a
        # conversion error for non-numeric columns) instead of GPS times
        return numpy.loadtxt(tlist[0], dtype=float, ndmin=1, usecols=0)
[docs]
def get_command_line_flags(ifo, fscale='log', colormap='viridis', nproc=8,
                           far=3.171e-8, config_file=None,
                           disable_correlation=False, disable_checkpoint=False,
                           ignore_state_flags=False):
    """Build the optional command-line arguments for `gwdetchar-omega`

    Parameters
    ----------
    ifo : `str`
        value passed to ``--ifo``
    fscale : `str`, optional
        value passed to ``--frequency-scaling``, default: ``'log'``
    colormap : `str`, optional
        value passed to ``--colormap``, default: ``'viridis'``
    nproc : `int`, optional
        value passed (as a string) to ``--nproc``, default: 8
    far : `float`, optional
        value passed (as a string) to ``--far-threshold``
    config_file : `str`, optional
        path of a configuration file; when given, its absolute path is
        passed to ``--config-file``
    disable_correlation : `bool`, optional
        if true, append ``--disable-correlation``
    disable_checkpoint : `bool`, optional
        if true, append ``--disable-checkpoint``
    ignore_state_flags : `bool`, optional
        if true, append ``--ignore-state-flags``

    Returns
    -------
    flags : `list` of `str`
        the assembled command-line arguments
    """
    arguments = ["--ifo", ifo]
    arguments += ["--frequency-scaling", fscale]
    arguments += ["--colormap", colormap]
    arguments += ["--nproc", str(nproc)]
    arguments += ["--far-threshold", str(far)]
    if config_file is not None:
        arguments += ["--config-file", os.path.abspath(config_file)]
    # boolean switches, emitted in a fixed order when enabled
    switches = (
        (disable_correlation, "--disable-correlation"),
        (disable_checkpoint, "--disable-checkpoint"),
        (ignore_state_flags, "--ignore-state-flags"),
    )
    arguments += [switch for enabled, switch in switches if enabled]
    return arguments
[docs]
def get_condor_arguments(accounting_group=ACCOUNTING_GROUP,
                         accounting_group_user=ACCOUNTING_GROUP_USER,
                         timeout=0, extra_commands=[], gps=NOW_GPS):
    """Assemble the condor submit commands for a batch of scans

    Parameters
    ----------
    accounting_group : `str`, optional
        accounting tag; if it contains ``'{epoch}'`` the placeholder is
        replaced by the lower-cased result of
        `condor.accounting_epoch` for ``gps``
    accounting_group_user : `str`, optional
        user name written to ``accounting_group_user``
    timeout : `float`, optional
        number of hours after which a ``periodic_remove`` expression
        applies; no such line is added when this is falsy
    extra_commands : iterable of `str`, optional
        further submit commands, appended after the others
    gps : `float`, optional
        GPS time used to look up the accounting epoch

    Returns
    -------
    condorcmds : `list` of `str`
        the submit-file lines
    """
    # fill in the accounting epoch when the tag is a template
    if '{epoch}' in accounting_group:
        tag_epoch = condor.accounting_epoch(gps).lower()
        accounting_group = accounting_group.format(epoch=tag_epoch)

    # validate accounting tag (the return value is not used here;
    # presumably an invalid tag raises -- confirm against `condor.is_valid`)
    condor.is_valid(accounting_group)

    submit_lines = [
        "accounting_group = {}".format(accounting_group),
        "accounting_group_user = {}".format(accounting_group_user),
    ]
    if timeout:
        seconds = 3600 * timeout  # hours -> seconds
        submit_lines.append(
            f"periodic_remove = CurrentTime-EnteredCurrentStatus > {seconds}"
        )
    return submit_lines + list(extra_commands)
[docs]
def generate_dag(
    times,
    flags=[],
    tag='gwdetchar-omega-batch',
    submit=False,
    outdir=os.getcwd(),
    universe='vanilla',
    request_cpus=1,
    request_disk=1,  # GB
    request_memory=4,  # GB
    condor_commands=get_condor_arguments(),
):
    """Construct a Directed Acyclic Graph (DAG) for a batch of omega scans

    Parameters
    ----------
    times : `list` of `float`
        list of GPS times to scan

    flags : `list` of `str`, optional
        a list of command-line flags to run for each job, defaults to an
        empty list; this list is not modified

    tag : `str`, optional
        a helpful string to use to name the DAG,
        default: `'gwdetchar-omega-batch'`

    submit : `bool`, optional
        submit the DAG to condor, default: `False`

    outdir : `str`, optional
        the output directory in which to store files, will result in
        sub-directories called `'condor'` and `'logs'`, default: the
        working directory at the time this module was imported

    universe : `str`, optional
        condor universe to run in, default: `'vanilla'`

    request_cpus : `int`, optional
        number of CPUs to request for condor job

    request_disk : `float`, optional
        amount of disk (in gigabytes) to request for condor job

    request_memory : `float`, optional
        amount of memory (in gigabytes) to request for condor job

    condor_commands : `list` of `str`, optional
        list of condor settings to process with, defaults to the output of
        `get_condor_arguments`

    Returns
    -------
    dagman : `~pycondor.Dagman`
        the fully built DAG object

    Notes
    -----
    For any universe other than ``'local'`` condor file transfer is
    enabled: every flag that names an existing file is listed in
    ``transfer_input_files`` and is passed to the job by its base name.
    """
    # work on a copy: file-valued flags are rewritten below and the
    # caller's list (or the shared default) must not change as a side effect
    flags = list(flags)

    outdir = Path(outdir)
    initialdir = outdir
    logdir = outdir / "logs"
    subdir = outdir / "condor"

    # set condor file transfer options
    if universe == "local":
        condor_file_transfer = []
    else:
        # use relative output paths: each job writes a directory named
        # after its GPS time, which is listed in transfer_output_files
        outdir = Path(".")
        input_files = []
        for i, flag in enumerate(map(Path, flags)):
            if flag.is_file():
                input_files.append(str(flag))
                # the transferred copy is referenced by name only
                flags[i] = flag.name
        condor_file_transfer = [
            "should_transfer_files = yes",
            "transfer_executable = false",
            f"transfer_input_files = {','.join(input_files)}",
            "transfer_output_files = $(GPSTIME)",
            "when_to_transfer_output = ON_SUCCESS",
            "success_exit_code = 0",
        ]

    # set environment
    getenv = ", ".join([
        "CONDA_EXE",  # for the HTML conda package list
        # for data discovery
        "GWDATAFIND_SERVER",
        "NDSSERVER",
    ])
    # the same list without spaces, for `condor_submit_dag -include_env`
    include_env = getenv.replace(' ', '')

    # create DAG and jobs
    dagman = Dagman(name=tag, submit=str(subdir))
    job = Job(
        dag=dagman,
        name='gwdetchar-omega',
        executable=sys.executable,
        universe=universe,
        submit=str(subdir),
        log=str(logdir),
        error=str(logdir),
        output=str(logdir),
        getenv=getenv,
        request_cpus=request_cpus,
        request_disk=f"{request_disk} GB",
        request_memory=f"{request_memory} GB",
        extra_lines=condor_commands + [
            f"initialdir = {initialdir}",
        ] + condor_file_transfer,
    )

    # make a node in the workflow for each event time
    for t in times:
        cmd = " ".join([
            "-m",
            "gwdetchar.omega",
            str(t),
            "--output-directory",
            str(outdir / str(t)),
        ] + flags)
        # hack the command to insert another argument into the dagman file:
        # the stray quotes close the argument string early and append a
        # GPSTIME value, which the $(GPSTIME) macro above refers to
        cmd += f'" GPSTIME="{t}'
        job.add_arg(cmd, name=str(t).replace(".", "_"))

    # write and submit the DAG
    dagman.build(fancyname=False)
    print("Workflow generated for {} times".format(len(times)))
    if submit:
        dagman.submit_dag(
            submit_options=f"-force -include_env {include_env}")
        print(
            "Submitted to condor, check status via:\n\n"
            f"$ condor_q {getuser()}"
        )
    else:
        print(
            "Submit to condor via:\n\n"
            f"$ condor_submit_dag -include_env {include_env} "
            f"{dagman.submit_file}"
        )
    return dagman
# -- parse command-line -------------------------------------------------------
[docs]
def create_parser():
    """Create a command-line parser for this entry point

    Returns
    -------
    parser
        the object returned by `cli.create_parser` (presumably an
        `argparse.ArgumentParser` -- confirm against the `cli` module),
        populated with the positional GPS time argument, the Omega scan
        options and the Condor options defined below
    """
    # initialize argument parser
    parser = cli.create_parser(
        prog=PROG,
        description=CLI_DOCSTRING,
    )
    pargs = parser.add_argument_group('Omega scan options')
    cargs = parser.add_argument_group('Condor options')

    # required arguments
    parser.add_argument(
        'gpstime',
        nargs='+',
        # note the trailing space: adjacent literals are concatenated
        help='GPS time(s) to scan, or path to a file '
             'containing a single column of such times',
    )
    cli.add_ifo_option(
        parser,
    )
    parser.add_argument(
        '-o',
        '--output-dir',
        default=os.getcwd(),
        help='output directory for all scans, default: %(default)s',
    )

    # optional omega scan arguments
    pargs.add_argument(
        '-f',
        '--config-file',
        help='path to configuration file to use, default: '
             'choose based on observatory, epoch, and pipeline',
    )
    pargs.add_argument(
        '-d',
        '--disable-correlation',
        action='store_true',
        default=False,
        help='disable cross-correlation of aux '
             'channels, default: False',
    )
    pargs.add_argument(
        '-D',
        '--disable-checkpoint',
        action='store_true',
        default=False,
        help='disable checkpointing from previous '
             'runs, default: False',
    )
    pargs.add_argument(
        '-s',
        '--ignore-state-flags',
        action='store_true',
        default=False,
        help='ignore state flag definitions in '
             'the configuration, default: False',
    )
    pargs.add_argument(
        '-t',
        '--far-threshold',
        type=float,
        default=3.171e-8,
        help='white noise false alarm rate threshold (Hz) for '
             'processing channels, default: %(default)s',
    )
    pargs.add_argument(
        '-y',
        '--frequency-scaling',
        default='log',
        help='scaling of all frequency axes, default: %(default)s',
    )
    pargs.add_argument(
        '-c',
        '--colormap',
        default='viridis',
        help='name of colormap to use, default: %(default)s',
    )
    cli.add_nproc_option(
        pargs,
    )

    # optional condor arguments
    cargs.add_argument(
        '-u',
        '--universe',
        default='vanilla',
        type=str,
        help='universe for condor processing',
    )
    cargs.add_argument(
        '--submit',
        action='store_true',
        default=False,
        help='submit DAG directly to condor queue',
    )
    cargs.add_argument(
        '--monitor',
        action='store_true',
        default=False,
        help='monitor the DAG progress after submission; '
             'only used with --submit',
    )
    cargs.add_argument(
        '--condor-accounting-group',
        default=ACCOUNTING_GROUP,
        help='accounting_group for condor submission on the LIGO '
             'Data Grid, include \'{epoch}\' (with curly brackets) '
             'to auto-substitute the appropriate epoch based on '
             'the GPS times',
    )
    cargs.add_argument(
        '--condor-accounting-group-user',
        default=ACCOUNTING_GROUP_USER,
        help='accounting_group_user for condor submission on the '
             'LIGO Data Grid',
    )
    cargs.add_argument(
        '--condor-timeout',
        type=float,
        default=None,
        metavar='T',
        help='configure condor to terminate jobs after T hours '
             'to prevent idling, default: %(default)s',
    )
    cargs.add_argument(
        '--condor-command',
        action='append',
        default=[],
        help='Extra condor submit commands to add to '
             'the gwdetchar-omega submit file. Can be given '
             'multiple times in the form "key=value"',
    )

    # return the argument parser
    return parser
# -- main code block ----------------------------------------------------------
[docs]
def main(args=None):
    """Run the command-line Omega scan tool in batch mode

    Parses the command line, builds the condor commands and the per-scan
    flags, generates the DAG (submitting it when ``--submit`` is given)
    and optionally monitors its progress.

    Parameters
    ----------
    args : `list` of `str`, optional
        command-line arguments handed to the parser's ``parse_args``;
        default: `None`
    """
    opts = create_parser().parse_args(args=args)

    # resolve the output directory to an absolute, user-expanded path
    outdir = os.path.abspath(os.path.expanduser(opts.output_dir))

    # GPS times to scan
    times = _parse_analysis_times(opts.gpstime)

    # condor submit commands; the latest requested time selects the epoch
    condorcmds = get_condor_arguments(
        accounting_group=opts.condor_accounting_group,
        accounting_group_user=opts.condor_accounting_group_user,
        timeout=opts.condor_timeout,
        extra_commands=opts.condor_command,
        gps=max(times),
    )

    # flags forwarded to every individual scan
    flags = get_command_line_flags(
        opts.ifo,
        fscale=opts.frequency_scaling,
        colormap=opts.colormap,
        nproc=opts.nproc,
        far=opts.far_threshold,
        config_file=opts.config_file,
        disable_correlation=opts.disable_correlation,
        disable_checkpoint=opts.disable_checkpoint,
        ignore_state_flags=opts.ignore_state_flags,
    )

    # -- generate workflow ------------

    # write (and, if requested, submit) the DAG
    dagman = generate_dag(
        times,
        flags=flags,
        tag="gwdetchar-omega-batch",
        submit=opts.submit,
        outdir=outdir,
        universe=opts.universe,
        condor_commands=condorcmds,
        request_cpus=opts.nproc,
    )

    # monitoring only applies to a DAG that was actually submitted
    if opts.submit and opts.monitor:
        _monitor_dag_workflow(dagman)
# -- run from command-line ----------------------------------------------------
# run the batch tool only when this module is executed as a script,
# not when it is imported
if __name__ == "__main__":
    main()