股票下载 选股策略 双均线 输出结果收益
整 了大把个月,终于把代码运行起来了,把代码做成了一个类函数
import sqlite3
import pandas as pd
import numpy as np
import requests
import re
import os
import time
import datetime
import tushare as ts
from sqlalchemy import create_engine
import psycopg2
class Jeoj:def __init__(self):try:os.chdir(r'E:\study_python\DB')#数据存储的数径ts.set_token('*****************')#用户token---没有的去tushare注册就有了self.pro = ts.pro_api()#实例化self.DB_name='postgres'self.Table_Name='postgres'#self.Table_Name='KANG'self.engine = create_engine(f'postgresql+psycopg2://postgres:SY639000@localhost:5432/postgres')self.Stock_Basic=self.pro.stock_basic(exchange='',list_status='L',feilds='ts_code,symbol,name,area,insdustry,list_date')#交易日的股票信息#self.conn = self.engine.raw_connection()#self.cursor=self.conn.cursor()print("数据库连接成功")except Exception as e:print(f"数据库连接失败:{e}")def wrapFun(func):def inner(self,*args,**kwargs):try:print(f"本次执行的命令为:【{func.__name__}】;表名为:【{self.Table_Name}】")self.conn = self.engine.raw_connection()self.cursor=self.conn.cursor()ru=func(self,*args,**kwargs)#self.conn.commit()self.cursor.close()self.conn.close()print(f'【{func.__name__}】命令执行成功')return ruexcept Exception as e:print(f"【{func.__name__}】命令执行失败:{e}")self.cursor.close()self.conn.close()return inner @wrapFundef Create_Table(self): # 创建表的SQL语句,默认编码为UTF-8SQL = f'''CREATE TABLE IF NOT EXISTS {self.Table_Name} (ts_code VARCHAR(20),trade_date DATE,open DECIMAL,high DECIMAL,low DECIMAL,close DECIMAL,pre_close DECIMAL,change DECIMAL,pct_chg DECIMAL,vol DECIMAL,amount DECIMAL);'''self.cursor.execute(SQL)@wrapFundef get_count(self):#检查数据库中表是否为空cursor=self.cursorcursor.execute(f"select count(*) from {self.Table_Name} ")Nb=cursor.fetchall()print(f'the count number of {self.Table_Name}is ',Nb)return Nb[0][0]@wrapFundef db_time(self):#找出数据库最近的时间,并增加一天cursor=self.cursorsql_max=f'select max("trade_date") from {self.Table_Name}'cursor.execute(sql_max)da=cursor.fetchall()#print('da is ',da[0])if da[0]!=(None,):ds=datetime.datetime.strptime(str(da[0][0]),'%Y%m%d')+datetime.timedelta(days=1)ddt=ds.strftime('%Y%m%d') else:ddt='19820101'return ddt@wrapFundef GetTables(self):#来检查数据库中是否有这个表cursor=self.cursorcursor.execute("select * from pg_tables where schemaname = 'public'")Tb=cursor.fetchall()if Tb:return True else: return False@wrapFun def data_down_to_DB(self): cursor = self.cursor#建立游标,通过此命令来实例化if self.GetTables()==False:#如果数据库是空的,将下载全部数据self.Create_Table()start='19820101'elif self.get_count()==0:start='19820101'else:#如果已有下载的数据,将在原有基础上更新start=self.db_time()end=datetime.datetime.now().strftime('%Y%m%d')#更新时所在的日期print('start is :',start,'; end is :',end)exchange_date=self.pro.trade_cal(exchange='',start_date=start,end_date=end)#调用TUSHARE中的交易日历df_date=exchange_date[exchange_date['is_open']==1]#找出开是的日期count=0for i in df_date['cal_date']:#此为进度条,只为看起去更直观些, 没有此循环,也不影响下载。count+=1time.sleep(0.1)df = self.pro.daily(trade_date=i)df.to_sql(f'{self.Table_Name}', self.engine, if_exists='append',index=False) dis=int(50*count/len(df_date))print('\r','■'*dis+'□'*(50-dis),'{:.2%} Date:{}'.format(count/len(df_date),i),end='')print('\n','【data_down_to_DB】股票数据下载已全部完成')@wrapFundef look_for_std(self,Name_std="pct_chg",Date="19820101",Head=500):'''按股票代码分组后求出指定列的标准差,Date时间起始点'''cursor = self.cursor#建立游标,通过此命令来实例化Stock_Basic=self.pro.stock_basic(exchange='',list_status='L',feilds='ts_code,symbol,name,area,insdustry,list_date')#交易日的股票信息sql_std=f'select "ts_code" as code, stddev({Name_std}) as std_dev from {self.Table_Name} where "trade_date" >\'{Date}\' and pct_chg<20 group by "ts_code" order by std_dev desc'cursor.execute(sql_std)da=cursor.fetchall()df_db=pd.DataFrame(da,columns=['ts_code',f'{Name_std}_std'])df_std=pd.merge(df_db, Stock_Basic, how='left', on='ts_code')return df_std.head(Head)@wrapFundef look_for_data(self,*args,Code="ts_code",DateName="trade_date",Date="20210410" ):#从数据库中读取想要的列到内存cursor = self.cursor#建立游标,通过此命令来实例化 col=list(args)+[Code,DateName]ab='","'.join(col)ag=f'"{ab}"'sql=f'select {ag} from {self.Table_Name} where "trade_date" >\'{Date}\' 'print('正在读取数据库中的数据,电脑配置的高低决定所用时间的长短,请喝杯茶,!--------耐心等待--------') cursor.execute(sql)da=cursor.fetchall()df_db=pd.DataFrame(da,columns=col)return df_db@wrapFundef get_data_from_func(self,*args,Code="ts_code",DateName="trade_date",Date="20210410" ,func='func',Head=500):#根据标准差函数(look_for_std())的结果找出对应的股票代码想要的列到内存cursor = self.cursor#建立游标,通过此命令来实例化 col=list(args)+[Code,DateName]ab='","'.join(col)ag=f'"{ab}"'code_list=func.ts_code.tolist()# 导入标准差结果code_tuple=tuple(code_list[:Head])sql=f'select {ag} from {self.Table_Name} where "trade_date" >\'{Date}\' and ts_code in{code_tuple} 'print('正在读取数据库中的数据,电脑配置的高低决定所用时间的长短,请喝杯茶,!--------耐心等待--------') cursor.execute(sql)da=cursor.fetchall()df_get=pd.DataFrame(da,columns=col)return df_get@wrapFundef count_from_date(self,Name="ts_code",Date="19820101"):#计算查询到数据的行数cursor = self.cursor#建立游标,通过此命令来实例化sql_count=f'select {Name} ,count(*) from {self.Table_Name} where "trade_date" >\'{Date}\' group by {Name} 'cursor.execute(sql_count)da=cursor.fetchall()df_count=pd.DataFrame(da,columns=['ts_code','count_A'])return df_count@wrapFundef count_pct_chg(self,Name="ts_code",Date="19820101",change_up=3):#按change_up查出询后的数据的行数cursor = self.cursor#建立游标,通过此命令来实例化#DT_time=datetime.datetime.now()-datetime.timedelta(days=DTdays)sql_count=f'select {Name} ,count(*) from {self.Table_Name} where "trade_date" >\'{Date}\' and "pct_chg">{change_up} group by {Name} 'cursor.execute(sql_count)da=cursor.fetchall()df_count_pct_chg=pd.DataFrame(da,columns=[Name,'count_B'])return df_count_pct_chg@wrapFundef max_change_from_count(self,Name="ts_code",count_days=100,Date="20200101",Head=500):'''将查询的count_from_date与count_pct_chg的函数中的count相比,得出频次最高的前Head数量的ts_code'''cursor = self.cursor#建立游标,通过此命令来实例化df_count=self.count_from_date(Name,Date)dfpc=self.count_pct_chg(Name,Date)dfm=pd.merge(df_count, dfpc, how='left', on='ts_code')dfm['count']= dfm['count_B']/dfm['count_A']dfm=pd.merge(dfm, www.Stock_Basic, how='left', on='ts_code')df_out=dfm[dfm['count_A']>count_days]# 只计算上市时间大于100天的股票df_h=df_out.sort_values('count',ascending=False)df=df_h.loc[:,[Name,'count_A','count','name']].head(Head)return df @wrapFundef get_ts_code(self,Name="ts_code",Name_std="pct_chg",Date="20200101",Head=500):#----------ouput:code'''找出大于3%涨幅的出现的频次与标准差,求效集,得出想要的股票'''cursor = self.cursor#建立游标,通过此命令来实例化df_count=self.max_change_from_count(Name=Name,count_days=100,Date=Date,Head=Head)#找出大于3%涨幅的出现的频次df_std=self.look_for_std(Name_std=Name_std,Date=Date, Head=Head)#找出标准差dfm=pd.merge(df_count,df_std, how='inner', on='ts_code')dfm=pd.merge(dfm, www.Stock_Basic, how='left', on='ts_code')return dfm @wrapFundef output_data(self,Date="20200101"):#----------ouput:data'''导出用于回测的原始数据'''cursor = self.cursor#建立游标,通过此命令来实例化Date=datetime.datetime.strptime(Date,'%Y%m%d')DT_time=Date-datetime.timedelta(days=50)Dt=DT_time.strftime('%Y%m%d')df=self.get_data_from_func("open", "high" ,"low", "close", "pre_close", "change", "pct_chg", "vol", "amount",Code="ts_code",DateName="trade_date",Date=Dt ,func=self.get_ts_code())df['open'] = pd.to_numeric(df['open']).round(2)df['close'] = pd.to_numeric(df['close']).round(2)df['vol'] = pd.to_numeric(df['vol']).round(2)df['pct_chg'] = pd.to_numeric(df['pct_chg']).round(2)df['trade_date']=df['trade_date'].astype('datetime64[ns]')return df
count=0
income=pd.DataFrame()
def Income(df):global income, df_in, Date_in, Date_outdf.reset_index(inplace=True)m5=5m20=20def roll(df,m5=5,m20=20):"""m5为5日平移均线,m20为20日平移均线"""df[f'close_m{m5}']=df['close'].rolling(window=m5).mean()df[f'close_m{m20}']=df['close'].rolling(window=m20).mean()df[f'vol_m{m5}']=df['vol'].rolling(window=m5).mean()df[f'vol_m{m20}']=df['vol'].rolling(window=m20).mean()return dfdf=df.groupby(['ts_code']).apply(roll,m5=5,m20=20)df.dropna( inplace=True)df['sign']=(df[f'close_m{m5}']>df[f'close_m{m20}']).apply(lambda x: 1 if x is True else -1)df_in=pd.DataFrame()df_out=pd.DataFrame()df['Mark']=0for k ,v in df.groupby('ts_code'):v.set_index('trade_date', inplace=True,)for j in range(2,v.shape[0]):in_c1=(df[f'close_m{m5}'].iat[j-2]df[f'close_m{m20}'].iat[j])in_v1=(df[f'vol_m{m5}'].iat[j-2]df[f'vol_m{m20}'].iat[j])out_c1=(df[f'close_m{m5}'].iat[j-2]>df[f'close_m{m20}'].iat[j-2])out_c2=(df[f'close_m{m5}'].iat[j-1]>=df[f'close_m{m20}'].iat[j-1])out_c3=(df[f'close_m{m5}'].iat[j]df[f'vol_m{m20}'].iat[j-2])out_v2=(df[f'vol_m{m5}'].iat[j-1]>=df[f'vol_m{m20}'].iat[j-1])out_v3=(df[f'vol_m{m5}'].iat[j]Date_out]# df_in在下一个循环时行的开始处小于死叉的日期df_in=df_in.reset_index()return income, df_inwhile len(df_in)>0:ss=df_in.groupby('ts_code')['trade_date'].min()ss.sort_values(inplace=True)Date_in=ss[0]Code=ss.index[0]#找到最早的日期df_or1=df_end[df_end['ts_code'].isin([Code])]#根据ts_code 筛选出df_end总表上的对应的df_or1.set_index('trade_date',inplace=True)#设置indexdf_or2= df_or1.copy()# 此处不复制会报错df_or3=df_or2[df_or2.index>Date_in].copy()df_or3.loc[:,'log_price']=np.log(df_or3.loc[:,'close'])#求出close的np对数函数df_or3.loc[:,'log_return']=df_or3.loc[:,'log_price'].diff()#将对数函数转成增加百分比,方便后期累加计算收益df_or3.loc[:,'cumsum_return']=df_or3.loc[:,'log_return'].cumsum()df_or3=df_or3.dropna()df_or3.to_csv(r'E:\study_python\or3.csv',index=True)if len(df_or3)>0:for i in df_or3.index:vv=df_or3.loc[i,'cumsum_return']ww=df_or3.loc[i,'Mark']if vv>=0.15:print('出现>=0.15的项目, 日期是',i,'value 是',vv)vvww()breakelif vv<=-0.05:print('出现<=-0.05的项目, 日期是',i,'value 是',vv)vvww()breakelif ww==-1:print('出现==-1的项目, 日期是',i,'value 是',vv)vvww()breakelse:if i==df_or3.index[-1]:print('执行ELSE, 日期是',i,'value 是',vv)vvww()breakelse:continueprint('count=',count,'Code is ',Code,'Date_in=',Date_in,'Date_out=',Date_out)income.dropna(inplace=True)income.loc[:,'cumsum_return']=income.loc[:,'log_return'].cumsum()income['cumsum_return'].plot(figsize=(20,16))plt.show()return income
www=Jeoj()#类实例化
#www.mean_from_count(Date="20200101",DTdays=30)
#www.get_ts_code(Name="ts_code",Date="20200101",Head=300)
df=www.output_data(Date='20000101')#启动选股程序
Income(df)# 调用双均线策略
# import pickle as pk
# pk_file=open(r'E:\study_python\DB\my_pk.pkl','wb')
# pk.dump(df,pk_file)
# pk_file.close()
print('ok')
#www.data_down_to_DB()#下载股票数据到本地目录
有喜欢的加微信一起进步13967332389, 也路过的高手指导下, 不胜三路感谢!
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
