##// END OF EJS Templates
Some refactoring on PB11 wrappers...
Some refactoring on PB11 wrappers. Most sciqlop core wrappers are moved into a dedicated Python module. We need to get rid of sqpapp! All current sciqlop modules should either be stateless or act as real singletons; they must not need any app to be used. This will ease testing, wrapping and usage. Signed-off-by: Alexis Jeandet <alexis.jeandet@member.fsf.org>

File last commit:

r1341:f18e017310bc
r1341:f18e017310bc
Show More
TestAmdaMiniFuzz.py
284 lines | 8.7 KiB | text/x-python | PythonLexer
import sys
import os
import copy
if not hasattr(sys, 'argv'):
sys.argv = ['']
current_script_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(current_script_path)
import amda
import pytestamda
import pysciqlopcore
import sciqlopqt
import datetime
import random
import string
import jsonpickle
import argparse
class Variable:
    """Model-side record of a variable: its name and its expected time range."""

    class Range:
        """Mutable pair of bounds (``start``, ``stop``) describing an interval."""

        def __init__(self, start, stop):
            # Tuple assignment sets ``start`` first, then ``stop``.
            self.start, self.stop = start, stop

    def __init__(self, name, t_start, t_stop):
        """Store ``name`` and wrap ``t_start``/``t_stop`` into a ``Range``."""
        self.name = name
        self.range = Variable.Range(start=t_start, stop=t_stop)
class OperationCtx:
    """Snapshot of the expected state used while generating a scenario.

    Attributes set by the constructor:
        t_start, t_stop: time bounds given to newly created variables.
        variables: dict of the variables expected to exist, keyed by name.
        sync_group: set of the variable names expected to be synchronized.
    """

    # Attribute names, in the order they are assigned on every instance.
    _FIELDS = ("t_start", "t_stop", "variables", "sync_group")

    def __init__(self, prev_ctx=None):
        """Start from defaults, or from a deep copy of ``prev_ctx`` when given."""
        if prev_ctx is not None:
            # Deep copies keep later mutations from leaking back into prev_ctx.
            for field in OperationCtx._FIELDS:
                setattr(self, field, copy.deepcopy(getattr(prev_ctx, field)))
            return
        self.t_start = datetime.datetime(2012, 10, 20, 8, 10, 0)
        self.t_stop = datetime.datetime(2012, 10, 20, 12, 0, 0)
        self.variables = {}
        self.sync_group = set()
# Objects returned by VariableController.createVariable, keyed by variable name.
__variables__ = {}
# Names of the variables that SyncVar.do added and DeSyncVar.do has not removed.
__variables_in_sync__ = set()
# Filled by the ``register`` decorator: class name -> operation instance.
__OPS__ = dict()
# Filled by the ``register`` decorator: class name -> selection weight.
__WEIGHTS__ = dict()
# The one synchronization group used by SyncVar and DeSyncVar; it is declared
# to the controller as soon as the module is loaded.
sync_group = sciqlopqt.QUuid()
pytestamda.VariableController.addSynchronizationGroup(sync_group)
def register(weight):
    """Return a class decorator recording an operation and its weight.

    The decorated class is instantiated once; the instance goes into
    ``__OPS__`` and ``weight`` into ``__WEIGHTS__``, both keyed by the class
    name. The class itself is returned unchanged.
    """
    def _decorate(cls):
        key = cls.__name__
        __OPS__[key] = cls()
        __WEIGHTS__[key] = weight
        return cls

    return _decorate
def random_var(ctx):
    """Return a randomly chosen key of ``ctx.variables``, or None when it is empty."""
    if not ctx.variables:
        return None
    candidates = list(ctx.variables)
    return random.choice(candidates)
