上涨趋势策略

奔跑吧程序猿

共 8796字,需浏览 18分钟

 ·

2021-04-04 08:45

看了聚宽上大神写的上涨趋势策略,感觉不错。感兴趣的可以转至苦咖啡《上涨趋势策略修改版》,地址为:

https://www.joinquant.com/view/community/detail/15c783ea414ddac3abb8fd55f39df2cc?type=1

随着中美对抗的持续进行,原有很多以美国为出口主要对象的企业必然走向内卷,大企业之间的竞争也将进一步加大,只会强者恒强,进一步走向垄断。A股全面注册制也将到来,小盘股时代终将结束,跟着机构抱团,找大型的,具有垄断特征的股票才是出路。

作者在掘金平台上对该策略进行了实现,对2019到2020年进行了回测,表现不错。

b74c5a136458ac7bbea9b7a948cf9d84.webp

策略源码如下,觉得有帮助的,还望点赞关注!对量化投资感兴趣的小伙伴可以添加如下微信群,一起用技术改变世界,实现财务自由。

掘金实盘可参考掘金量化实盘指导

# coding=utf-8from gm.api import *
# 克隆自聚宽文章:https://www.joinquant.com/post/31232# 标题:2020年 267%年化 13%回撤 上涨趋势策略修改版# 作者:苦咖啡
# 克隆自聚宽文章:https://www.joinquant.com/post/31232# 标题:2020年 267%年化 13%回撤 上涨趋势策略修改版# 作者:苦咖啡
# 克隆自聚宽文章:https://www.joinquant.com/post/30863# 标题:上涨股策略2020年收益123%# 作者:scottchenrui
# 2021-02-06# 张连重 掘金实现
import numpy as npfrom scipy.stats import linregressfrom gm.api import *from datetime import datetimeimport pandas as pdfrom jqdatasdk import *
# jqdata login  需替换为个人聚宽账号auth('15520768082','ZXXXXXX')

# 初始化函数,设定要操作的股票、基准等等def init(context): # 最大建仓数量 context.max_hold_stock_nums = 2 # 选出来的股票 context.target_lists = []
schedule(schedule_func=check_stocks, date_rule='1w', time_rule='09:32:00') schedule(schedule_func=sell, date_rule='1w', time_rule='09:34:00') schedule(schedule_func=buy, date_rule='1w', time_rule='09:35:00')

# 股票筛选def check_stocks(context): today = context.now.strftime("%Y-%m-%d") last_date = get_trade_days(end_date=today, count=2)[0] current = context.now.strftime("%Y-%m-%d %H:%M:%S") last_date = get_trade_days(end_date=today, count=2)[0] # 沪深300成分股 check_out_lists = get_index_stocks("000300.XSHG", date=last_date)
check_out_lists = st_check(context,check_out_lists) check_out_lists = science_check(context,check_out_lists) stocks = filter_stks_listed_n_days(check_out_lists,last_date,100) df = get_price(stocks,end_date=current,frequency='1m',fields=['open','close','high_limit','low_limit','paused'] ,count=1,panel=False) df = df[(df['close'] < df['high_limit']) & (df['close'] > df['low_limit']) & (df['paused'] == 0)] checked_stocks = df['code'].tolist()
# 昨收盘价不高于260元/股 s_close_1 = get_price(checked_stocks,end_date=last_date,frequency='1d',fields=['close'] ,count=1,panel=False) s_close_1.drop(['time'], axis=1,inplace=True) s_close_1.set_index(["code"], inplace=True) check_out_lists = list(s_close_1[s_close_1['close'] <= 260].index)
# 近30个交易日的最高价 / 昨收盘价 <=1.1, 即HHV(HIGH,30)/C[-1] <= 1.1 high_30 = get_price(check_out_lists,end_date=last_date,frequency='1d',fields=['high'] ,count=30,panel=False) high_max_30 = high_30.pivot(index='time',columns='code',values='high').max() dict_high_max_30 = {'code':high_max_30.index,'high':high_max_30.values} df_high_max_30 = pd.DataFrame(dict_high_max_30) df_high_max_30.set_index('code',drop=True,inplace=True) s_fall = df_high_max_30['high'] / s_close_1['close'] check_out_lists = list(s_fall[s_fall <= 1.1].index) # 近7个交易日的交易量均值 与 近180给交易日的成交量均值 相比,放大不超过1.5倍 MA(VOL,7)/MA(VOL,180) <=1.5 volume_180 = get_price(check_out_lists,end_date=last_date,frequency='1d',fields=['volume'] ,count=180,panel=False) volume_mean_7 = volume_180.pivot(index='time',columns='code',values='volume').iloc[-7:].mean() volume_mean_180 = volume_180.pivot(index='time',columns='code',values='volume').mean() s_vol_ratio = volume_mean_7 / volume_mean_180 check_out_lists = list(s_vol_ratio[s_vol_ratio <= 1.5].index)

