|
|
import sys
|
|
|
import os
|
|
|
import copy
|
|
|
|
|
|
if not hasattr(sys, 'argv'):
|
|
|
sys.argv = ['']
|
|
|
current_script_path = os.path.dirname(os.path.realpath(__file__))
|
|
|
sys.path.append(current_script_path)
|
|
|
import amda
|
|
|
import pytestamda
|
|
|
import pysciqlopcore
|
|
|
import sciqlopqt
|
|
|
|
|
|
import datetime
|
|
|
import random
|
|
|
import string
|
|
|
import jsonpickle
|
|
|
import argparse
|
|
|
|
|
|
class Variable:
|
|
|
class Range:
|
|
|
def __init__(self, start, stop):
|
|
|
self.start = start
|
|
|
self.stop = stop
|
|
|
|
|
|
def __init__(self, name, t_start, t_stop):
|
|
|
self.name = name
|
|
|
self.range = Variable.Range(t_start, t_stop)
|
|
|
|
|
|
|
|
|
class OperationCtx:
|
|
|
def __init__(self, prev_ctx=None):
|
|
|
if prev_ctx is None:
|
|
|
self.t_start = datetime.datetime(2012, 10, 20, 8, 10, 00)
|
|
|
self.t_stop = datetime.datetime(2012, 10, 20, 12, 0, 0)
|
|
|
self.variables = dict()
|
|
|
self.sync_group = set()
|
|
|
else:
|
|
|
self.t_start = copy.deepcopy(prev_ctx.t_start)
|
|
|
self.t_stop = copy.deepcopy(prev_ctx.t_stop)
|
|
|
self.variables = copy.deepcopy(prev_ctx.variables)
|
|
|
self.sync_group = copy.deepcopy(prev_ctx.sync_group)
|
|
|
|
|
|
|
|
|
__variables__ = dict()
|
|
|
|
|
|
__variables_in_sync__ = set()
|
|
|
|
|
|
__OPS__ = {}
|
|
|
__WEIGHTS__ = {}
|
|
|
|
|
|
sync_group = sciqlopqt.QUuid()
|
|
|
pytestamda.VariableController.addSynchronizationGroup(sync_group)
|
|
|
|
|
|
|
|
|
def register(weight):
|
|
|
def register_(cls):
|
|
|
__OPS__[cls.__name__] = cls()
|
|
|
__WEIGHTS__[cls.__name__] = weight
|
|
|
return cls
|
|
|
|
|
|
return register_
|
|
|
|
|
|
|
|
|
def random_var(ctx):
|
|
|
if len(ctx.variables):
|
|
|
return random.choice(list(ctx.variables.keys()))
|
|
|
else:
|
|
|
return None
|
|
|
|
|
|
|
|
|
@register(2)
|
|
|
class CreateVar:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
new_random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(1, 40)))
|
|
|
ctx.variables[new_random_name] = Variable(new_random_name, ctx.t_start, ctx.t_stop)
|
|
|
return {"var_name": new_random_name}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, *args, **kwargs):
|
|
|
__variables__[var_name] = pytestamda.VariableController.createVariable(var_name,
|
|
|
pytestamda.amda_provider())
|
|
|
|
|
|
|
|
|
@register(1)
|
|
|
class DeleteVar:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
var_name = random_var(ctx)
|
|
|
if var_name:
|
|
|
ctx.variables.pop(var_name)
|
|
|
ctx.sync_group.discard(var_name)
|
|
|
return {"var_name": var_name}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, *args, **kwargs):
|
|
|
if var_name:
|
|
|
variable = __variables__.pop(var_name)
|
|
|
pytestamda.VariableController.deleteVariable(variable)
|
|
|
|
|
|
|
|
|
class Zoom:
|
|
|
@staticmethod
|
|
|
def _compute_zoom_ranges(factor, variable):
|
|
|
delta = variable.range.stop - variable.range.start
|
|
|
center = variable.range.start + (delta / 2)
|
|
|
new_delta = delta * factor
|
|
|
t_start = center - new_delta / 2
|
|
|
t_stop = center + new_delta / 2
|
|
|
return t_start, t_stop
|
|
|
|
|
|
@staticmethod
|
|
|
def _zoom(factor, var_name, ctx):
|
|
|
var_list = ctx.sync_group if var_name in ctx.sync_group else [var_name]
|
|
|
for var_name in var_list:
|
|
|
variable = ctx.variables[var_name]
|
|
|
t_start, t_stop = Zoom._compute_zoom_ranges(variable=variable, factor=factor)
|
|
|
ctx.variables[var_name].range.start = t_start
|
|
|
ctx.variables[var_name].range.stop = t_stop
|
|
|
|
|
|
@staticmethod
|
|
|
def prepare(ctx, min, max):
|
|
|
var_name = random_var(ctx)
|
|
|
factor = random.uniform(min, max)
|
|
|
if var_name:
|
|
|
Zoom._zoom(factor, var_name, ctx)
|
|
|
return {"var_name": var_name, "factor": factor}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, factor):
|
|
|
variable = __variables__[var_name]
|
|
|
t_start, t_stop = Zoom._compute_zoom_ranges(variable = variable, factor = factor)
|
|
|
sync = True if var_name in __variables_in_sync__ else False
|
|
|
pytestamda.VariableController.update_range(variable, pytestamda.SqpRange(t_start, t_stop), sync)
|
|
|
|
|
|
|
|
|
@register(10)
|
|
|
class ZoomIn:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
return Zoom.prepare(ctx, min=1, max=2)
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, factor, *args, **kwargs):
|
|
|
if var_name:
|
|
|
Zoom.do(var_name, factor)
|
|
|
|
|
|
|
|
|
@register(10)
|
|
|
class ZoomOut:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
return Zoom.prepare(ctx, min=.1, max=1)
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, factor, *args, **kwargs):
|
|
|
if var_name:
|
|
|
Zoom.do(var_name, factor)
|
|
|
|
|
|
|
|
|
@register(10)
|
|
|
class Shift:
|
|
|
@staticmethod
|
|
|
def _compute_range(variable, direction, factor):
|
|
|
delta = variable.range.stop - variable.range.start
|
|
|
offset = delta * factor * direction
|
|
|
return variable.range.start + offset, variable.range.stop + offset
|
|
|
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
var_name = random_var(ctx)
|
|
|
direction = random.choice([1, -1])
|
|
|
factor = random.random()
|
|
|
if var_name:
|
|
|
var_list = ctx.sync_group if var_name in ctx.sync_group else [var_name]
|
|
|
for var_name in var_list:
|
|
|
variable = ctx.variables[var_name]
|
|
|
delta = variable.range.stop - variable.range.start
|
|
|
offset = delta * factor * direction
|
|
|
variable.range.start , variable.range.stop = Shift._compute_range(variable, direction, factor)
|
|
|
return {"var_name": var_name, "direction": direction, "factor": factor}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, direction, factor, *args, **kwargs):
|
|
|
if var_name:
|
|
|
variable = __variables__[var_name]
|
|
|
t_start, t_stop = Shift._compute_range(variable, direction, factor)
|
|
|
sync = True if var_name in __variables_in_sync__ else False
|
|
|
pytestamda.VariableController.update_range(variable, pytestamda.SqpRange(t_start, t_stop), sync)
|
|
|
|
|
|
|
|
|
@register(3)
|
|
|
class WaitForDl:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
return {}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(*args, **kwargs):
|
|
|
pytestamda.VariableController.wait_for_downloads()
|
|
|
|
|
|
|
|
|
@register(2)
|
|
|
class SyncVar:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
var_name = random_var(ctx)
|
|
|
if var_name:
|
|
|
ctx.sync_group.add(var_name)
|
|
|
return {"var_name": var_name}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, *args, **kwargs):
|
|
|
if var_name:
|
|
|
pytestamda.VariableController.synchronizeVar(__variables__[var_name], sync_group)
|
|
|
__variables_in_sync__.add(var_name)
|
|
|
|
|
|
|
|
|
@register(2)
|
|
|
class DeSyncVar:
|
|
|
@staticmethod
|
|
|
def prepare(ctx, *args, **kwargs):
|
|
|
var_name = random_var(ctx)
|
|
|
if var_name:
|
|
|
ctx.sync_group.discard(var_name)
|
|
|
return {"var_name": var_name}
|
|
|
|
|
|
@staticmethod
|
|
|
def do(var_name, *args, **kwargs):
|
|
|
if var_name:
|
|
|
pytestamda.VariableController.deSynchronizeVar(__variables__[var_name], sync_group)
|
|
|
__variables_in_sync__.discard(var_name)
|
|
|
|
|
|
|
|
|
#parser = argparse.ArgumentParser()
|
|
|
#parser.add_argument("-r", "--reload", help="reload")
|
|
|
#cli_args = parser.parse_args()
|
|
|
|
|
|
|
|
|
def run_scenario(args,ops):
|
|
|
t_start, t_stop = datetime.datetime(2012, 10, 20, 8, 10, 00), datetime.datetime(2012, 10, 20, 12, 0, 0)
|
|
|
pytestamda.TimeController.setTime(pytestamda.SqpRange(t_start, t_stop))
|
|
|
|
|
|
for op_name,arg in zip(ops,args):
|
|
|
operation = __OPS__[op_name]
|
|
|
operation.do(**arg)
|
|
|
|
|
|
|
|
|
def build_scenario(name, steps=100):
|
|
|
context_stack = [OperationCtx()]
|
|
|
ops = [CreateVar.__name__] # start with one variable minimum
|
|
|
ops += random.choices(list(__OPS__.keys()), weights=list(__WEIGHTS__.values()), k=steps)
|
|
|
ops.append(SyncVar.__name__)
|
|
|
for op_name in ops:
|
|
|
ctx = OperationCtx(context_stack[-1])
|
|
|
operation = __OPS__[op_name]
|
|
|
args.append(operation.prepare(ctx))
|
|
|
context_stack.append(ctx)
|
|
|
os.makedirs(name)
|
|
|
js = jsonpickle.encode({"args": args, "context_stack": context_stack, "operations": ops})
|
|
|
with open(name+"/input.json", "w") as file:
|
|
|
file.write(js)
|
|
|
return ops, args, context_stack
|
|
|
|
|
|
|
|
|
def load_scenario(name):
|
|
|
with open(name+"/input.json", "r") as file:
|
|
|
js = file.read()
|
|
|
data = jsonpickle.decode(js)
|
|
|
return data["operations"], data["args"], data["context_stack"]
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
scenario = ''.join(random.choices(string.ascii_letters + string.digits, k=32))
|
|
|
#ops,args, context_stack = build_scenario(scenario, 300)
|
|
|
#print("Generated scenario {}".format(scenario))
|
|
|
name = "EuI9qFMLZJ4k7vf8seTww8Z4wGBpspd8"
|
|
|
ops, args, context_stack = load_scenario(name)
|
|
|
syncs = [WaitForDl.__name__]*len(ops)
|
|
|
syncs_args = [{}]*len(ops)
|
|
|
ops2 = [op for pair in zip(ops,syncs) for op in pair]
|
|
|
args2 = [arg for pair in zip(args,syncs_args) for arg in pair]
|
|
|
run_scenario(args=args2, ops=ops2)
|
|
|
|