@register(2)
class CreateVar:
    """Operation that creates a variable under a random alphanumeric name."""

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Add a new ``Variable`` to ``ctx`` and return the arguments for ``do``.

        The name is 1 to 40 characters drawn from ASCII letters and digits;
        the variable starts with the context's ``t_start``/``t_stop`` bounds.
        """
        alphabet = string.ascii_letters + string.digits
        length = random.randint(1, 40)
        new_random_name = ''.join(random.choices(alphabet, k=length))
        ctx.variables[new_random_name] = Variable(new_random_name, ctx.t_start, ctx.t_stop)
        return {"var_name": new_random_name}

    @staticmethod
    def do(var_name, *args, **kwargs):
        """Create ``var_name`` through the controller and remember the result."""
        provider = pytestamda.amda_provider()
        created = pytestamda.VariableController.createVariable(var_name, provider)
        __variables__[var_name] = created
@register(1)
class DeleteVar:
    """Operation that deletes one randomly chosen variable."""

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Remove a random variable from ``ctx`` and return the arguments for ``do``.

        When the context holds no variable, ``var_name`` is None and ``do``
        becomes a no-op.
        """
        var_name = random_var(ctx)
        if var_name:
            ctx.variables.pop(var_name)
            ctx.sync_group.discard(var_name)
        return {"var_name": var_name}

    @staticmethod
    def do(var_name, *args, **kwargs):
        """Delete ``var_name`` through the controller; do nothing for a falsy name."""
        if var_name:
            variable = __variables__.pop(var_name)
            pytestamda.VariableController.deleteVariable(variable)
            # Mirror what prepare() does on the context: a deleted variable is
            # no longer tracked as synchronized. Otherwise a later variable
            # created with the same name would be updated with sync=True
            # although the context does not list it in its sync group.
            __variables_in_sync__.discard(var_name)
class Zoom:
    """Shared implementation behind the ZoomIn and ZoomOut operations."""

    @staticmethod
    def _compute_zoom_ranges(factor, variable):
        """Return ``(t_start, t_stop)`` for ``variable`` scaled around its center.

        The width of ``variable.range`` is multiplied by ``factor`` while the
        middle of the range stays where it is.
        """
        lower = variable.range.start
        delta = variable.range.stop - lower
        center = lower + (delta / 2)
        half_width = (delta * factor) / 2
        return center - half_width, center + half_width

    @staticmethod
    def _zoom(factor, var_name, ctx):
        """Apply the zoom to the model: ``var_name`` alone, or its whole sync group."""
        if var_name in ctx.sync_group:
            targets = ctx.sync_group
        else:
            targets = [var_name]
        for target in targets:
            model_var = ctx.variables[target]
            new_start, new_stop = Zoom._compute_zoom_ranges(factor=factor, variable=model_var)
            model_var.range.start = new_start
            model_var.range.stop = new_stop

    @staticmethod
    def prepare(ctx, min, max):
        """Pick a random variable and factor in [min, max]; update ``ctx`` accordingly.

        Returns the keyword arguments for ``do``. ``var_name`` is None when
        the context holds no variable; a factor is drawn in every case.
        """
        var_name = random_var(ctx)
        factor = random.uniform(min, max)
        if var_name:
            Zoom._zoom(factor, var_name, ctx)
        return {"var_name": var_name, "factor": factor}

    @staticmethod
    def do(var_name, factor):
        """Ask the controller to apply the zoomed range to the stored variable."""
        variable = __variables__[var_name]
        new_start, new_stop = Zoom._compute_zoom_ranges(factor=factor, variable=variable)
        # The sync flag is True only for names recorded as synchronized.
        sync = var_name in __variables_in_sync__
        pytestamda.VariableController.update_range(variable, pytestamda.SqpRange(new_start, new_stop), sync)
