User Tools

Site Tools


progetti:htcondor-tf:htcondor-ce_accounting

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
progetti:htcondor-tf:htcondor-ce_accounting [2019/05/10 09:42]
dalpra@infn.it
progetti:htcondor-tf:htcondor-ce_accounting [2019/07/10 11:52] (current)
dalpra@infn.it
Line 1: Line 1:
 +===== Accounting with HTCondor-CE (Draft) =====
  
 +A good thing with HTCondor-CE is that accounting can be greatly simplified by the fact that job history files are self-consistent, as they contain both grid and batch data: thus, there is no need for BLAH records, nor to search for matches between distinct sets of grid and batch usage records.
 +
 +To enable job history files creation one has to define on each schedd
 +  * ''PER_JOB_HISTORY_DIR=/your/preferred/path/''
 +
 +then there will be a HTCondor job history file for each finished job. Each row in the file has the format: ''key = value''.
 +Using Python, parsing the file into a Python dictionary is almost straightforward:
 +
 +<code>
def jobfile2dict(fn):
    """Parse an HTCondor per-job history file into a dict of strings.

    Each line of the file has the form ``key = value``; both sides are
    stripped of surrounding whitespace.  Returns {} when the file cannot
    be opened or read.

    Fix: the original never closed the file handle; ``with`` guarantees
    it is closed even if reading raises.
    """
    try:
        with open(fn, 'r') as f:
            pairs = (x.split('=', 1) for x in f)
            return {key.strip(): val.strip() for key, val in pairs}
    except IOError:
        return {}
 +</code>
 +
 +It is sufficient to read the ''(key,value)'' pairs of interest to forge a usage record to add to a table of some database. After that
 +the job history file is moved to another folder, to prevent double counting.
 +
 +See below an example script performing just that. Note that it extracts more ''key/value'' pairs than those strictly needed for accounting,
 +such as the job exit status and the hostname of the Worker Node, which can be used to look up the HS06 power of the machine and obtain a precise per-job WallClockTime normalization.
 +
 +<code>
 +#!/usr/bin/env python
 +
 +import os, sys, time, json
 +import psycopg2
 +from socket import getfqdn
 +
# Fully-qualified hostname of this CE and its short (unqualified) form;
# the short name is stamped on every usage record as "fromhost" below.
myfqdn = getfqdn()
myhn  = myfqdn.split('.')[0]
# Path of the JSON configuration file (see the example below) and the
# fallback log file used until the configured logfile path is loaded.
myconf = '/etc/t1acct/htc_gratia_to_db.conf'
mylog = '/tmp/htc_acct.log'
 +
 +"""
 +Example configuration file
 +[root@ce02-htc ~]# cat /etc/t1acct/htc_gratia_to_db.conf
 +{
 +"job": {
 +  "htc_jobdir": "/var/lib/gratia/data/",
 +  "ce_htc_jobdir": "/var/lib/ce_gratia/data/",
 +  "htc_jobparsed": "/var/lib/gratia/data.tmp/",
 +  "htc_jobaccounted": "/var/lib/gratia/data.bck/"
 +  },
 +"log":{
 +  "logfile": "/tmp/acct2apel_pg.log"
 +  },
 +"db": {
 +  "database":"acct",
 +  "host":"myhostdb.fully.qualified.domain",
 +  "user":"accguy",
 +  "password":"type_your_one_here"
 +  }
 +}
 +
 +"""
 +
def mlog(msg, logfile = mylog):
    """Append a timestamped line to *logfile*.

    mlog(msg, logfile=mylog) --> append "<timestamp>: msg\\n" to the
    specified logfile.  The timestamp comes from the module-level
    ``now()`` helper.

    NOTE(review): the default is captured at definition time, so the
    later reassignment of ``mylog`` (after the config is read) does NOT
    change this default — callers relying on the configured logfile must
    pass it explicitly.  Preserved as-is for compatibility.
    """
    # ``with`` flushes and closes the handle even if write() raises,
    # replacing the original open/write/flush/close sequence.
    with open(logfile, 'a') as f:
        f.write("%s: " % now() + msg + '\n')
 +
# Load the JSON configuration; on any failure log to the fallback
# logfile and abort — nothing useful can be done without a config.
try:
    # ``with`` closes the handle in all cases (the original closed it
    # only on the success path, after the json.load).
    with open(myconf, 'r') as f:
        cnf = json.load(f)
