##// END OF EJS Templates
Wrapper prototype working, with all kind of Serie...
Wrapper prototype working, with all kind of Serie Signed-off-by: Alexis Jeandet <alexis.jeandet@member.fsf.org>

File last commit:

r1341:f18e017310bc
r1479:704e0f1deb02
Show More
TestAmdaMiniFuzz.py
284 lines | 8.7 KiB | text/x-python | PythonLexer
import sys
import os
import copy
if not hasattr(sys, 'argv'):
sys.argv = ['']
current_script_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(current_script_path)
import amda
import pytestamda
import pysciqlopcore
import sciqlopqt
import datetime
import random
import string
import jsonpickle
import argparse
class Variable:
class Range:
def __init__(self, start, stop):
self.start = start
self.stop = stop
def __init__(self, name, t_start, t_stop):
self.name = name
self.range = Variable.Range(t_start, t_stop)
class OperationCtx:
def __init__(self, prev_ctx=None):
if prev_ctx is None:
self.t_start = datetime.datetime(2012, 10, 20, 8, 10, 00)
self.t_stop = datetime.datetime(2012, 10, 20, 12, 0, 0)
self.variables = dict()
self.sync_group = set()
else:
self.t_start = copy.deepcopy(prev_ctx.t_start)
self.t_stop = copy.deepcopy(prev_ctx.t_stop)
self.variables = copy.deepcopy(prev_ctx.variables)
self.sync_group = copy.deepcopy(prev_ctx.sync_group)
__variables__ = dict()
__variables_in_sync__ = set()
__OPS__ = {}
__WEIGHTS__ = {}
sync_group = sciqlopqt.QUuid()
pytestamda.VariableController.addSynchronizationGroup(sync_group)
def register(weight):
def register_(cls):
__OPS__[cls.__name__] = cls()
__WEIGHTS__[cls.__name__] = weight
return cls
return register_
def random_var(ctx):
if len(ctx.variables):
return random.choice(list(ctx.variables.keys()))
else:
return None
@register(2)
class CreateVar:
@staticmethod
def prepare(ctx, *args, **kwargs):
new_random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(1, 40)))
ctx.variables[new_random_name] = Variable(new_random_name, ctx.t_start, ctx.t_stop)
return {"var_name": new_random_name}
@staticmethod
def do(var_name, *args, **kwargs):
__variables__[var_name] = pytestamda.VariableController.createVariable(var_name,
pytestamda.amda_provider())
@register(1)
class DeleteVar:
@staticmethod
def prepare(ctx, *args, **kwargs):
var_name = random_var(ctx)
if var_name:
ctx.variables.pop(var_name)
ctx.sync_group.discard(var_name)
return {"var_name": var_name}
@staticmethod
def do(var_name, *args, **kwargs):
if var_name:
variable = __variables__.pop(var_name)
pytestamda.VariableController.deleteVariable(variable)
class Zoom:
@staticmethod
def _compute_zoom_ranges(factor, variable):
delta = variable.range.stop - variable.range.start
center = variable.range.start + (delta / 2)
new_delta = delta * factor
t_start = center - new_delta / 2
t_stop = center + new_delta / 2
return t_start, t_stop
@staticmethod
def _zoom(factor, var_name, ctx):
var_list = ctx.sync_group if var_name in ctx.sync_group else [var_name]
for var_name in var_list:
variable = ctx.variables[var_name]
t_start, t_stop = Zoom._compute_zoom_ranges(variable=variable, factor=factor)
ctx.variables[var_name].range.start = t_start
ctx.variables[var_name].range.stop = t_stop
@staticmethod
def prepare(ctx, min, max):
var_name = random_var(ctx)
factor = random.uniform(min, max)
if var_name:
Zoom._zoom(factor, var_name, ctx)
return {"var_name": var_name, "factor": factor}
@staticmethod
def do(var_name, factor):
variable = __variables__[var_name]
t_start, t_stop = Zoom._compute_zoom_ranges(variable = variable, factor = factor)
sync = True if var_name in __variables_in_sync__ else False
pytestamda.VariableController.update_range(variable, pytestamda.SqpRange(t_start, t_stop), sync)
@register(10)
class ZoomIn:
@staticmethod
def prepare(ctx, *args, **kwargs):
return Zoom.prepare(ctx, min=1, max=2)
@staticmethod
def do(var_name, factor, *args, **kwargs):
if var_name:
Zoom.do(var_name, factor)
@register(10)
class ZoomOut:
@staticmethod
def prepare(ctx, *args, **kwargs):
return Zoom.prepare(ctx, min=.1, max=1)
@staticmethod
def do(var_name, factor, *args, **kwargs):
if var_name:
Zoom.do(var_name, factor)
@register(10)
class Shift:
@staticmethod
def _compute_range(variable, direction, factor):
delta = variable.range.stop - variable.range.start
offset = delta * factor * direction
return variable.range.start + offset, variable.range.stop + offset
@staticmethod
def prepare(ctx, *args, **kwargs):
var_name = random_var(ctx)
direction = random.choice([1, -1])
factor = random.random()
if var_name:
var_list = ctx.sync_group if var_name in ctx.sync_group else [var_name]
for var_name in var_list:
variable = ctx.variables[var_name]
delta = variable.range.stop - variable.range.start
offset = delta * factor * direction
variable.range.start , variable.range.stop = Shift._compute_range(variable, direction, factor)
return {"var_name": var_name, "direction": direction, "factor": factor}
@staticmethod
def do(var_name, direction, factor, *args, **kwargs):
if var_name:
variable = __variables__[var_name]
t_start, t_stop = Shift._compute_range(variable, direction, factor)
sync = True if var_name in __variables_in_sync__ else False
pytestamda.VariableController.update_range(variable, pytestamda.SqpRange(t_start, t_stop), sync)
@register(3)
class WaitForDl:
@staticmethod
def prepare(ctx, *args, **kwargs):
return {}
@staticmethod
def do(*args, **kwargs):
pytestamda.VariableController.wait_for_downloads()
@register(2)
class SyncVar:
@staticmethod
def prepare(ctx, *args, **kwargs):
var_name = random_var(ctx)
if var_name:
ctx.sync_group.add(var_name)
return {"var_name": var_name}
@staticmethod
def do(var_name, *args, **kwargs):
if var_name:
pytestamda.VariableController.synchronizeVar(__variables__[var_name], sync_group)
__variables_in_sync__.add(var_name)
@register(2)
class DeSyncVar:
@staticmethod
def prepare(ctx, *args, **kwargs):
var_name = random_var(ctx)
if var_name:
ctx.sync_group.discard(var_name)
return {"var_name": var_name}
@staticmethod
def do(var_name, *args, **kwargs):
if var_name:
pytestamda.VariableController.deSynchronizeVar(__variables__[var_name], sync_group)
__variables_in_sync__.discard(var_name)
#parser = argparse.ArgumentParser()
#parser.add_argument("-r", "--reload", help="reload")
#cli_args = parser.parse_args()
def run_scenario(args,ops):
t_start, t_stop = datetime.datetime(2012, 10, 20, 8, 10, 00), datetime.datetime(2012, 10, 20, 12, 0, 0)
pytestamda.TimeController.setTime(pytestamda.SqpRange(t_start, t_stop))
for op_name,arg in zip(ops,args):
operation = __OPS__[op_name]
operation.do(**arg)
def build_scenario(name, steps=100):
context_stack = [OperationCtx()]
ops = [CreateVar.__name__] # start with one variable minimum
ops += random.choices(list(__OPS__.keys()), weights=list(__WEIGHTS__.values()), k=steps)
ops.append(SyncVar.__name__)
for op_name in ops:
ctx = OperationCtx(context_stack[-1])
operation = __OPS__[op_name]
args.append(operation.prepare(ctx))
context_stack.append(ctx)
os.makedirs(name)
js = jsonpickle.encode({"args": args, "context_stack": context_stack, "operations": ops})
with open(name+"/input.json", "w") as file:
file.write(js)
return ops, args, context_stack
def load_scenario(name):
with open(name+"/input.json", "r") as file:
js = file.read()
data = jsonpickle.decode(js)
return data["operations"], data["args"], data["context_stack"]
if __name__ == '__main__':
scenario = ''.join(random.choices(string.ascii_letters + string.digits, k=32))
#ops,args, context_stack = build_scenario(scenario, 300)
#print("Generated scenario {}".format(scenario))
name = "EuI9qFMLZJ4k7vf8seTww8Z4wGBpspd8"
ops, args, context_stack = load_scenario(name)
syncs = [WaitForDl.__name__]*len(ops)
syncs_args = [{}]*len(ops)
ops2 = [op for pair in zip(ops,syncs) for op in pair]
args2 = [arg for pair in zip(args,syncs_args) for arg in pair]
run_scenario(args=args2, ops=ops2)