# Source code for xpipeline.xcondor

import os
import shutil

from glue import pipeline
# -------------------------------------------------------------------------
#      Define special job classes.
# -------------------------------------------------------------------------

class XsearchJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    """
    An x search job.

    Describes the condor job that runs ``xpipeline-analysis``.  The condor
    universe, accounting tags, priority and memory request are all read from
    the ``[condor]`` section of the configuration object.
    """

    def __init__(self, cp):
        """
        cp = ConfigParser object from which options are read.
        """
        # ---- Get path to executable.
        #      Search PATH directly instead of shelling out to ``which`` and
        #      round-tripping through a scratch file in the working directory;
        #      this also avoids a trailing newline in the stored path.  An
        #      executable that cannot be found yields an empty string.
        self.__executable = shutil.which('xpipeline-analysis') or ''

        # ---- Get condor universe from parameters file.
        self.__universe = cp.get('condor', 'universe')
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        pipeline.AnalysisJob.__init__(self, cp, False)
        self.__param_file = None

        # ---- Add required environment variables.
        self.add_condor_cmd('getenv', "true")

        # ---- Add Accounting Group Flag.
        grouptag = '.'.join([
            'ligo',
            cp.get('condor', 'ProdDevSim'),
            cp.get('condor', 'Era'),
            cp.get('condor', 'Group'),
            cp.get('condor', 'SearchType'),
        ])
        self.add_condor_cmd('accounting_group', grouptag)

        # ---- Add Username Flag.
        self.add_condor_cmd('accounting_group_user', cp.get('condor', 'UserName'))

        # ---- Add priority specification.
        self.add_condor_cmd('priority', cp.get('condor', 'condorPriority'))

        # ---- Add minimal memory needed.
        self.add_condor_cmd('request_memory',
                            cp.get('condor', 'minimalSearchJobCPUMem'))

        # ---- Path and file names for standard out, standard error for this job.
        self.set_stdout_file('logs/xsearch-$(cluster)-$(process).out')
        self.set_stderr_file('logs/xsearch-$(cluster)-$(process).err')

        # ---- Name of condor job submission file to be written.
        self.set_sub_file('xsearch.sub')
class XsearchGPUJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    """
    An x search job for GPUs.

    Runs the same ``xpipeline-analysis`` executable as the CPU search job,
    but passes an explicit environment, reads its memory request from
    ``minimalSearchJobGPUMem`` and adds the ``WantGPU`` requirement and
    attribute.  All options come from the ``[condor]`` section of the
    configuration object.
    """

    def __init__(self, cp):
        """
        cp = ConfigParser object from which options are read.
        """
        # ---- Get path to executable.
        #      Search PATH directly instead of shelling out to ``which`` and
        #      round-tripping through a scratch file in the working directory;
        #      this also avoids a trailing newline in the stored path.  An
        #      executable that cannot be found yields an empty string.
        self.__executable = shutil.which('xpipeline-analysis') or ''

        # ---- Get condor universe from parameters file.
        self.__universe = cp.get('condor', 'universe')
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        pipeline.AnalysisJob.__init__(self, cp, False)
        self.__param_file = None

        # ---- Add required environment variables.
        self.add_condor_cmd(
            'environment',
            "USER=$ENV(USER);HOME=$ENV(HOME);"
            "LD_LIBRARY_PATH=$ENV(LD_LIBRARY_PATH);"
            "XPIPE_INSTALL_BIN=$ENV(XPIPE_INSTALL_BIN);"
            "PATH=/usr/bin:/bin")

        # ---- Add Accounting Group Flag.
        grouptag = '.'.join([
            'ligo',
            cp.get('condor', 'ProdDevSim'),
            cp.get('condor', 'Era'),
            cp.get('condor', 'Group'),
            cp.get('condor', 'SearchType'),
        ])
        self.add_condor_cmd('accounting_group', grouptag)

        # ---- Add Username Flag.
        self.add_condor_cmd('accounting_group_user', cp.get('condor', 'UserName'))

        # ---- Add priority specification.
        self.add_condor_cmd('priority', cp.get('condor', 'condorPriority'))

        # ---- Add minimal memory needed.
        self.add_condor_cmd('request_memory',
                            cp.get('condor', 'minimalSearchJobGPUMem'))

        # ---- Path and file names for standard out, standard error for this job.
        self.set_stdout_file('logs/xsearch-$(cluster)-$(process).out')
        self.set_stderr_file('logs/xsearch-$(cluster)-$(process).err')

        # NOTE: an earlier (disabled) variant used getenv=true on Atlas to
        #       pass environment variables instead of the explicit list above.

        # ---- Ask for a GPU slot.
        self.add_condor_cmd('Requirements', 'TARGET.WantGPU =?= True')
        self.add_condor_cmd('+WantGPU', 'True')

        # ---- Name of condor job submission file to be written.
        self.set_sub_file('xsearch_gpu.sub')
class XsearchNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
    """
    xsearch node

    Each setter appends one command-line option to the node's variable
    arguments and remembers the value so the matching getter can return it.
    Getters return None until the corresponding setter has been called.
    """

    def __init__(self, job):
        """
        job = A CondorDAGJob.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        pipeline.AnalysisNode.__init__(self)
        # Initialise every stored option so that a getter called before its
        # setter returns None instead of raising AttributeError.
        self.__x_jobnum = None
        self.__x_injnum = None
        self.__param_file = None
        self.__output_dir = None

    # ---- Set parameters file.
    def set_param_file(self, path):
        """Add ``--parameter-file <path>`` to the arguments and store path."""
        self.add_var_arg('--parameter-file ' + path)
        self.__param_file = path

    def get_param_file(self):
        """Return the stored parameter file path (None if unset)."""
        return self.__param_file

    def set_x_jobnum(self, n):
        """Add ``--event-numbers <n>`` to the arguments and store n."""
        self.add_var_arg('--event-numbers ' + str(n))
        self.__x_jobnum = n

    def get_x_jobnum(self):
        """Return the stored event number(s) (None if unset)."""
        return self.__x_jobnum

    def set_output_dir(self, path):
        """
        Add ``--job-type <path>`` to the arguments and store path.

        Note that, despite the method name, the value is passed to the
        executable through the ``--job-type`` option.
        """
        self.add_var_arg('--job-type ' + path)
        self.__output_dir = path

    def get_output_dir(self, path=None):
        """
        Return the stored output directory (None if unset).

        The ``path`` argument is ignored; it is accepted only so that
        existing callers that pass it keep working.
        """
        return self.__output_dir

    def set_x_injnum(self, n):
        """Add ``--injection-numbers <n>`` to the arguments and store n."""
        # str() so that integer values work, matching set_x_jobnum.
        self.add_var_arg('--injection-numbers ' + str(n))
        self.__x_injnum = n

    def get_x_injnum(self):
        """Return the stored injection number(s) (None if unset)."""
        return self.__x_injnum
class XmergeJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    """
    An x merge job.

    Describes the condor job that runs ``xmergegrbresults`` with the literal
    string ``output`` as its first argument.  The condor universe, accounting
    tags, priority and memory request are read from the ``[condor]`` section
    of the configuration object.
    """

    def __init__(self, cp):
        """
        cp = ConfigParser object from which options are read.
        """
        # ---- Get path to executable.
        #      Search PATH directly instead of shelling out to ``which`` and
        #      round-tripping through a scratch file in the working directory;
        #      this also avoids a trailing newline in the stored path.  An
        #      executable that cannot be found yields an empty string.
        self.__executable = shutil.which('xmergegrbresults') or ''

        # ---- Get condor universe from parameters file.
        self.__universe = cp.get('condor', 'universe')
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        pipeline.AnalysisJob.__init__(self, cp, False)

        # ---- Add name of 'output' directory as first argument.
        self.add_arg('output')

        # ---- Add required environment variables.
        self.add_condor_cmd(
            'environment',
            "USER=$ENV(USER);HOME=$ENV(HOME);"
            "LD_LIBRARY_PATH=$ENV(LD_LIBRARY_PATH)")

        # ---- Add Accounting Group Flag.
        grouptag = '.'.join([
            'ligo',
            cp.get('condor', 'ProdDevSim'),
            cp.get('condor', 'Era'),
            cp.get('condor', 'Group'),
            cp.get('condor', 'SearchType'),
        ])
        self.add_condor_cmd('accounting_group', grouptag)

        # ---- Add Username Flag.
        self.add_condor_cmd('accounting_group_user', cp.get('condor', 'UserName'))

        # ---- Add priority specification.
        self.add_condor_cmd('priority', cp.get('condor', 'condorPriority'))

        # ---- Add minimal memory needed.
        self.add_condor_cmd('request_memory',
                            cp.get('condor', 'minimalMergeJobMem'))

        # ---- Path and file names for standard out, standard error for this job.
        self.set_stdout_file('logs/xmerge-$(cluster)-$(process).out')
        self.set_stderr_file('logs/xmerge-$(cluster)-$(process).err')

        # NOTE: an earlier (disabled) variant used getenv=true on Atlas
        #       instead of the explicit environment above.

        # ---- Name of condor job submission file to be written.
        self.set_sub_file('xmerge.sub')
class XmergeNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
    """
    merge results that were cut into blocks of less than maxInjNum jobs

    Each setter appends its value, unchanged, to the node's variable
    arguments and stores it for the matching getter.  Getters return None
    until the corresponding setter has been called.
    """

    def __init__(self, job):
        """
        job = A CondorDAGJob.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        pipeline.AnalysisNode.__init__(self)
        # Initialise stored values so a getter called before its setter
        # returns None instead of raising AttributeError.
        self.__dir_prefix = None
        self.__sn_flag = None

    def set_dir_prefix(self, path):
        """Append path to the arguments and store it as the dir prefix."""
        self.add_var_arg(path)
        self.__dir_prefix = path

    def get_dir_prefix(self):
        """Return the stored dir prefix (None if unset)."""
        return self.__dir_prefix

    def set_sn_flag(self, path):
        """Append path to the arguments and store it as the sn flag."""
        self.add_var_arg(path)
        self.__sn_flag = path

    def get_sn_flag(self):
        """Return the stored sn flag (None if unset)."""
        return self.__sn_flag
class XmergeClusteredJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    """
    An x merge clustered job.

    Describes the condor job that runs ``xmergegrbclusteredresults`` (when
    nDet is 3) or ``xmergegrbclusteredresultsTwoDets`` (otherwise), with the
    literal string ``output_clustered`` as its first argument.  The condor
    universe, accounting tags, priority and memory request are read from the
    ``[condor]`` section of the configuration object.
    """

    def __init__(self, cp, nDet):
        """
        cp = ConfigParser object from which options are read.
        nDet = number of detectors; selects which executable is used.
        """
        # ---- Get path to executable.
        #      Search PATH directly instead of shelling out to ``which`` and
        #      round-tripping through a scratch file in the working directory;
        #      this also avoids a trailing newline in the stored path.  An
        #      executable that cannot be found yields an empty string.
        if nDet == 3:
            exe_name = 'xmergegrbclusteredresults'
        else:
            exe_name = 'xmergegrbclusteredresultsTwoDets'
        self.__executable = shutil.which(exe_name) or ''

        # ---- Get condor universe from parameters file.
        self.__universe = cp.get('condor', 'universe')
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        pipeline.AnalysisJob.__init__(self, cp, False)

        # ---- Add name of 'output' directory as first argument.
        self.add_arg('output_clustered')

        # ---- Add required environment variables.
        self.add_condor_cmd(
            'environment',
            "USER=$ENV(USER);HOME=$ENV(HOME);"
            "LD_LIBRARY_PATH=$ENV(LD_LIBRARY_PATH)")

        # ---- Add Accounting Group Flag.
        grouptag = '.'.join([
            'ligo',
            cp.get('condor', 'ProdDevSim'),
            cp.get('condor', 'Era'),
            cp.get('condor', 'Group'),
            cp.get('condor', 'SearchType'),
        ])
        self.add_condor_cmd('accounting_group', grouptag)

        # ---- Add Username Flag.
        self.add_condor_cmd('accounting_group_user', cp.get('condor', 'UserName'))

        # ---- Add priority specification.
        self.add_condor_cmd('priority', cp.get('condor', 'condorPriority'))

        # ---- Add minimal memory request.
        self.add_condor_cmd('request_memory',
                            cp.get('condor', 'minimalMergeClusterJobMem'))

        # ---- Path and file names for standard out, standard error for this job.
        self.set_stdout_file('logs/xmergeclustered-$(cluster)-$(process).out')
        self.set_stderr_file('logs/xmergeclustered-$(cluster)-$(process).err')

        # NOTE: an earlier (disabled) variant used getenv=true on Atlas
        #       instead of the explicit environment above.

        # ---- Name of condor job submission file to be written.
        self.set_sub_file('xmergeclustered.sub')
class XmergeClusteredNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
    """
    cluster and merge results that were cut into blocks of less than
    maxInjNum jobs

    Each setter appends its value, unchanged, to the node's variable
    arguments and stores it for the matching getter.  Getters return None
    until the corresponding setter has been called.
    """

    def __init__(self, job):
        """
        job = A CondorDAGJob.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        pipeline.AnalysisNode.__init__(self)
        # Initialise stored values so a getter called before its setter
        # returns None instead of raising AttributeError.
        self.__dir_prefix = None
        self.__sn_flag = None
        self.__sc_flag = None

    def set_dir_prefix(self, path):
        """Append path to the arguments and store it as the dir prefix."""
        self.add_var_arg(path)
        self.__dir_prefix = path

    def get_dir_prefix(self):
        """Return the stored dir prefix (None if unset)."""
        return self.__dir_prefix

    def set_sn_flag(self, path):
        """Append path to the arguments and store it as the sn flag."""
        self.add_var_arg(path)
        self.__sn_flag = path

    def get_sn_flag(self):
        """Return the stored sn flag (None if unset)."""
        return self.__sn_flag

    def set_sc_flag(self, path):
        """Append path to the arguments and store it as the sc flag."""
        self.add_var_arg(path)
        self.__sc_flag = path

    def get_sc_flag(self):
        """Return the stored sc flag (None if unset)."""
        return self.__sc_flag
class XtmvaJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    """
    An xtmva classification job.

    Describes the condor job that runs ``xtmvapy/xtmva.py`` from the current
    working directory.  The parent of the working directory is passed as
    ``--fdir`` and its last component as ``--grbname``.  The condor universe,
    accounting tags, priority and memory request are read from the
    ``[condor]`` section of the configuration object.
    """

    def __init__(self, cp, nDet):
        """
        cp = ConfigParser object from which options are read.
        nDet = number of detectors, passed to the executable as --nifo.
        """
        # ---- Get path to executable.
        # NOTE(review): the trailing " \n" is kept exactly as in the original
        #               string; confirm whether it is actually required.
        self.__executable = os.getcwd() + "/xtmvapy/xtmva.py \n"

        # ---- Get condor universe from parameters file.
        self.__universe = cp.get('condor', 'universe')
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        pipeline.AnalysisJob.__init__(self, cp, False)

        # ---- Build the argument string from the current working directory.
        fdir, grbdir = os.path.split(os.getcwd())
        argumentstr = " --fdir=" + fdir + " --grbname=" + grbdir \
            + " --nifo=" + str(nDet) + " --cname=xtmva.ini "
        # print() writes to standard output by default; the original passed
        # file=sys.stdout without importing sys, which raised NameError.
        print(argumentstr)

        # ---- Add the argument string as the job's argument.
        self.add_arg(argumentstr)

        # ---- Define environment variables that are needed by xtmva.py. We
        #      hard-code most of them to avoid need to create a special
        #      gen-wrapper-script*.sh script for this one function.
        def env(name):
            # An unset variable becomes an empty string; os.getenv() would
            # return None and break the string concatenation below.
            return os.environ.get(name, "")

        envarstr = ";".join([
            "USER=" + env("USER"),
            "HOME=" + env("HOME"),
            "LD_LIBRARY_PATH=" + env("LD_LIBRARY_PATH"),
            "XPIPE_INSTALL_BIN=$ENV(XPIPE_INSTALL_BIN)",
            "PATH=" + env("ROOTSYS") + "/bin:" + env("XPIPE_INSTALL_BIN")
            + ":/usr/bin:/bin",
            "LIBPATH=" + env("LIBPATH"),
            "DYLD_LIBRARY_PATH=" + env("DYLD_LIBRARY_PATH"),
            "PYTHONPATH=" + env("PYTHONPATH"),
            "SHLIB_PATH=" + env("SHLIB_PATH"),
            "ROOTSYS=" + env("ROOTSYS"),
        ])

        # ---- Add required environment variables.
        self.add_condor_cmd('environment', envarstr)

        # ---- Add priority specification.
        self.add_condor_cmd('priority', cp.get('condor', 'condorPriority'))

        # ---- Add minimal memory request.
        self.add_condor_cmd('request_memory',
                            cp.get('condor', 'minimalXtmvaJobMem'))

        # ---- Add Accounting Group Flag.
        grouptag = '.'.join([
            'ligo',
            cp.get('condor', 'ProdDevSim'),
            cp.get('condor', 'Era'),
            cp.get('condor', 'Group'),
            cp.get('condor', 'SearchType'),
        ])
        self.add_condor_cmd('accounting_group', grouptag)

        # ---- Add Username Flag.
        self.add_condor_cmd('accounting_group_user', cp.get('condor', 'UserName'))

        # ---- Path and file names for standard out, standard error for this job.
        self.set_stdout_file('logs/xtmva-$(cluster)-$(process).out')
        self.set_stderr_file('logs/xtmva-$(cluster)-$(process).err')

        # NOTE: an earlier (disabled) variant used getenv=true on Atlas
        #       instead of the explicit environment above.

        # ---- Name of condor job submission file to be written.
        self.set_sub_file('xtmva.sub')
class XtmvaNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
    """
    xtmva node

    Adds no options of its own; it only initialises both base classes.
    """

    def __init__(self, job):
        """
        job = A CondorDAGJob.
        """
        # Initialise the DAG-node base with the job, then the analysis-node
        # base, which takes no arguments.
        pipeline.CondorDAGNode.__init__(self, job)
        pipeline.AnalysisNode.__init__(self)