import os
import subprocess
from .utils import (checkdir, get_condor_version, requires_command,
split_command_string, decode_string)
from .basenode import BaseNode
from .job import Job
from .visualize import visualize as _visualize
def _get_subdag_string(dagman):
if not isinstance(dagman, Dagman):
raise TypeError(
'Expecting a Dagman object, got {}'.format(type(dagman)),
)
subdag_string = 'SUBDAG EXTERNAL {} {}'.format(dagman.submit_name,
dagman.submit_file)
return subdag_string
def _iter_job_args(job):
"""
Iterates over Job args list. Yields the name (and JobArg) for each node
to be used when adding job to a Dagman (i.e. the name in the
'JOB name job_submit_file' line).
Parameters
----------
job : Job
Job to iterate over. Note that the submit file for job must be built
prior to using _iter_job_args.
Yields
------
node_name : str
Node name to use in Dagman object.
job_arg : JobArg namedtuple
Job argument object (``arg``, ``name``, ``retry`` attributes).
"""
if not isinstance(job, Job):
raise TypeError('Expecting a Job object, got {}'.format(type(job)))
if not getattr(job, '_built', False):
raise ValueError('Job {} must be built before adding it '
'to a Dagman'.format(job.name))
if len(job.args) == 0:
return
else:
for idx, job_arg in enumerate(job):
arg, name, retry = job_arg
if name is not None:
node_name = '{}_{}'.format(job.submit_name, name)
else:
node_name = '{}_arg_{}'.format(job.submit_name, idx)
yield node_name, job_arg
def _get_parent_child_string(node):
"""Constructs the parent/child line for node to be added to a Dagman
"""
if not isinstance(node, BaseNode):
raise ValueError('Expecting a Job or Dagman object, '
'got {}'.format(type(node)))
parent_string = 'Parent'
for parent_node in node.parents:
if isinstance(parent_node, Job) and len(parent_node) > 0:
for node_name, job_arg in _iter_job_args(parent_node):
parent_string += ' {}'.format(node_name)
else:
parent_string += ' {}'.format(parent_node.submit_name)
child_string = 'Child'
if isinstance(node, Job) and len(node) > 0:
for node_name, job_arg in _iter_job_args(node):
child_string += ' {}'.format(node_name)
else:
child_string += ' {}'.format(node.submit_name)
parent_child_string = parent_string + ' ' + child_string
return parent_child_string
[docs]class Dagman(BaseNode):
"""
Dagman object consisting of a series of Jobs and sub-Dagmans to manage.
Note that the ``submit`` parameter can be explicitly given or configured
by setting the ``PYCONDOR_SUBMIT_DIR`` environment variable. An explicitly
given value for ``submit`` will be used over the environment variable,
while the environment variable will be used over a default value.
Parameters
----------
name : str
Name of the Dagman instance. This will also be the name of the
corresponding error, log, output, and submit files associated with
this Dagman.
submit : str
Path to directory where condor dagman submit files will be written
(defaults to the directory was the Dagman was submitted from).
extra_lines : list or None, optional
List of additional lines to be added to submit file.
.. versionadded:: 0.1.1
dag : Dagman, optional
If specified, Dagman will be added to dag as a subdag
(default is None).
verbose : int, optional
Level of logging verbosity option are 0-warning, 1-info,
2-debugging (default is 0).
Attributes
----------
jobs : list
The list of jobs for this Dagman instance to manage.
parents : list
List of parent Jobs and Dagmans. Ensures that Jobs and Dagmans in the
parents list will complete before this Dagman is submitted to HTCondor.
children : list
List of child Jobs and Dagmans. Ensures that Jobs and Dagmans in the
children list will be submitted only after this Dagman has completed.
"""
def __init__(self, name, submit=None, extra_lines=None, dag=None,
verbose=0):
super(Dagman, self).__init__(name, submit, extra_lines, dag, verbose)
self.nodes = []
self._has_bad_node_names = False
self.logger.debug('{} initialized'.format(self.name))
def __repr__(self):
nondefaults = ''
for attr in sorted(vars(self)):
if getattr(self, attr) and attr not in ['name', 'nodes', 'logger']:
nondefaults += ', {}={}'.format(attr, getattr(self, attr))
output = 'Dagman(name={}, n_nodes={}{})'.format(self.name,
len(self.nodes),
nondefaults)
return output
def __iter__(self):
return iter(self.nodes)
def __len__(self):
return len(self.nodes)
def __contains__(self, item):
return item in self.nodes
def _hasnode(self, node):
return node in self.nodes
def _add_node(self, node):
# Don't bother adding node if it's already been added
if self._hasnode(node):
return self
if isinstance(node, BaseNode):
self.nodes.append(node)
else:
raise TypeError('Expecting a Job or Dagman. '
'Got an object of type {}'.format(type(node)))
self.logger.debug(
'Added {} to Dagman {}'.format(node.name, self.name))
return self
[docs] def add_job(self, job):
"""Add job to Dagman
Parameters
----------
job : Job
Job to append to Dagman jobs list.
Returns
-------
self : object
Returns self.
"""
self._add_node(job)
return self
[docs] def add_subdag(self, dag):
"""Add dag to Dagman
Parameters
----------
dag : Dagman
Subdag to append to Dagman jobs list.
Returns
-------
self : object
Returns self.
"""
self._add_node(dag)
return self
def _get_job_arg_lines(self, job, fancyname):
"""Constructs the lines to be added to a Dagman related to job
"""
if not isinstance(job, Job):
raise TypeError('Expecting a Job object, got {}'.format(type(job)))
if not getattr(job, '_built', False):
raise ValueError('Job {} must be built before adding it '
'to a Dagman'.format(job.name))
job_arg_lines = []
if len(job.args) == 0:
job_line = 'JOB {} {}'.format(job.submit_name, job.submit_file)
job_arg_lines.append(job_line)
else:
for node_name, job_arg in _iter_job_args(job):
# Check that '.' or '+' are not in node_name
if '.' in node_name or '+' in node_name:
self._has_bad_node_names = True
arg, name, retry = job_arg
# Add JOB line with Job submit file
job_line = 'JOB {} {}'.format(node_name, job.submit_file)
job_arg_lines.append(job_line)
# Add job ARGS line for command line arguments
arg_line = 'VARS {} ARGS="{}"'.format(node_name, arg)
job_arg_lines.append(arg_line)
# Define job_name variable if there are arg_names for job
if job._has_arg_names:
if name is not None:
job_name = node_name
else:
job_name = job.submit_name
job_name_line = 'VARS {} job_name="{}"'.format(node_name,
job_name)
job_arg_lines.append(job_name_line)
# Add retry line for Job
if retry is not None:
retry_line = 'Retry {} {}'.format(node_name, retry)
job_arg_lines.append(retry_line)
return job_arg_lines
[docs] def build(self, makedirs=True, fancyname=True):
"""Build and saves the submit file for Dagman
Parameters
----------
makedirs : bool, optional
If Job directories (e.g. error, output, log, submit) don't exist,
create them (default is ``True``).
fancyname : bool, optional
Appends the date and unique id number to error, log, output, and
submit files. For example, instead of ``dagname.submit`` the submit
file becomes ``dagname_YYYYMMD_id``. This is useful when running
several Dags/Jobs of the same name (default is ``True``).
Returns
-------
self : object
Returns self.
"""
if getattr(self, '_built', False):
self.logger.warning(
'{} submit file has already been built. '
'Skipping the build process...'.format(self.name),
)
return self
name = self._get_fancyname() if fancyname else self.name
submit_file = os.path.join(self.submit, '{}.submit'.format(name))
self.submit_file = submit_file
self.submit_name = name
checkdir(self.submit_file, makedirs)
# Build submit files for all nodes in self.nodes
# Note: nodes must be built before the submit file for self is built
for node_index, node in enumerate(self.nodes, start=1):
if isinstance(node, Job):
node._build_from_dag(makedirs, fancyname)
elif isinstance(node, Dagman):
node.build(makedirs, fancyname)
else:
raise TypeError('Nodes must be either a Job or Dagman object')
# Write dag submit file
self.logger.info('Building DAG submission file {}...'.format(
self.submit_file))
lines = []
parent_child_lines = []
for node_index, node in enumerate(self.nodes, start=1):
self.logger.info('Working on {} [{} of {}]'.format(node.name,
node_index, len(self.nodes)))
# Build the BaseNode submit file
if isinstance(node, Job):
# Add Job variables to Dagman submit file
job_arg_lines = self._get_job_arg_lines(node, fancyname)
lines.extend(job_arg_lines)
elif isinstance(node, Dagman):
subdag_string = _get_subdag_string(node)
lines.append(subdag_string)
else:
raise TypeError('Nodes must be either a Job or Dagman object')
# Add parent/child information, if necessary
if node.hasparents():
parent_child_string = _get_parent_child_string(node)
parent_child_lines.append(parent_child_string)
# Add any extra lines to submit file, if specified
if self.extra_lines:
lines.extend(self.extra_lines)
# Write lines to dag submit file
with open(submit_file, 'w') as dag:
dag.writelines('\n'.join(
lines
+ ['\n#Inter-job dependencies']
+ parent_child_lines),
)
self._built = True
self.logger.info('Dagman submission file for {} successfully '
'built!'.format(self.name))
return self
[docs] @requires_command('condor_submit_dag')
def submit_dag(self, submit_options=None):
"""Submits Dagman to condor
Parameters
----------
submit_options : str, optional
Options to be passed to ``condor_submit_dag`` for this Dagman
(see the `condor_submit_dag documentation
<http://research.cs.wisc.edu/htcondor/manual/current/condor_submit_dag.html>`_
for possible options).
Returns
-------
self : object
Returns self.
"""
# Construct condor_submit_dag command
command = 'condor_submit_dag'
if submit_options is not None:
command += ' {}'.format(submit_options)
command += ' {}'.format(self.submit_file)
submit_dag_proc = subprocess.Popen(
split_command_string(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Check that there are no illegal node names for newer condor versions
condor_version = get_condor_version()
if condor_version >= (8, 7, 2) and self._has_bad_node_names:
raise RuntimeError(
"Found an illegal character (either '+' or '.') in the "
"name for a node in Dagman {}. As of HTCondor version "
"8.7.2, '+' and '.' are prohibited in Dagman node names. "
"This means a '+' or '.' character is in a Job name, "
"Dagman name, or the name for a Job argument.".format(
self.name,
),
)
# Execute condor_submit_dag command
out, err = submit_dag_proc.communicate()
print(decode_string(out))
return self
[docs] @requires_command('condor_submit_dag')
def build_submit(self, makedirs=True, fancyname=True, submit_options=None):
"""Calls build and submit sequentially
Parameters
----------
makedirs : bool, optional
If Job directories (e.g. error, output, log, submit) don't exist,
create them (default is ``True``).
fancyname : bool, optional
Appends the date and unique id number to error, log, output, and
submit files. For example, instead of ``dagname.submit`` the submit
file becomes ``dagname_YYYYMMD_id``. This is useful when running
several Dags/Jobs of the same name (default is ``True``).
submit_options : str, optional
Options to be passed to ``condor_submit_dag`` for this Dagman
(see the `condor_submit_dag documentation
<http://research.cs.wisc.edu/htcondor/manual/current/condor_submit_dag.html>`_
for possible options).
Returns
-------
self : object
Returns self.
"""
self.build(makedirs, fancyname)
self.submit_dag(submit_options=submit_options)
return self
[docs] def visualize(self, filename=None):
"""Visualize Dagman graph
Parameters
----------
filename : str or None, optional
File to save graph diagram to. If ``None`` then no file is saved.
Valid file extensions are 'png', 'pdf', 'dot', 'svg', 'jpeg', 'jpg'.
"""
g = _visualize(self, filename=filename)
return g