当前位置: 首页 > news >正文

大A数据采集教程-3 自动采集当日数据

前言

上一个教程已经完成了历史数据的创建,这个教程完成每日自动更新日K

Baostock数据更新时间

当前交易日17:30,完成日K线数据入库

创建每天18点自动更新日K

根据Baostock的规则,我们设定每天18:00的定时任务,当你下班回家当日数据已经给你整理得巴巴适适
我们使用系统自带的crontab完成定时任务

crontab-e

添加定时任务,脚本路径根据自己需求存储,日志位置按个人喜好存储

018* * * /bin/bash /home/zero/scripts/run.sh>>/var/log/stock_sync.log2>&1

创建run.sh脚本

因为python 是使用的虚拟环境,所以需要先激活虚拟环境再调用脚本

#!/bin/bash# 项目路径(根据您提供的信息设置)SCRIPT_DIR="/root"PROJECT_DIR="/root/venv"SCRIPT_NAME="daily_stock_sync.py"# 检查项目目录是否存在if[!-d"$PROJECT_DIR"];thenecho"错误:项目目录$PROJECT_DIR不存在"exit1fi# 检查虚拟环境是否存在VENV_ACTIVATE="$PROJECT_DIR/bin/activate"if[!-f"$VENV_ACTIVATE"];thenecho"错误:虚拟环境激活脚本不存在,请确认路径是否正确"echo"预期路径:$VENV_ACTIVATE"exit1fi# 检查脚本文件是否存在SCRIPT_PATH="$SCRIPT_DIR/$SCRIPT_NAME"if[!-f"$SCRIPT_PATH"];thenecho"错误:脚本文件$SCRIPT_PATH不存在"exit1fi# 激活虚拟环境并执行脚本cd"$PROJECT_DIR"||exit1source"$VENV_ACTIVATE"python"$SCRIPT_PATH"

运行脚本时先使用接口query_trade_dates判断当天是否是交易日,如果是遍历获取当天的日K数据

importbaostockasbsimportpandasaspdimporttimefromdatetimeimportdatetimefromsqlalchemyimportcreate_enginefromsqlalchemy.excimportIntegrityError# === 配置信息 ===MYSQL_CONFIG={'host':'',# 数据库地址'port':,# 端口'user':'',# 用户名'password':'',# 密码'db':'',# 数据库名'charset':'utf8mb4'}engine=create_engine(f"mysql+pymysql://{MYSQL_CONFIG['user']}:{MYSQL_CONFIG['password']}@{MYSQL_CONFIG['host']}:{MYSQL_CONFIG['port']}/{MYSQL_CONFIG['db']}?charset={MYSQL_CONFIG['charset']}")# === 核心函数 ===defis_trading_day()->bool:"""判断当天是否为交易日"""today=datetime.now().strftime('%Y-%m-%d')rs=bs.query_trade_dates(start_date=today,end_date=today)ifrs.error_code!='0':print(f"交易日查询失败:{rs.error_msg}")returnFalsetrading_days=[]whilers.next():trading_days.append(rs.get_row_data())returnlen(trading_days)>0andtrading_days[0][1]=='1'# 1=交易日defget_stock_codes()->list:"""获取需要更新的股票代码列表(示例:从数据库stock_basic表读取)"""query="SELECT code FROM stock_basic WHERE status = '上市'"# 假设存在基础信息表codes=pd.read_sql(query,engine)['code'].tolist()print(f"共获取{len(codes)}只上市股票代码")returncodesdefget_daily_k_data(code:str,trade_date:str)->pd.DataFrame:"""获取单只股票的当日K线数据"""# Baostock日线接口参数说明:# adjustflag:复权类型,1=后复权,2=前复权,3=不复权(与数据库字段对应)fields="date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ,psTTM,pcfNcfTTM"rs=bs.query_history_k_data_plus(code=code,fields=fields,start_date=trade_date,end_date=trade_date,frequency="d",# 日线数据adjustflag="2"# 前复权(根据需求调整为1/2/3))ifrs.error_code!='0':print(f"股票{code}数据查询失败:{rs.error_msg}")returnpd.DataFrame()data_list=[]whilers.next():data=rs.get_row_data()data_dict={'trade_date':data[0],# 交易日期'code':data[1],# 股票代码'open':float(data[2])ifdata[2]elseNone,'high':float(data[3])ifdata[3]elseNone,'low':float(data[4])ifdata[4]elseNone,'close':float(data[5])ifdata[5]elseNone,'pre_close':float(data[6])ifdata[6]elseNone,'volume':int(data[7])ifdata[7]elseNone,'amount':float(data[8])ifdata[8]elseNone,'adjustflag':data[9],# 复权类型'turn':float(data[10])ifdata[10]elseNone,'tradestatus':data[11],# 交易状态(1=正常,0=停牌)'pctChg':float(data[12])ifdata[12]elseNone,'peTTM':float(data[13])ifdata[13]elseNone,'pbMRQ':float(data[14])ifdata[14]elseNone,'psTTM':float(data[15])ifdata[15]elseNone,'pcfNcfTTM':float(data[16])ifdata[16]elseNone,'change_amount':round(float(data[5])-float(data[6]),4)if(data[5]anddata[6])elseNone,# 计算涨跌额'isST':1if'ST'incodeelse0# 简单判断是否ST股(需根据实际股票名称优化)}data_list.append(data_dict)returnpd.DataFrame(data_list)defbatch_save_k_data_to_db(df:pd.DataFrame):"""批量写入K线数据到数据库"""ifdf.empty:print("无数据可写入")returntry:# 使用replace避免重复写入(利用唯一索引idx_code_trade_date)df.to_sql(name='stock_quote_daily',con=engine,if_exists='append',index=False,chunksize=1000)print(f"成功写入{len(df)}条K线数据({datetime.now()})")exceptIntegrityError:# 若触发唯一索引冲突,说明数据已存在,跳过写入print(f"部分数据已存在,跳过重复写入({datetime.now()})")exceptExceptionase:print(f"写入数据库失败:{str(e)}")# === 主执行逻辑 ===defmain():today=datetime.now().strftime('%Y-%m-%d')print(f"===== 开始执行{today}K线数据更新任务 =====")# 1. 登录Baostocklg=bs.login()iflg.error_code!='0':print(f"Baostock登录失败:{lg.error_msg}")returntry:# 2. 判断是否为交易日ifnotis_trading_day():print(f"今日({today})非交易日,任务结束")return# 3. 获取股票代码列表codes=get_stock_codes()ifnotcodes:print("未获取到股票代码列表,任务结束")return# 4. 批量获取K线数据并写入数据库batch_size=50# 每批处理50只股票(避免接口请求过于频繁)all_data=[]foriinrange(0,len(codes),batch_size):batch_codes=codes[i:i+batch_size]print(f"处理批次{i//batch_size+1}/{(len(codes)+batch_size-1)//batch_size}{len(batch_codes)}只股票)")forcodeinbatch_codes:df=get_daily_k_data(code,today)ifnotdf.empty:all_data.append(df)time.sleep(0.5)# 控制请求频率(Baostock限制每秒2次)# 每批数据合并后写入数据库ifall_data:batch_df=pd.concat(all_data,ignore_index=True)batch_save_k_data_to_db(batch_df)all_data=[]# 清空列表准备下一批finally:# 5. 登出Baostockbs.logout()print(f"====={today}K线数据更新任务结束 =====")if__name__=="__main__":main()
http://www.cnnetsun.cn/news/107083.html