# 对近120个交易日的股价进行线性回归:入选条件 slope / intercept > 0.005 and r_value**2 > 0.8 target_dict = {} x = np.arange(120) for stock in check_out_lists: y = get_price(stock,end_date=last_date,frequency='1d',fields=['close'] ,count=120,panel=False)['close'] slope, intercept, r_value, p_value, std_err = linregress(x, y) if slope / intercept > 0.005 and r_value ** 2 > 0.8: target_dict[stock] = r_value ** 2
# 入选股票按照R Square 降序排序, 取前N名 context.target_lists = [] if target_dict: df_score = pd.DataFrame.from_dict( target_dict, orient='index', columns=['score', ] ).sort_values( by='score', ascending=False ) # context.target_lists = list(df_score.index[:context.max_hold_stock_nums])
# 去除ST股票def st_check(context,security_list): currentDate = context.now.strftime("%Y-%m-%d") current_data = get_extras('is_st', security_list, end_date=currentDate, df=True, count=1) security_list = [stock for stock in security_list if not current_data[stock][-1]] # 返回结果 return security_list
#def filter_stks_listed_n_days(source_stk_list, end_date, n=100): # type: (list, Union[str, datetime.date, datetime.datetime], int) -> list """ 过滤掉source_stk_list中尚未上市的,或者上市天数不足n天的股票 """ # 1. 取得n个交易日之前的交易日期trd_date trd_date = get_trade_days(end_date=end_date, count=n)[0] # 2. 取trd_date日就已经上市的所有股票 all_stks = get_all_securities(date=trd_date) # 3. 过滤source_stk_list,剔除掉不在all_stks中的 valid_stk_list = list(all_stks[all_stks.index.isin(source_stk_list)].index) return valid_stk_list
# 去除科创板股票def science_check(context,security_list): stock_list = [] for stock in security_list: if not stock.startswith('688'): stock_list.append(stock) # 返回结果 return stock_list



# 交易函数 - 入场def buy(context): buy_lists = context.target_lists if buy_lists: amount = context.account().cash['nav'] / len(buy_lists) for stock in buy_lists: stock_code_jj = transStockCode(stock,'JJ') if stock_code_jj in get_position_list(context): account_position = context.account().position(symbol=stock_code_jj,side = PositionSide_Long) if abs(amount - account_position['amount']) > account_position['price'] * 100 + 5: _order = order_target_value(symbol=stock, value=amount, position_side=PositionSide_Long, order_type=OrderType_Market) if _order is not None: print('调仓: %s (%s)' % (get_security_info(stock).display_name, stock)) for stock in buy_lists: stock_code_jj = transStockCode(stock,'JJ') if stock_code_jj not in get_position_list(context): _order = order_target_value(symbol=stock_code_jj, value=amount, position_side=PositionSide_Long, order_type=OrderType_Market) if _order is not None: print('买入: %s (%s)' % (get_security_info(stock).display_name, stock))

def sell(context): """ 卖出不在buy_lists中的股票 """ buy_lists = context.target_lists for stock in get_position_list(context): stock_code_jk = transStockCode(stock,'JK') if stock_code_jk not in buy_lists: _order = order_target_value(symbol=stock, value=0, position_side=PositionSide_Long, order_type=OrderType_Market) if _order is not None: print('卖出: %s (%s)' % (get_security_info(stock_code_jk).display_name, stock))
## def transStockListCode(stock_list_src,target_type): stock_list_target = [] if target_type=='JJ': for stock in stock_list_src: target = 'SHSE' if stock[7:]=='XSHG' else 'SZSE' stock_list_target.append(target +'.'+ stock[0:6]) elif target_type=='JK': for stock in stock_list_src: target = 'XSHG' if stock[0:4]=='SHSE' else 'XSHE' stock_list_target.append(stock[5:] +'.'+ target) return stock_list_target

## 单股名称转换def transStockCode(stock_src,target_type): if target_type=='JJ': target = 'SHSE' if stock_src[7:]=='XSHG' else 'SZSE' return target +'.'+ stock_src[0:6] elif target_type=='JK': target = 'XSHG' if stock_src[0:4]=='SHSE' else 'XSHE' return stock_src[5:] +'.'+ target
## 获取持仓股票列表def get_position_list(context): in_stock = [] Account_positions = context.account().positions() for position in Account_positions: in_stock.append(position['symbol']) return in_stock
if __name__ == '__main__': ''' strategy_id策略ID, 由系统生成 filename文件名, 请与本文件名保持一致 mode运行模式, 实时模式:MODE_LIVE回测模式:MODE_BACKTEST token绑定计算机的ID, 可在系统设置-密钥管理中生成 backtest_start_time回测开始时间 backtest_end_time回测结束时间 backtest_adjust股票复权方式, 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST backtest_initial_cash回测初始资金 backtest_commission_ratio回测佣金比例 backtest_slippage_ratio回测滑点比例 ''' run(strategy_id='b056605f-687a-11eb-8a4e-5254009e6375', filename='main.py', mode=MODE_BACKTEST,        token='f8e34a0dacdbbbba2f2cab37b9c01772f80c53', backtest_start_time='2019-01-01 08:00:00', backtest_end_time='2021-02-10 16:00:00', backtest_adjust=ADJUST_PREV, backtest_initial_cash=50000, backtest_commission_ratio=0.0001, backtest_slippage_ratio=0.0001)


浏览 65
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报