===== Accounting with HTCondor-CE (Draft) =====
A good thing with HTCondor-CE is that accounting can be greatly simplified by the fact that job history files are self consistent, as they contain both grid and batch data: thus, there is no need for blah records, nor to search for matches from distinct sets of grid and batch usage records.
To enable job history files creation one has to define on each schedd
* ''PER_JOB_HISTORY_DIR=/your/preferred/path/''
then there will be a HTCondor job history file for each finished job. Each row in the file has the format: ''key = value''.
Using python parsing the file into a python dictionary is almost straightforward:
def jobfile2dict(fn):
try:
f = open(fn,'r')
except IOError:
return {}
return dict([map(str.strip, x.split('=',1)) for x in f])
It is sufficient to read the ''(key,value)'' pairs of interest to forge an usage record to add in a table of some database. After that
the job history file is moved to another folder, to prevent doublecounting.
See below an example script performing just that. Note that it extract more ''key/value'' pairs than those strictly needed for accounting,
such as the job exit status and the hostname of the Worker Node, which can be used to lookup the HS06 power of the machine and obtain a punctual WallClkockTime normalization.
#!/usr/bin/env python
import os, sys, time, json
import psycopg2
from socket import getfqdn
myfqdn = getfqdn()
myhn = myfqdn.split('.')[0]
myconf = '/etc/t1acct/htc_gratia_to_db.conf'
mylog = '/tmp/htc_acct.log'
"""
Example configuration file
[root@ce02-htc ~]# cat /etc/t1acct/htc_gratia_to_db.conf
{
"job": {
"htc_jobdir": "/var/lib/gratia/data/",
"ce_htc_jobdir": "/var/lib/ce_gratia/data/",
"htc_jobparsed": "/var/lib/gratia/data.tmp/",
"htc_jobaccounted": "/var/lib/gratia/data.bck/"
},
"log":{
"logfile": "/tmp/acct2apel_pg.log"
},
"db": {
"database":"acct",
"host":"myhostdb.fully.qualified.domain",
"user":"accguy",
"password":"type_your_one_here"
}
}
"""
def mlog(msg, logfile = mylog):
"""mlog(msg, logfile = mylog) --> append the string msg+'\n' to the specified logfile. If logfile is not specified, mylog is used instead, which must be externally defined.
"""
f = open(logfile, 'a')
f.write("%s: "%now()+msg+'\n')
f.flush()
f.close()
try:
f = open(myconf,'r')
cnf = json.load(f)
except Exception,e:
mlog("ERROR: %s"%str(e))
sys.exit(1)
mylog = cnf['log']['logfile']
f.close()
def help():
print "Usage example: python htc_gratiajobs.py"
print "parse HTCondor history job files from $(PER_JOB_HISTORY_DIR)"
print "insert accounting data in database and move file to other folder"
sys.exit(0)
def now():
return time.ctime(time.time())
class dbconn():
def __init__(self,database='dbname',host='dbhost.cr.cnaf.infn.it',user='dbuser',password='dbpasswd'):
self.conn = psycopg2.connect(database=database,host=host,user=user,password=password)
self.curs = self.conn.cursor()
conndict = cnf['db']
qc = dbconn(**conndict)
dt2sec = lambda x : x.days * 86400 + x.seconds
idf = lambda x : x
thishn = lambda x : myhn
# cfr. manual: condor-V8_8_1-Manual/JobClassAdAttributes.html
# ExitStatus:
# The way that HTCondor previously dealt with a job's exit status. This attribute should no longer be used. It is not always accurate in heterogeneous pools, or if the job exited with a signal. Instead, see the attributes: ExitBySignal, ExitCode, and ExitSignal.
#APEL: takes the following keys:
#GlobalJobId Owner RemoteWallClockTime RemoteUserCpu RemoteSysCpu JobStartDate EnteredCurrentStatus ResidentSetSize_RAW ImageSize_RAW RequestCpus
d = {'ClusterId': 'jobid',
'Cmd': 'jobname',
'fromhost':'fromhost',
'EnteredCurrentStatus': 'eventtimeepoch',
'CumulativeRemoteSysCpu': 'stime',
'CumulativeRemoteUserCpu': 'utime',
'CumulativeSlotTime': 'runtime',
'ExitCode': 'exitstatus',
'GlobalJobId': 'ceid',
'JobStartDate': 'starttimeepoch',
'LastRemoteHost': 'exechosts',
'OriginalCpus': 'numprocessors',
'Owner': 'username',
'ProcId': 'idx',
'QDate': 'submittimeepoch',
'Requirements': 'resreq',
'RoutedFromJobId': 'bljobid',
'ResidentSetSize_RAW': 'maxrmem',
'ImageSize_RAW':'maxrswap',
'x509UserProxyFirstFQAN': 'userfqan',
'x509UserProxyVOName': 'queue',
'x509userproxysubject': 'userdn',
'GPUsProvisioned':'gpu'
}
jdir = cnf['job']['htc_jobdir']
def jobfile2dict(fn):
try:
f = open(fn,'r')
except IOError:
return {}
return dict([map(str.strip, x.split('=',1)) for x in f.readlines()])
def cleanval(v):
if v is None:
return v
if v and v[0] == v[-1] == '"' :
return v[1:-1]
try:
return int(float(v))
except ValueError,TypeError:
return v
getwn = lambda s: s and s.split('@',1)[-1].split('.')[0]
#use os.scandir() iterator on python3
F = os.listdir(jdir)
if not os.path.isdir(cnf['job']['htc_jobparsed']):
os.mkdir(cnf['job']['htc_jobparsed'])
if not os.path.isdir(cnf['job']['htc_jobaccounted']):
os.mkdir(cnf['job']['htc_jobaccounted'])
K = d.keys()
sq = """INSERT INTO htjob (%s) VALUES (%s)"""%(','.join([d[k] for k in K]),('%s,'*len(d)).rstrip(','))
for n,fn in enumerate(F):
if not fn.startswith('history.'): continue
jf = os.path.join(jdir,fn)
jd = jobfile2dict(jf)
dtup = dict([(k,jd.get(k)) for k in d.keys()])
df = {}
for k,v in dtup.items():
df[k] = cleanval(v)
df['fromhost'] = thishn(myfqdn)
df['LastRemoteHost'] = getwn(df.get('LastRemoteHost',''))
df['Cmd'] = jd['Cmd'][-80:]
df['Requirements'] = jd.get('Requirements','')[:255]
for k in ['CumulativeRemoteSysCpu','CumulativeRemoteUserCpu','GPUsProvisioned','JobStartDate','ExitCode']:
df[k] = df[k] or 0
print n,qc.curs.mogrify(sq,tuple([df[k] for k in K]))
qc.curs.execute(sq,tuple([df[k] for k in K]))
os.rename(jf,os.path.join(cnf['job']['htc_jobparsed'],fn))
qc.conn.commit()
qc.curs.close()
qc.conn.close()
for fn in os.listdir(cnf['job']['htc_jobparsed']):
os.rename(os.path.join(cnf['job']['htc_jobparsed'],fn),os.path.join(cnf['job']['htc_jobaccounted'],fn))