# -*- coding: utf-8 -*-
"""erecord_cmn.utils.erecord_package
Methods about erecord package
"""
import os
import json
import shutil
from erecord_cmn.utils.vle import get_rep_pkg_data_path
from erecord_cmn.utils.vle import get_rep_pkg_path
from erecord_cmn.utils.vle import get_rep_pkg_exp_path
from erecord_cmn.utils.vle import vle_bash_cmd
from erecord_cmn.utils.process import call_subprocess
#from erecord_cmn.utils.coding import byteify
###############################################################################
def get_value_type(json_value_type):
    """Translate a value type name coming from the erecord pkg (json format).

    json_value_type is expected to be one of the vle::value::Value::type
    names ("BOOLEAN", "INTEGER", "DOUBLE", "STRING", "XMLTYPE", "SET", "MAP",
    "TUPLE", "TABLE", "MATRIX", "NIL"), or "" for an unknown type
    (vle::value::Value::USER is treated as unknown).

    Returns the matching lowercase name ("XMLTYPE" gives "xml", "NIL" gives
    "none"), or "" for "" and for any other value.
    """
    translations = {
        "BOOLEAN": "boolean",
        "INTEGER": "integer",
        "DOUBLE": "double",
        "STRING": "string",
        "XMLTYPE": "xml",
        "SET": "set",
        "MAP": "map",
        "TUPLE": "tuple",
        "TABLE": "table",
        "MATRIX": "matrix",
        "NIL": "none",
    }
    try:
        return translations.get(json_value_type, "")
    except TypeError:
        # Unhashable input (list, dict...) cannot match any known name
        return ""
###############################################################################
# Names of the report files handled under the runvle_path directory.
# stderr_filename / stdout_filename : files that run_simulation hands to
# call_subprocess as stderr / stdout paths and that control_out reads back.
stderr_filename = "stderr.txt"
stdout_filename = "stdout.txt"
# ok_filename : control_out returns OK_found=True when a regular file of this
# name exists (presumably written by the simulation on success -- to confirm).
ok_filename = "OK"
def control_in(vlehome_path, vle_version, pkgname, vpzname):
    """Check that the package and its vpz file exist; report what is wrong.

    Controls :
    - the path of pkgname (given by get_rep_pkg_path) is a directory
    - vpzname, under the path given by get_rep_pkg_exp_path, is a file

    Returns the error messages joined by " -- ", or "" if no error met.
    """
    location = dict(rep_path=vlehome_path, vle_version=vle_version,
                    pkgname=pkgname)
    pkg_dir = get_rep_pkg_path(**location)
    exp_dir = get_rep_pkg_exp_path(**location)
    vpz_file = os.path.join(exp_dir, vpzname)

    problems = []
    if not os.path.isdir(pkg_dir):
        problems.append(pkgname + " is not a directory")
    if not os.path.isfile(vpz_file):
        problems.append(vpzname + " is not a file")
    return " -- ".join(problems)
def copy_erecord_data_file(vlehome_path, vle_version, src_folder, file_name):
    """Copy the file_name file from src_folder into the erecord pkg data folder.

    The destination folder is the one returned by get_rep_pkg_data_path for
    the "erecord" package; the copy keeps the same file name.
    """
    data_folder = get_rep_pkg_data_path(rep_path=vlehome_path,
                                        vle_version=vle_version,
                                        pkgname="erecord")
    source = os.path.join(src_folder, file_name)
    destination = os.path.join(data_folder, file_name)
    shutil.copyfile(source, destination)
def create_and_fill_jsonfile(content, folder_path, file_name):
    """Write content, serialized as json, into folder_path/file_name.

    An existing file of that name is overwritten.
    """
    # Serialize before opening, so that a content that is not json
    # serializable does not leave an empty (truncated) file behind.
    text = json.dumps(content)
    jsonfile_path = os.path.join(folder_path, file_name)
    # 'with' closes the file even if the write fails
    with open(jsonfile_path, 'w') as jsonfile:
        jsonfile.write(text)