@register(10)
class ZoomIn:
    """Zoom operation whose factor is drawn between 1 and 2."""

    # NOTE(review): Zoom._compute_zoom_ranges multiplies the range width by the
    # factor, so a factor in [1, 2] widens the range - confirm the class name
    # matches the intended direction.

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Delegate to ``Zoom.prepare`` with the bounds 1 and 2."""
        return Zoom.prepare(ctx, min=1, max=2)

    @staticmethod
    def do(var_name, factor, *args, **kwargs):
        """Run ``Zoom.do`` unless ``var_name`` is falsy (nothing was picked)."""
        if not var_name:
            return
        Zoom.do(var_name, factor)
@register(10)
class ZoomOut:
    """Zoom operation whose factor is drawn between 0.1 and 1."""

    # NOTE(review): Zoom._compute_zoom_ranges multiplies the range width by the
    # factor, so a factor in [0.1, 1] narrows the range - confirm the class
    # name matches the intended direction.

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Delegate to ``Zoom.prepare`` with the bounds 0.1 and 1."""
        return Zoom.prepare(ctx, min=.1, max=1)

    @staticmethod
    def do(var_name, factor, *args, **kwargs):
        """Run ``Zoom.do`` unless ``var_name`` is falsy (nothing was picked)."""
        if not var_name:
            return
        Zoom.do(var_name, factor)
@register(10)
class Shift:
    """Operation that slides a variable's time range forward or backward."""

    @staticmethod
    def _compute_range(variable, direction, factor):
        """Return ``variable``'s bounds moved by ``width * factor * direction``."""
        delta = variable.range.stop - variable.range.start
        offset = delta * factor * direction
        return variable.range.start + offset, variable.range.stop + offset

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Pick a random variable, direction and factor; update ``ctx`` accordingly.

        When the picked variable belongs to ``ctx.sync_group``, every member
        of the group is shifted in the model, each by a fraction of its own
        width. Returns the keyword arguments for ``do``; ``var_name`` is None
        when the context holds no variable.
        """
        var_name = random_var(ctx)
        direction = random.choice([1, -1])
        factor = random.random()
        if var_name:
            targets = ctx.sync_group if var_name in ctx.sync_group else [var_name]
            # A separate loop name keeps ``var_name`` bound to the variable
            # that was actually picked, so the returned arguments name it
            # rather than whichever sync-group member was iterated last.
            for target in targets:
                variable = ctx.variables[target]
                variable.range.start, variable.range.stop = Shift._compute_range(variable, direction, factor)
        return {"var_name": var_name, "direction": direction, "factor": factor}

    @staticmethod
    def do(var_name, direction, factor, *args, **kwargs):
        """Ask the controller to apply the shifted range; no-op for a falsy name."""
        if var_name:
            variable = __variables__[var_name]
            t_start, t_stop = Shift._compute_range(variable, direction, factor)
            # The sync flag is True only for names recorded as synchronized.
            sync = var_name in __variables_in_sync__
            pytestamda.VariableController.update_range(variable, pytestamda.SqpRange(t_start, t_stop), sync)
@register(3)
class WaitForDl:
    """Operation that calls the controller's ``wait_for_downloads``."""

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Leave ``ctx`` untouched; ``do`` takes no arguments."""
        return dict()

    @staticmethod
    def do(*args, **kwargs):
        """Call ``VariableController.wait_for_downloads``; arguments are ignored."""
        controller = pytestamda.VariableController
        controller.wait_for_downloads()
@register(2)
class SyncVar:
    """Operation that adds a random variable to the synchronization group."""

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Pick a random variable, add it to ``ctx.sync_group``, return ``do`` arguments."""
        var_name = random_var(ctx)
        if var_name:
            ctx.sync_group.add(var_name)
        return {"var_name": var_name}

    @staticmethod
    def do(var_name, *args, **kwargs):
        """Synchronize ``var_name`` with the module-level group; no-op for a falsy name."""
        if not var_name:
            return
        variable = __variables__[var_name]
        pytestamda.VariableController.synchronizeVar(variable, sync_group)
        __variables_in_sync__.add(var_name)
