import akshare as ak import pandas as pd import datetime from pymongo import MongoClient import pytz import requests import pandas as pd import time import math def technology(df): # 定义计算技术指标的函数 try: # df = df.dropna() # 删除缺失值,避免无效数据的干扰 df.sort_values(by="日期") # 以日期列为索引,避免计算错误 df["涨跌幅"] = df["收盘"]/df["收盘"].copy().shift(1) - 1 df["振幅"] = (df["最高"].copy()-df["最低"].copy())/df["开盘"].copy() df["资金波动"] = df["振幅"] / df["成交额"] for n in range(1, 5): df[f"过去{n}日资金波动"] = df["资金波动"].shift(n) if ("分钟" in name) | ("指数" in name) | ("行业" in name): for n in range(1, 10): df[f"过去{n}日总涨跌"] = df["开盘"]/(df["开盘"].copy().shift(n)) except Exception as e: print(f"发生bug: {e}") return df def rank(df): # 计算每个标的的各个指标在当日的排名,并将排名映射到 [0, 1] 的区间中 # 计算每个指标的排名 for column in df.columns: if ("未来函数" not in str(column)): df = pd.concat([df, (df[str(column)].rank( method="max", ascending=False) / len(df)).rename(f"{str(column)}_rank")], axis=1) return df client = MongoClient( "mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000") db = client["wth000"] name = "可转债" collection = db[f"实盘{name}"] # 获取当前日期,并通过akshare访问当日A股指数是否有数据,如果有数据则说明今日A股开盘,进行下一步的操作 start_date = datetime.datetime.now().strftime("%Y-%m-%d") day = ak.index_zh_a_hist( symbol="000002", start_date=start_date, period="daily") print(day) if not day.notna().empty: timestamp = datetime.datetime.strptime( start_date, "%Y-%m-%d").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp() k_data = ak.bond_zh_hs_cov_spot() k_data = k_data.rename(columns={"code": "代码", "open": "开盘", "high": "最高", "low": "最低", "buy": "买一", "sell": "卖一", "trade": "收盘", "volume": "成交量", "amount": "成交额", "ticktime": "标记时间", "涨跌额": "-0.552", "涨跌幅": "-0.255", }) try: latest = list(collection.find({"timestamp": timestamp}, { "timestamp": 1}).sort("timestamp", -1).limit(1)) if len(latest) == 0: upsert_docs = True start_date_query = start_date print(latest) else: upsert_docs = False latest_timestamp = latest[0]["timestamp"] start_date_query = datetime.datetime.fromtimestamp( latest_timestamp).strftime("%Y-%m-%d") try: k_data["timestamp"] = timestamp k_data["日期"] = start_date k_data["代码"] = k_data["代码"].apply(lambda x: float(x)) k_data["买一"] = k_data["买一"].apply(lambda x: float(x)) k_data["卖一"] = k_data["卖一"].apply(lambda x: float(x)) k_data["开盘"] = k_data["开盘"].apply(lambda x: float(x)) k_data["最高"] = k_data["最高"].apply(lambda x: float(x)) k_data["最低"] = k_data["最低"].apply(lambda x: float(x)) k_data["收盘"] = k_data["收盘"].apply(lambda x: float(x)) k_data["成交量"] = k_data["成交量"].apply(lambda x: float(x)) k_data["成交额"] = k_data["成交额"].apply(lambda x: float(x)) k_data = k_data.to_dict("records") if upsert_docs: collection.insert_many(k_data) else: bulk_insert = [] for doc in k_data: print(doc["代码"], "数据更新") if doc["timestamp"] > latest_timestamp: # 否则,加入插入列表 bulk_insert.append(doc) if doc["timestamp"] == float(latest_timestamp): collection.update_many({"代码": doc["代码"], "timestamp": float(timestamp)}, { "$set": doc}, upsert=True) # 执行批量插入操作 if bulk_insert: collection.insert_many(bulk_insert) print("任务已经完成") except Exception as e: print(e) limit = 20000 if collection.count_documents({}) >= limit: oldest_data = collection.find().sort([("日期", 1)]).limit( collection.count_documents({})-limit) ids_to_delete = [data["_id"] for data in oldest_data] collection.delete_many({"_id": {"$in": ids_to_delete}}) print("数据清理成功") except Exception as e: print(e) time.sleep(1) # 获取数据并转换为DataFrame格式 df = pd.DataFrame(list(collection.find())) print(f"{name}数据读取成功") # 按照“代码”列进行分组并计算技术指标 df = df.groupby(["代码"], group_keys=False).apply(technology) # 分组并计算指标排名 df = df.groupby(["日期"], group_keys=False).apply(rank) try: df.sort_values(by="日期") # 以日期列为索引,避免计算错误 # 获取最后一天的数据 last_day = df.iloc[-1]["日期"] # 计算总共统计的股票数量 df = df[df[f"日期"] == last_day].copy() code = df[df["日期"] == df["日期"].min()]["代码"] # 获取首日标的数量,杜绝未来函数 num = math.ceil(len(code)/100) df = df[(df["涨跌幅"] <= 0.09)].copy() df = df[(df[f"过去1日资金波动_rank"] <= 0.1)].copy() dfend = df.groupby("日期", group_keys=True).apply( lambda x: x.nsmallest(1, f"开盘")).reset_index(drop=True) print(df) if len(df) < 200: # 发布到钉钉机器人 df["市场"] = f"实盘{name}" dfend["市场"] = f"实盘{name}" message = df[["市场", "代码", "日期", "开盘"]].copy().to_markdown() messageend = dfend[["市场", "代码", "日期", "开盘"]].copy().to_markdown() print(type(message)) webhook = "https://oapi.dingtalk.com/robot/send?access_token=f5a623f7af0ae156047ef0be361a70de58aff83b7f6935f4a5671a626cf42165" requests.post(webhook, json={"msgtype": "markdown", "markdown": { "title": f"{name}", "text": message}}) requests.post(webhook, json={"msgtype": "markdown", "markdown": { "title": f"{name}低价股", "text": messageend}}) except Exception as e: print(f"发生bug: {e}") buy_symbols = df["代码"].copy().drop_duplicates().tolist() # 获取所有不重复的交易标的 print(buy_symbols)