相关文章:

  • RK3588语音AI部署终极指南:算子兼容性深度优化与实战解决方案
  • EmotiVoice语音好奇感模拟促进知识探索
  • Abaqus轮轨瞬态动力学分析:从模型搭建到inp文件生成
  • 使用Playwright集成亮数据IP代理获取AI热点
  • 探索工程模拟与分析的多元世界:从轨道到建筑
  • Cuberite服务器日志分析完全指南:从入门到实战
  • EmotiVoice语音合成服务灰度日志采集规范
  • EmotiVoice语音自然度评分达到MOS 4.5以上
  • GISBox教你快速获取建筑数据并生成可发布的3D模型
  • EmotiVoice情感语音合成API接口调用详细说明
  • SenseVoice多语言语音理解:突破传统ASR局限的专业术语识别方案
  • Redash数据可视化:让枯燥数据秒变商业洞察
  • Pyfa舰船配置工具:5个高效技巧助你成为EVE Online配置高手
  • 洛谷 P1892 [BalticOI 2003] 团伙
  • 洛谷 P2024 [NOI2001] 食物链
  • Animeko跨平台动漫追番神器:从入门到精通的完整指南
  • 中级软件设计师英语部分备考攻略:完形填空高频考点与解题技巧
  • 2025年下半年软件设计师易混淆知识点
  • Headscale配置终极指南:从零到精通的环境变量管理技巧
  • 测试架构师的成长路径:从技术执行到质量战略的跨越
  • 多人姿态估计终极指南:从零开始构建实时人体分析系统
  • 【ACWing】150. 括号画家
  • 如何快速掌握Vim插件管理:VAM的完整使用指南
  • 文献分区及影响因子批量查询
  • APKMirror安卓应用下载平台深度解析:从源码到实践
  • 终极FreeMarker模板调试工具:3分钟解决模板语法问题
  • QQScreenShot独立版技术解析:基于模块化架构的屏幕捕捉解决方案
  • 快速掌握SCPI Parser终极指南:构建专业仪器控制系统的完整解决方案
  • 自定义算子的“诞生记”:基于CANN Kernel自调工程的完整CI/CD流水线
  • 高效、稳定、可定制——EmotiVoice开源TTS优势全解析