
| ''' 优化说明: 1.使用修正标准分 rsrs_score的算法有: 仅斜率slope,效果一般; 仅标准分zscore,效果不错; 修正标准分 = zscore * r2,效果最佳; 右偏标准分 = 修正标准分 * slope,效果不错。 2.将原策略的每次持有两只etf改成只买最优的一个,收益显著提高 3.将每周调仓换成每日调仓,收益显著提高 4.因为交易etf,所以手续费设为万分之三,印花税设为零,未设置滑点 5.修改股票池中候选etf,删除银行,红利等收益较弱品种,增加纳指etf以增加不同国家市场间轮动的可能性 6.根据研报,默认参数介已设定为最优 7.加入防未来函数 8.增加择时与选股模块的打印日志,方便观察每笔操作依据 '''
from jqdata import * import numpy as np
def initialize(context): set_benchmark('000300.XSHG') set_option('use_real_price', True) set_option("avoid_future_data", True) set_slippage(FixedSlippage(0.001)) set_order_cost(OrderCost(open_tax=0, close_tax=0, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5), type='fund') log.set_level('order', 'error') g.stock_pool = [ '159915.XSHE', '510300.XSHG', '510500.XSHG', ] g.stock_num = 1 g.momentum_day = 29 g.ref_stock = '000300.XSHG' g.N = 18 g.M = 600 g.score_threshold = 0.7 g.mean_day = 20 g.mean_diff_day = 3 g.slope_series = initial_slope_series()[:-1] run_daily(my_trade, time='11:30', reference_security='000300.XSHG') run_daily(check_lose, time='open', reference_security='000300.XSHG') run_daily(print_trade_info, time='15:30', reference_security='000300.XSHG')
def get_rank(stock_pool): score_list = [] for stock in g.stock_pool: data = attribute_history(stock, g.momentum_day, '1d', ['close']) y = data['log'] = np.log(data.close) x = data['num'] = np.arange(data.log.size) slope, intercept = np.polyfit(x, y, 1) annualized_returns = math.pow(math.exp(slope), 250) - 1 r_squared = 1 - (sum((y - (slope * x + intercept))**2) / ((len(y) - 1) * np.var(y, ddof=1))) score = annualized_returns * r_squared score_list.append(score) stock_dict=dict(zip(g.stock_pool, score_list)) sort_list=sorted(stock_dict.items(), key=lambda item:item[1], reverse=True) code_list=[] for i in range((len(g.stock_pool))): code_list.append(sort_list[i][0]) rank_stock = code_list[0:g.stock_num] print(code_list[0:5]) return rank_stock
def get_ols(x, y): slope, intercept = np.polyfit(x, y, 1) r2 = 1 - (sum((y - (slope * x + intercept))**2) / ((len(y) - 1) * np.var(y, ddof=1))) return (intercept, slope, r2)
def initial_slope_series(): data = attribute_history(g.ref_stock, g.N + g.M, '1d', ['high', 'low']) return [get_ols(data.low[i:i+g.N], data.high[i:i+g.N])[1] for i in range(g.M)]
def get_zscore(slope_series): mean = np.mean(slope_series) std = np.std(slope_series) return (slope_series[-1] - mean) / std
def get_timing_signal(stock): close_data = attribute_history(g.ref_stock, g.mean_day + g.mean_diff_day, '1d', ['close']) today_MA = close_data.close[g.mean_diff_day:].mean() before_MA = close_data.close[:-g.mean_diff_day].mean() high_low_data = attribute_history(g.ref_stock, g.N, '1d', ['high', 'low']) intercept, slope, r2 = get_ols(high_low_data.low, high_low_data.high) g.slope_series.append(slope) rsrs_score = get_zscore(g.slope_series[-g.M:]) * r2 if rsrs_score > g.score_threshold and today_MA > before_MA: print('BUY') return "BUY" elif rsrs_score < -g.score_threshold and today_MA < before_MA: print('SELL') return "SELL" else: print('KEEP') return "KEEP"
def filter_paused_stock(stock_list): current_data = get_current_data() return [stock for stock in stock_list if not current_data[stock].paused]
def filter_st_stock(stock_list): current_data = get_current_data() return [stock for stock in stock_list if not current_data[stock].is_st and 'ST' not in current_data[stock].name and '*' not in current_data[stock].name and '退' not in current_data[stock].name]
def filter_limitup_stock(context, stock_list): last_prices = history(1, unit='1m', field='close', security_list=stock_list) current_data = get_current_data() return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or last_prices[stock][-1] < current_data[stock].high_limit]
def filter_limitdown_stock(context, stock_list): last_prices = history(1, unit='1m', field='close', security_list=stock_list) current_data = get_current_data() return [stock for stock in stock_list if stock in context.portfolio.positions.keys() or last_prices[stock][-1] > current_data[stock].low_limit]
def order_target_value_(security, value): if value == 0: log.debug("Selling out %s" % (security)) else: log.debug("Order %s to value %f" % (security, value)) return order_target_value(security, value)
def open_position(security, value): order = order_target_value_(security, value) if order != None and order.filled > 0: return True return False
def close_position(position): security = position.security order = order_target_value_(security, 0) if order != None: if order.status == OrderStatus.held and order.filled == order.amount: return True return False
def adjust_position(context, buy_stocks): for stock in context.portfolio.positions: if stock not in buy_stocks: log.info("[%s]已不在应买入列表中" % (stock)) position = context.portfolio.positions[stock] close_position(position) else: log.info("[%s]已经持有无需重复买入" % (stock)) position_count = len(context.portfolio.positions) if g.stock_num > position_count: value = context.portfolio.cash / (g.stock_num - position_count) for stock in buy_stocks: if context.portfolio.positions[stock].total_amount == 0: if open_position(stock, value): if len(context.portfolio.positions) == g.stock_num: break
def my_trade(context): check_out_list = get_rank(g.stock_pool) check_out_list = filter_st_stock(check_out_list) check_out_list = filter_limitup_stock(context, check_out_list) check_out_list = filter_limitdown_stock(context, check_out_list) check_out_list = filter_paused_stock(check_out_list) print('今日自选股:{}'.format(check_out_list)) timing_signal = get_timing_signal(g.ref_stock) print('今日择时信号:{}'.format(timing_signal)) if timing_signal == 'SELL': for stock in context.portfolio.positions: position = context.portfolio.positions[stock] close_position(position) elif timing_signal == 'BUY' or timing_signal == 'KEEP': adjust_position(context, check_out_list) else: pass
def check_lose(context): for position in list(context.portfolio.positions.values()): securities=position.security cost=position.avg_cost price=position.price ret=100*(price/cost-1) value=position.value amount=position.total_amount if ret <=-20: order_target_value(position.security, 0) print("!!!!!!触发止损信号: 标的={},标的价值={},浮动盈亏={}% !!!!!!" .format(securities,format(value,'.2f'),format(ret,'.2f')))
def print_trade_info(context): trades = get_trades() for _trade in trades.values(): print('成交记录:'+str(_trade)) for position in list(context.portfolio.positions.values()): securities=position.security cost=position.avg_cost price=position.price ret=100*(price/cost-1) value=position.value amount=position.total_amount print('代码:{}'.format(securities)) print('成本价:{}'.format(format(cost,'.2f'))) print('现价:{}'.format(price)) print('收益率:{}%'.format(format(ret,'.2f'))) print('持仓(股):{}'.format(amount)) print('市值:{}'.format(format(value,'.2f'))) print('一天结束') print('———————————————————————————————————————分割线————————————————————————————————————————')
|