forked from QuantFans/quantdigger
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqd.py
More file actions
68 lines (52 loc) · 1.57 KB
/
qd.py
File metadata and controls
68 lines (52 loc) · 1.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# -*- coding: utf-8 -*-
from quantdigger.engine.execute_unit import ExecuteUnit
# 系统角色
_simulator = None
def set_symbols(pcontracts,
dt_start="1980-1-1",
dt_end="2100-1-1",
n=None,
spec_date={}): # 'symbol':[,]
"""
Args:
pcontracts (list): list of pcontracts(string)
dt_start (datetime/str): start time of all pcontracts
dt_end (datetime/str): end time of all pcontracts
n (int): last n bars
spec_date (dict): time range for specific pcontracts
"""
global _simulator
_simulator = ExecuteUnit(pcontracts, dt_start, dt_end, n, spec_date)
return _simulator
def add_strategy(algos, settings={}):
""" 一个组合中的策略
Args:
algos (list): 一个策略组合
Returns:
Profile. 组合结果
"""
rst = _simulator.add_comb(algos, settings)
# @TODO 为什么去掉测试test_engine_vector.py无法通过
settings.clear()
return rst
def run():
_simulator.run()
class Strategy(object):
""" 策略基类"""
def __init__(self, name):
self.name = name
def on_init(self, ctx):
"""初始化数据"""
return
def on_symbol(self, ctx):
""" 逐合约逐根k线运行 """
return
def on_bar(self, ctx):
""" 逐根k线运行 """
# 停在最后一个合约处
return
def on_exit(self, ctx):
""" 策略结束前运行 """
# 停在最后一根bar
return
__all__ = ['set_symbols', 'add_strategy', 'run', 'Strategy']