def run_simulation(runvle_path, vlehome_path, vle_usr_path, pkgname, vpzname):
    """Run the vpzname simulator of the pkgname package with vle.

    The shell command moves to runvle_path, applies the commands given by
    vle_bash_cmd(vle_usr_path), exports VLE_HOME=vlehome_path and then calls
    "vle -P pkgname vpzname". It is run by call_subprocess, with the stderr
    and stdout files located under runvle_path; those reports are then read
    by control_out.

    Returns (cr_ok, errormsg) :
    - cr_ok is True only if the OK file has been found and the filtered
      stderr is empty.
    - errormsg is the filtered stderr, preceded by " -- OK signal absent -- "
      if the OK file has not been found.
    """
    # NOTE(review): the command is built by plain string concatenation;
    # presumably every argument is trusted configuration -- to be confirmed.
    cmd = "".join([
        "cd " + runvle_path + ";",
        vle_bash_cmd(vle_usr_path),
        "export VLE_HOME=" + vlehome_path + ";",
        "vle -P " + pkgname + " " + vpzname + ";",
    ])
    stderr_path = os.path.join(runvle_path, stderr_filename)
    stdout_path = os.path.join(runvle_path, stdout_filename)
    call_subprocess(cmd, runvle_path, stderr_path, stdout_path)

    # msg_stdout is not studied
    (OK_found, _msg_stdout, msg_stderr) = control_out(runvle_path)

    # Careful : stderr is not empty when OK in case vle-2.0.0, so empty lines
    # and "debug:" lines are dropped before deciding. Lines are compared by
    # value (==), not by identity. The kept lines are concatenated without
    # separator, which keeps the returned message as it has always been.
    msg_stderr = "".join(
        line for line in msg_stderr.split("\n")
        if line != "" and "debug:" not in line
    )
    cr_ok = OK_found and msg_stderr == ""

    errormsg = ""
    if not OK_found:
        errormsg = errormsg + " -- OK signal absent -- "
    errormsg = errormsg + msg_stderr
    return (cr_ok, errormsg)
def get_file_jsoncontent(folder_path, file_name):
    """Read the folder_path/file_name json file and return its decoded content.

    The errors raised by open (file absent...) and by the json decoding
    (invalid content) are not caught.
    """
    output_file = os.path.join(folder_path, file_name)
    # 'with' closes the file even if reading or decoding fails
    with open(output_file, "r") as jsonfile:
        return json.load(jsonfile)
def control_out(runvle_path):
    """Makes controls and returns reports

    Controls :
    - stderr_filename under runvle_path (its content is read)
    - stdout_filename under runvle_path (its content is read)
    - ok_filename under runvle_path (its presence as a file is checked)

    Returns (OK_found, msg_stdout, msg_stderr). The error raised by open is
    not caught if the stderr or stdout file can not be opened.
    """
    stderr_path = os.path.join(runvle_path, stderr_filename)
    stdout_path = os.path.join(runvle_path, stdout_filename)
    ok_path = os.path.join(runvle_path, ok_filename)

    # 'with' closes each file even if reading fails
    with open(stderr_path) as stderr_file:
        msg_stderr = stderr_file.read()
    with open(stdout_path) as stdout_file:
        msg_stdout = stdout_file.read()

    # isfile is False for a path that does not exist
    OK_found = os.path.isfile(ok_path)
    return (OK_found, msg_stdout, msg_stderr)