except Exception as e:  # "as e" is valid on Py2.6+ and required on Py3
    mlog("ERROR: %s" % str(e))
    sys.exit(1)

# From here on, log to the file named in the configuration.
mylog = cnf['log']['logfile']
 +
def help():
    """Print a short usage message and exit with status 0.

    NOTE(review): shadows the ``help`` builtin; name kept unchanged for
    compatibility with existing callers.
    """
    # Single-argument print() calls behave identically on Python 2
    # (parenthesized string) and Python 3 (function call).
    print("Usage example: python htc_gratiajobs.py")
    print("parse HTCondor history job files from $(PER_JOB_HISTORY_DIR)")
    print("insert accounting data in database and move file to other folder")
    sys.exit(0)
 +
def now():
    """Return the current local time as a human-readable timestamp,
    e.g. 'Wed Jul 10 11:52:00 2019'."""
    current = time.time()
    return time.ctime(current)
 +
class dbconn():
    """Tiny convenience wrapper bundling a psycopg2 connection with an
    already-opened cursor (exposed as ``self.conn`` and ``self.curs``)."""

    def __init__(self, database='dbname', host='dbhost.cr.cnaf.infn.it', user='dbuser', password='dbpasswd'):
        # Open the connection first, then derive the cursor from it.
        connection = psycopg2.connect(database=database, host=host,
                                      user=user, password=password)
        self.conn = connection
        self.curs = connection.cursor()
 +
# Open the database connection described by the "db" section of the config.
conndict = cnf['db']
qc = dbconn(**conndict)

def dt2sec(delta):
    """Convert a datetime.timedelta to whole seconds (days + seconds;
    microseconds are deliberately ignored, as in the original lambda)."""
    return delta.days * 86400 + delta.seconds

def idf(value):
    """Identity — placeholder transform for fields needing no change."""
    return value

def thishn(value):
    """Return this host's short hostname, ignoring the argument."""
    return myhn
 +
 +# cfr. manual: condor-V8_8_1-Manual/JobClassAdAttributes.html
 +# ExitStatus:
 +# The way that HTCondor previously dealt with a job's exit status. This attribute should no longer be used. It is not always accurate in heterogeneous pools, or if the job exited with a signal. Instead, see the attributes: ExitBySignal, ExitCode, and ExitSignal. 
 +
 +#APEL: takes the following keys:
 +#GlobalJobId Owner RemoteWallClockTime RemoteUserCpu RemoteSysCpu JobStartDate EnteredCurrentStatus ResidentSetSize_RAW ImageSize_RAW RequestCpus
 +
# Mapping from HTCondor job-history ClassAd attribute names to the
# corresponding column names of the ``htjob`` accounting table; only
# these attributes are extracted from each per-job history file.
d = {'ClusterId': 'jobid',
     'Cmd': 'jobname',
     'fromhost':'fromhost',  # not a ClassAd key: filled in by the loop below
     'EnteredCurrentStatus': 'eventtimeepoch',
     'CumulativeRemoteSysCpu': 'stime',
     'CumulativeRemoteUserCpu': 'utime',
     'CumulativeSlotTime': 'runtime',
     'ExitCode': 'exitstatus',
     'GlobalJobId': 'ceid',
     'JobStartDate': 'starttimeepoch',
     'LastRemoteHost': 'exechosts',  # reduced to a short hostname below
     'OriginalCpus': 'numprocessors',
     'Owner': 'username',
     'ProcId': 'idx',
     'QDate': 'submittimeepoch',
     'Requirements': 'resreq',
     'RoutedFromJobId': 'bljobid',
     'ResidentSetSize_RAW': 'maxrmem',
     'ImageSize_RAW':'maxrswap',
     'x509UserProxyFirstFQAN': 'userfqan',
     'x509UserProxyVOName': 'queue',
     'x509userproxysubject': 'userdn',
     'GPUsProvisioned':'gpu'
}
 +
# Directory the schedd writes per-job history files to
# (the PER_JOB_HISTORY_DIR path mentioned at the top of the page).
jdir = cnf['job']['htc_jobdir']
 +