@register(2)
class DeSyncVar:
    """Operation that removes a random variable from the synchronization group."""

    @staticmethod
    def prepare(ctx, *args, **kwargs):
        """Pick a random variable, drop it from ``ctx.sync_group``, return ``do`` arguments."""
        var_name = random_var(ctx)
        if var_name:
            ctx.sync_group.discard(var_name)
        return {"var_name": var_name}

    @staticmethod
    def do(var_name, *args, **kwargs):
        """Desynchronize ``var_name`` from the module-level group; no-op for a falsy name."""
        if not var_name:
            return
        variable = __variables__[var_name]
        pytestamda.VariableController.deSynchronizeVar(variable, sync_group)
        __variables_in_sync__.discard(var_name)
#parser = argparse.ArgumentParser()
#parser.add_argument("-r", "--reload", help="reload")
#cli_args = parser.parse_args()
def run_scenario(args,ops):
    """Set the initial time range, then execute each operation in order.

    ``ops`` holds operation names (keys of ``__OPS__``) and ``args`` the
    matching keyword-argument dicts; the two are paired with ``zip``, so any
    surplus entries in the longer one are ignored.
    """
    initial_range = pytestamda.SqpRange(
        datetime.datetime(2012, 10, 20, 8, 10, 0),
        datetime.datetime(2012, 10, 20, 12, 0, 0),
    )
    pytestamda.TimeController.setTime(initial_range)
    for op_name, op_kwargs in zip(ops, args):
        __OPS__[op_name].do(**op_kwargs)
def build_scenario(name, steps=100):
    """Generate a random scenario and save it to ``<name>/input.json``.

    The scenario always starts with a ``CreateVar`` step, continues with
    ``steps`` operations drawn from ``__OPS__`` according to ``__WEIGHTS__``
    and ends with a ``SyncVar`` step. Each step is prepared against a copy of
    the previous context, so ``context_stack`` keeps the expected state after
    every step (its first entry is the initial, empty context).

    Returns:
        ``(ops, args, context_stack)``: the operation names, the
        keyword-argument dict of each operation, and the successive contexts.

    Raises:
        FileExistsError: from ``os.makedirs`` when directory ``name`` already
        exists.
    """
    context_stack = [OperationCtx()]
    # Must be created here: the loop below appends to it, and without this
    # line the function fails with a NameError on the very first step.
    args = []
    ops = [CreateVar.__name__]  # start with one variable minimum
    ops += random.choices(list(__OPS__.keys()), weights=list(__WEIGHTS__.values()), k=steps)
    ops.append(SyncVar.__name__)
    for op_name in ops:
        ctx = OperationCtx(context_stack[-1])
        operation = __OPS__[op_name]
        args.append(operation.prepare(ctx))
        context_stack.append(ctx)
    os.makedirs(name)
    js = jsonpickle.encode({"args": args, "context_stack": context_stack, "operations": ops})
    with open(name+"/input.json", "w") as file:
        file.write(js)
    return ops, args, context_stack
def load_scenario(name):
    """Read ``<name>/input.json`` and return ``(operations, args, context_stack)``."""
    with open(name+"/input.json", "r") as file:
        # NOTE(review): jsonpickle.decode can instantiate arbitrary objects;
        # only load scenario files from a trusted source.
        data = jsonpickle.decode(file.read())
    operations = data["operations"]
    op_args = data["args"]
    contexts = data["context_stack"]
    return operations, op_args, contexts
if __name__ == '__main__':
    # A fresh random scenario name; only used by the generation lines below,
    # which are currently disabled.
    scenario = ''.join(random.choices(string.ascii_letters + string.digits, k=32))
    #ops,args, context_stack = build_scenario(scenario, 300)
    #print("Generated scenario {}".format(scenario))

    # Replay a previously saved scenario.
    name = "EuI9qFMLZJ4k7vf8seTww8Z4wGBpspd8"
    ops, args, context_stack = load_scenario(name)

    # Follow every operation with a WaitForDl step; all of those steps share
    # one empty argument dict.
    wait_op = WaitForDl.__name__
    no_args = {}
    ops2 = [step for op in ops for step in (op, wait_op)]
    args2 = [step for arg, _ in zip(args, ops) for step in (arg, no_args)]
    run_scenario(args=args2, ops=ops2)