###############################################################################
def vle_modify_vpz(runvle_path, vlehome_path, vle_version, vle_usr_path,
                   pkgname, vpzname,
                   storaged_list, conditions, begin, duration):
    """Prepares and runs modify_vpz.vpz simulation of erecord pkg

    Steps : controls on pkgname and vpzname, writing of modify_input.json
    under runvle_path, copy of that file into the data folder of the erecord
    pkg, run of the modify_vpz.vpz simulator of the erecord pkg.

    As a result : the vpzname file simulator of the pkgname package (under
    vlehome_path) has been modified and is ready to be simulated or read.

    Attention : give a pkgname, vpzname that is not a link but a copy

    Raises Exception if a control fails or if the simulation run is not OK.
    """

    def update(modify_parameters,
               storaged_list, conditions, begin, duration):
        """Completes modify_parameters with the requested modifications

        "begin" and "duration" are set only if given (not None),
        "conditions" is always set, "views" is set only if storaged_list
        contains at least one element.

        The value of a parameter of a condition is supposed to always be
        given as multiple, if its type is 'list'. The case where a parameter
        value of 'list' type would be given as a singleton is not treated.
        """
        for name, value in (("begin", begin), ("duration", duration)):
            if value is not None:
                modify_parameters[name] = value
        modify_parameters["conditions"] = conditions
        if storaged_list is not None and len(storaged_list) > 0:
            modify_parameters["views"] = {
                "storaged_list": [view for view in storaged_list],
            }

    def control_in(vlehome_path, vle_version, pkgname, vpzname):
        """Makes controls and returns a message about errors met

        Controls :
        - path about pkgname is a directory, is not a link
        - path about vpzname is a file, is not a link

        Note : the returned message is "" if no error met, else the error
        messages joined by " -- ".
        """
        location = dict(rep_path=vlehome_path, vle_version=vle_version,
                        pkgname=pkgname)
        pkg_dir = get_rep_pkg_path(**location)
        exp_dir = get_rep_pkg_exp_path(**location)
        vpz_file = os.path.join(exp_dir, vpzname)

        problems = []
        if not os.path.isdir(pkg_dir):
            problems.append(pkgname + " is not a directory")
        elif os.path.islink(pkg_dir):
            problems.append("directory dedicated to " + pkgname + " package"
                            + " is of link type (links refused)")
        if not os.path.isfile(vpz_file):
            problems.append(vpzname + " is not a file")
        elif os.path.islink(vpz_file):
            problems.append(vpzname + " file is of link type (links refused)")
        return " -- ".join(problems)

    problems = control_in(vlehome_path=vlehome_path, vle_version=vle_version,
                          pkgname=pkgname, vpzname=vpzname)
    if problems != "":
        raise Exception(problems)

    # Build modify_input.json input file
    input_name = "modify_input.json"
    modify_parameters = {"pkgname": pkgname, "vpzname": vpzname}
    update(modify_parameters, storaged_list, conditions, begin, duration)
    create_and_fill_jsonfile(content=modify_parameters,
                             folder_path=runvle_path,
                             file_name=input_name)

    # Install/copy modify_input.json into erecord/data
    copy_erecord_data_file(vlehome_path=vlehome_path, vle_version=vle_version,
                           src_folder=runvle_path,
                           file_name=input_name)

    # Run modify_vpz.vpz simulation
    succeeded, report = run_simulation(runvle_path=runvle_path,
                                       vlehome_path=vlehome_path,
                                       vle_usr_path=vle_usr_path,
                                       pkgname="erecord",
                                       vpzname="modify_vpz.vpz")
    if not succeeded:
        raise Exception(" vle run_simulation -- " + report)
###############################################################################
def vle_read_vpz(runvle_path, vlehome_path, vle_version, vle_usr_path,
                 pkgname, vpzname):
    """Prepares and runs read_vpz.vpz simulation of erecord pkg

    Steps : controls on pkgname and vpzname, writing of read_input.json under
    runvle_path, copy of that file into the data folder of the erecord pkg,
    run of the read_vpz.vpz simulator of the erecord pkg.

    This simulation produces (under runvle_path) the read_output.json json
    file that contains the input information of vpzname simulator.

    Returns the input information of vpzname as json data

    Raises Exception if a control fails or if the simulation run is not OK.
    """
    problems = control_in(vlehome_path=vlehome_path, vle_version=vle_version,
                          pkgname=pkgname, vpzname=vpzname)
    if problems != "":
        raise Exception(problems)

    # Build read_input.json input file
    input_name = "read_input.json"
    create_and_fill_jsonfile(content={"pkgname": pkgname, "vpzname": vpzname},
                             folder_path=runvle_path,
                             file_name=input_name)

    # Install/copy read_input.json into erecord/data
    copy_erecord_data_file(vlehome_path=vlehome_path, vle_version=vle_version,
                           src_folder=runvle_path, file_name=input_name)

    # Run read_vpz.vpz simulation
    succeeded, report = run_simulation(runvle_path=runvle_path,
                                       vlehome_path=vlehome_path,
                                       vle_usr_path=vle_usr_path,
                                       pkgname="erecord",
                                       vpzname="read_vpz.vpz")
    if not succeeded:
        raise Exception(" vle run_simulation -- " + report)

    # Exploitation of read_output.json file result
    return get_file_jsoncontent(folder_path=runvle_path,
                                file_name="read_output.json")
