上涨趋势策略
看了聚宽上大神写的上涨趋势策略,感觉不错。感兴趣的可以转至苦咖啡《上涨趋势策略修改版》,地址为:
https://www.joinquant.com/view/community/detail/15c783ea414ddac3abb8fd55f39df2cc?type=1
随着中美对抗的持续进行,原有很多以美国为出口主要对象的企业必然走向内卷,大企业之间的竞争也将进一步加大,只会强者恒强,进一步走向垄断。A股全面注册制也将到来,小盘股时代终将结束,跟着机构抱团,找大型的,具有垄断特征的股票才是出路。
作者在掘金平台上对该策略进行了实现,对2019到2020年进行了回测,表现不错。
策略源码如下,觉得有帮助的,还望点赞关注!对量化投资感兴趣的小伙伴可以添加如下微信群,一起用技术改变世界,实现财务自由。
掘金实盘可参考掘金量化实盘指导
# coding=utf-8
from gm.api import *
# 克隆自聚宽文章:https://www.joinquant.com/post/31232
# 标题:2020年 267%年化 13%回撤 上涨趋势策略修改版
# 作者:苦咖啡
# 克隆自聚宽文章:https://www.joinquant.com/post/31232
# 标题:2020年 267%年化 13%回撤 上涨趋势策略修改版
# 作者:苦咖啡
# 克隆自聚宽文章:https://www.joinquant.com/post/30863
# 标题:上涨股策略2020年收益123%
# 作者:scottchenrui
# 2021-02-06
# 张连重 掘金实现
import numpy as np
from scipy.stats import linregress
from gm.api import *
from datetime import datetime
import pandas as pd
from jqdatasdk import *
# jqdata login 需替换为个人聚宽账号
auth('15520768082','ZXXXXXX')
# 初始化函数,设定要操作的股票、基准等等
def init(context):
# 最大建仓数量
context.max_hold_stock_nums = 2
# 选出来的股票
context.target_lists = []
schedule(schedule_func=check_stocks, date_rule='1w', time_rule='09:32:00')
schedule(schedule_func=sell, date_rule='1w', time_rule='09:34:00')
schedule(schedule_func=buy, date_rule='1w', time_rule='09:35:00')
# 股票筛选
def check_stocks(context):
today = context.now.strftime("%Y-%m-%d")
last_date = get_trade_days(end_date=today, count=2)[0]
current = context.now.strftime("%Y-%m-%d %H:%M:%S")
last_date = get_trade_days(end_date=today, count=2)[0]
# 沪深300成分股
check_out_lists = get_index_stocks("000300.XSHG", date=last_date)
check_out_lists = st_check(context,check_out_lists)
check_out_lists = science_check(context,check_out_lists)
stocks = filter_stks_listed_n_days(check_out_lists,last_date,100)
df = get_price(stocks,end_date=current,frequency='1m',fields=['open','close','high_limit','low_limit','paused'] ,count=1,panel=False)
df = df[(df['close'] < df['high_limit']) & (df['close'] > df['low_limit']) & (df['paused'] == 0)]
checked_stocks = df['code'].tolist()
# 昨收盘价不高于260元/股
s_close_1 = get_price(checked_stocks,end_date=last_date,frequency='1d',fields=['close'] ,count=1,panel=False)
s_close_1.drop(['time'], axis=1,inplace=True)
s_close_1.set_index(["code"], inplace=True)
check_out_lists = list(s_close_1[s_close_1['close'] <= 260].index)
# 近30个交易日的最高价 / 昨收盘价 <=1.1, 即HHV(HIGH,30)/C[-1] <= 1.1
high_30 = get_price(check_out_lists,end_date=last_date,frequency='1d',fields=['high'] ,count=30,panel=False)
high_max_30 = high_30.pivot(index='time',columns='code',values='high').max()
dict_high_max_30 = {'code':high_max_30.index,'high':high_max_30.values}
df_high_max_30 = pd.DataFrame(dict_high_max_30)
df_high_max_30.set_index('code',drop=True,inplace=True)
s_fall = df_high_max_30['high'] / s_close_1['close']
check_out_lists = list(s_fall[s_fall <= 1.1].index)
# 近7个交易日的交易量均值 与 近180给交易日的成交量均值 相比,放大不超过1.5倍 MA(VOL,7)/MA(VOL,180) <=1.5
volume_180 = get_price(check_out_lists,end_date=last_date,frequency='1d',fields=['volume'] ,count=180,panel=False)
volume_mean_7 = volume_180.pivot(index='time',columns='code',values='volume').iloc[-7:].mean()
volume_mean_180 = volume_180.pivot(index='time',columns='code',values='volume').mean()
s_vol_ratio = volume_mean_7 / volume_mean_180
check_out_lists = list(s_vol_ratio[s_vol_ratio <= 1.5].index)
# 对近120个交易日的股价进行线性回归:入选条件 slope / intercept > 0.005 and r_value**2 > 0.8
target_dict = {}
x = np.arange(120)
for stock in check_out_lists:
y = get_price(stock,end_date=last_date,frequency='1d',fields=['close'] ,count=120,panel=False)['close']
slope, intercept, r_value, p_value, std_err = linregress(x, y)
if slope / intercept > 0.005 and r_value ** 2 > 0.8:
target_dict[stock] = r_value ** 2
# 入选股票按照R Square 降序排序, 取前N名
context.target_lists = []
if target_dict:
df_score = pd.DataFrame.from_dict(
target_dict, orient='index', columns=['score', ]
).sort_values(
by='score', ascending=False
)
#
context.target_lists = list(df_score.index[:context.max_hold_stock_nums])
# 去除ST股票
def st_check(context,security_list):
currentDate = context.now.strftime("%Y-%m-%d")
current_data = get_extras('is_st', security_list, end_date=currentDate, df=True, count=1)
security_list = [stock for stock in security_list if not current_data[stock][-1]]
# 返回结果
return security_list
#
def filter_stks_listed_n_days(source_stk_list, end_date, n=100):
# type: (list, Union[str, datetime.date, datetime.datetime], int) -> list
"""
过滤掉source_stk_list中尚未上市的,或者上市天数不足n天的股票
"""
# 1. 取得n个交易日之前的交易日期trd_date
trd_date = get_trade_days(end_date=end_date, count=n)[0]
# 2. 取trd_date日就已经上市的所有股票
all_stks = get_all_securities(date=trd_date)
# 3. 过滤source_stk_list,剔除掉不在all_stks中的
valid_stk_list = list(all_stks[all_stks.index.isin(source_stk_list)].index)
return valid_stk_list
# 去除科创板股票
def science_check(context,security_list):
stock_list = []
for stock in security_list:
if not stock.startswith('688'):
stock_list.append(stock)
# 返回结果
return stock_list
# 交易函数 - 入场
def buy(context):
buy_lists = context.target_lists
if buy_lists:
amount = context.account().cash['nav'] / len(buy_lists)
for stock in buy_lists:
stock_code_jj = transStockCode(stock,'JJ')
if stock_code_jj in get_position_list(context):
account_position = context.account().position(symbol=stock_code_jj,side = PositionSide_Long)
if abs(amount - account_position['amount']) > account_position['price'] * 100 + 5:
_order = order_target_value(symbol=stock, value=amount, position_side=PositionSide_Long, order_type=OrderType_Market)
if _order is not None:
print('调仓: %s (%s)' % (get_security_info(stock).display_name, stock))
for stock in buy_lists:
stock_code_jj = transStockCode(stock,'JJ')
if stock_code_jj not in get_position_list(context):
_order = order_target_value(symbol=stock_code_jj, value=amount, position_side=PositionSide_Long, order_type=OrderType_Market)
if _order is not None:
print('买入: %s (%s)' % (get_security_info(stock).display_name, stock))
def sell(context):
"""
卖出不在buy_lists中的股票
"""
buy_lists = context.target_lists
for stock in get_position_list(context):
stock_code_jk = transStockCode(stock,'JK')
if stock_code_jk not in buy_lists:
_order = order_target_value(symbol=stock, value=0, position_side=PositionSide_Long, order_type=OrderType_Market)
if _order is not None:
print('卖出: %s (%s)' % (get_security_info(stock_code_jk).display_name, stock))
##
def transStockListCode(stock_list_src,target_type):
stock_list_target = []
if target_type=='JJ':
for stock in stock_list_src:
target = 'SHSE' if stock[7:]=='XSHG' else 'SZSE'
stock_list_target.append(target +'.'+ stock[0:6])
elif target_type=='JK':
for stock in stock_list_src:
target = 'XSHG' if stock[0:4]=='SHSE' else 'XSHE'
stock_list_target.append(stock[5:] +'.'+ target)
return stock_list_target
## 单股名称转换
def transStockCode(stock_src,target_type):
if target_type=='JJ':
target = 'SHSE' if stock_src[7:]=='XSHG' else 'SZSE'
return target +'.'+ stock_src[0:6]
elif target_type=='JK':
target = 'XSHG' if stock_src[0:4]=='SHSE' else 'XSHE'
return stock_src[5:] +'.'+ target
## 获取持仓股票列表
def get_position_list(context):
in_stock = []
Account_positions = context.account().positions()
for position in Account_positions:
in_stock.append(position['symbol'])
return in_stock
if __name__ == '__main__':
'''
strategy_id策略ID, 由系统生成
filename文件名, 请与本文件名保持一致
mode运行模式, 实时模式:MODE_LIVE回测模式:MODE_BACKTEST
token绑定计算机的ID, 可在系统设置-密钥管理中生成
backtest_start_time回测开始时间
backtest_end_time回测结束时间
backtest_adjust股票复权方式, 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST
backtest_initial_cash回测初始资金
backtest_commission_ratio回测佣金比例
backtest_slippage_ratio回测滑点比例
'''
run(strategy_id='b056605f-687a-11eb-8a4e-5254009e6375',
filename='main.py',
mode=MODE_BACKTEST,
token='f8e34a0dacdbbbba2f2cab37b9c01772f80c53',
backtest_start_time='2019-01-01 08:00:00',
backtest_end_time='2021-02-10 16:00:00',
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=50000,
backtest_commission_ratio=0.0001,
backtest_slippage_ratio=0.0001)
评论