def jobfile2dict(fn):
    """Parse an HTCondor per-job history file into a dict of strings.

    Each line has the form ``key = value``; both sides are stripped.
    Returns {} when the file cannot be opened or read.

    Fixes: the original leaked the file handle (never closed) and built
    a throwaway list via readlines(); ``with`` plus direct iteration
    avoids both.
    """
    try:
        with open(fn, 'r') as f:
            pairs = (x.split('=', 1) for x in f)
            return {key.strip(): val.strip() for key, val in pairs}
    except IOError:
        return {}
 +
def cleanval(v):
    """Normalize a raw ClassAd value for DB insertion.

    - None passes through unchanged.
    - A double-quoted string is returned with the quotes stripped.
    - A numeric string is converted to int (via float, so "3.5" -> 3).
    - Anything non-numeric is returned as-is.
    """
    if v is None:
        return v
    if v and v[0] == v[-1] == '"' :
        return v[1:-1]
    try:
        return int(float(v))
    except (ValueError, TypeError):
        # BUG FIX: the original "except ValueError,TypeError:" caught only
        # ValueError (binding it to the name TypeError) and is a syntax
        # error on Python 3; a parenthesized tuple catches both types.
        return v
 +
def getwn(s):
    """Extract the short worker-node hostname from a ClassAd
    LastRemoteHost value, e.g. 'slot1_2@wn123.cnaf.infn.it' -> 'wn123'.

    Falsy input (None, '') is returned unchanged, mirroring the original
    ``s and ...`` short-circuit.  (Named def replaces the lambda
    assignment, per PEP 8.)
    """
    return s and s.split('@', 1)[-1].split('.')[0]
 +
# TODO: use the os.scandir() iterator on Python 3.
# List candidate history files and ensure the two working directories
# exist: "parsed" (staging, inserted but not committed) and
# "accounted" (final archive).
F = os.listdir(jdir)
if not os.path.isdir(cnf['job']['htc_jobparsed']):
    os.mkdir(cnf['job']['htc_jobparsed'])

if not os.path.isdir(cnf['job']['htc_jobaccounted']):
    os.mkdir(cnf['job']['htc_jobaccounted'])

# Fixed key order: K drives both the column list and the value tuples,
# so columns and values stay aligned.
K = d.keys()
# Parameterized INSERT with one %s placeholder per column (psycopg2 style).
sq = """INSERT INTO htjob (%s) VALUES (%s)"""%(','.join([d[k] for k in K]),('%s,'*len(d)).rstrip(','))
for n,fn in enumerate(F):
    # Only per-job history files ("history.<id>") are of interest.
    if not fn.startswith('history.'): continue
    jf = os.path.join(jdir,fn)
    jd = jobfile2dict(jf)
    # Restrict the parsed file to the attributes being accounted.
    dtup = dict([(k,jd.get(k)) for k in d.keys()])
    df = {}
    for k,v in dtup.items():
        df[k] = cleanval(v)
    # Stamp the record with this CE's short hostname.
    df['fromhost'] = thishn(myfqdn)
    # Reduce "slot@wn.fqdn" to the bare worker-node hostname.
    df['LastRemoteHost'] = getwn(df.get('LastRemoteHost',''))
    # Truncate long fields (presumably to DB column sizes — TODO confirm
    # against the htjob schema).
    df['Cmd'] = jd['Cmd'][-80:]
    df['Requirements'] = jd.get('Requirements','')[:255]
    # Default missing numeric values to 0 instead of NULL/None.
    for k in ['CumulativeRemoteSysCpu','CumulativeRemoteUserCpu','GPUsProvisioned','JobStartDate','ExitCode']:
        df[k] = df[k] or 0
    print n,qc.curs.mogrify(sq,tuple([df[k] for k in K]))
    qc.curs.execute(sq,tuple([df[k] for k in K]))
    # Move the file out of the input dir so it is never counted twice.
    os.rename(jf,os.path.join(cnf['job']['htc_jobparsed'],fn))
 +
# Commit all inserts in one transaction, then release DB resources.
qc.conn.commit()
qc.curs.close()
qc.conn.close()

# Only after a successful commit, archive the staged files; a crash
# before the commit leaves them in "parsed" for later inspection.
for fn in os.listdir(cnf['job']['htc_jobparsed']):
    os.rename(os.path.join(cnf['job']['htc_jobparsed'],fn),os.path.join(cnf['job']['htc_jobaccounted'],fn))
 +
 +
 +
 +</code>
progetti/htcondor-tf/htcondor-ce_accounting.txt ยท Last modified: 2019/07/10 11:52 by dalpra@infn.it