###############################################################################
def vle_run_vpz(runvle_path, vlehome_path, vle_version, vle_usr_path,
                pkgname, vpzname,
                plan, restype):
    """Prepares and runs run_vpz.vpz simulation of erecord pkg

    Steps : controls on pkgname and vpzname, writing of run_input.json (that
    contains pkgname, vpzname, plan and restype) under runvle_path, copy of
    that file into the data folder of the erecord pkg, run of the run_vpz.vpz
    simulator of the erecord pkg.

    This simulation produces (under runvle_path) the run_output.json json
    file that contains the results of vpzname simulation.

    Returns results of vpzname simulation as json data

    Raises Exception if a control fails or if the simulation run is not OK.
    """
    problems = control_in(vlehome_path=vlehome_path, vle_version=vle_version,
                          pkgname=pkgname, vpzname=vpzname)
    if problems != "":
        raise Exception(problems)

    # Build run_input.json input file
    input_name = "run_input.json"
    run_parameters = {
        "pkgname": pkgname,
        "vpzname": vpzname,
        "plan": plan,
        "restype": restype,
    }
    create_and_fill_jsonfile(content=run_parameters,
                             folder_path=runvle_path,
                             file_name=input_name)

    # Install/copy run_input.json into erecord/data
    copy_erecord_data_file(vlehome_path=vlehome_path, vle_version=vle_version,
                           src_folder=runvle_path, file_name=input_name)

    # Run run_vpz.vpz simulation
    succeeded, report = run_simulation(runvle_path=runvle_path,
                                       vlehome_path=vlehome_path,
                                       vle_usr_path=vle_usr_path,
                                       pkgname="erecord",
                                       vpzname="run_vpz.vpz")
    if not succeeded:
        raise Exception(" vle run_simulation -- " + report)

    # Exploitation of run_output.json file result
    return get_file_jsoncontent(folder_path=runvle_path,
                                file_name="run_output.json")
###############################################################################
#
# prints
#
def print_vle_read_vpz_result(res):
    """Print (on standard output) the content of a vle_read_vpz result.

    res is a dict whose expected keys are "experiment_name", "begin",
    "duration", "cond_list", "view_list" and "names"; every other key is
    reported as unexpected, together with its content. Each expected key is
    printed only if present.

    Each line is built as one single string given to print(...), which is
    valid and gives the same text with both Python 2 and Python 3 (the
    'print a, b' statement form is a syntax error with Python 3). The
    doubled spaces of the formats reproduce the separator that the print
    statement inserted between its items.
    """
    expected_keys = ["experiment_name", "begin", "duration",
                     "cond_list", "view_list", "names"]
    for key in res.keys():
        if key not in expected_keys:
            print("unexpected key :  %s" % (key,))
            print(" (content :  %s )" % (res[key],))

    key = "experiment_name"
    if key in res:
        print("%s  :  %s" % (key, res[key]))

    # "begin" and "duration" have the same structure : a dict with a "value"
    for key in ("begin", "duration"):
        if key in res:
            print("%s :  %s" % (key, res[key]))
            value = res[key]["value"]
            print(" %s value :  %s" % (key, value))
            print(" %s value type:  %s" % (key, type(value)))

    key = "cond_list"
    if key in res:
        print("Conditions")
        for cname, cond in res[key].items():
            par_list = cond["par_list"]
            print(" -  %s  : " % (cname,))
            for pname, par in par_list.items():
                print(" -  %s  :  %s" % (pname, par))

    key = "view_list"
    if key in res:
        print("%s  :  %s" % (key, res[key]))
        print("Views")
        for vname, view in res[key].items():
            out_list = view["out_list"]
            print(" -  %s  : " % (vname,))
            for oname in out_list:
                print(" -  %s" % (oname,))

    key = "names"
    if key in res:
        names = res[key]
        print("Names :")
        titles = (("cond_names", " - cond names :"),
                  ("par_names", " - par names :"),
                  ("view_names", " - view names :"),
                  ("out_names", " - out names :"))
        for group, title in titles:
            if group in names:
                print(title)
                for name in names[group]:
                    print(" -  %s" % (name,))
def print_vle_run_vpz_result(res):
    """Print (on standard output) the content of a vle_run_vpz result.

    res is a dict whose expected keys are "plan", "restype" and "res"; every
    other key is reported as unexpected, together with its content. Each
    expected key is printed only if present.

    Each line is built as one single string given to print(...), which is
    valid and gives the same text with both Python 2 and Python 3 (the
    'print a, b' statement form is a syntax error with Python 3). The
    doubled spaces of the formats reproduce the separator that the print
    statement inserted between its items.
    """
    expected_keys = ["plan", "restype", "res"]
    for key in res.keys():
        if key not in expected_keys:
            print("unexpected key :  %s" % (key,))
            print(" (content :  %s )" % (res[key],))

    for key in ("plan", "restype"):
        if key in res:
            print("%s  :  %s" % (key, res[key]))

    key = "res"
    if key in res:
        print("%s :  %s" % (key, res[key]))