code / 交易执行 /实盘COIN.py
tradequant's picture
Upload 59 files
7ec53ba
import requests
import time
import math
from binance.client import Client
from pymongo import MongoClient
import time
import pandas as pd
def technology(df): # 定义计算技术指标的函数
try:
# df = df.dropna() # 删除缺失值,避免无效数据的干扰
df.sort_values(by="日期") # 以日期列为索引,避免计算错误
df["涨跌幅"] = df["收盘"]/df["收盘"].copy().shift(1) - 1
df["振幅"] = (df["最高"].copy()-df["最低"].copy())/df["开盘"].copy()
df["资金波动"] = df["振幅"] / df["成交额"]
for n in range(1, 5):
df[f"过去{n}日资金波动"] = df["资金波动"].shift(n)
if ("分钟" in name) | ("指数" in name) | ("行业" in name):
for n in range(1, 10):
df[f"过去{n}日总涨跌"] = df["开盘"]/(df["开盘"].copy().shift(n))
except Exception as e:
print(f"发生bug: {e}")
return df
def rank(df): # 计算每个标的的各个指标在当日的排名,并将排名映射到 [0, 1] 的区间中
# 计算每个指标的排名
for column in df.columns:
if ("未来函数" not in str(column)):
df = pd.concat([df, (df[str(column)].rank(
method="max", ascending=False) / len(df)).rename(f"{str(column)}_rank")], axis=1)
return df
client = MongoClient(
"mongodb://wth000:[email protected]:27017/dbname?authSource=wth000")
db = client["wth000"]
# 设置参数
name = "COIN"
collection = db[f"实盘{name}"]
# 币安的api配置
api_key = "0jmNVvNZusoXKGkwnGLBghPh8Kmc0klh096VxNS9kn8P0nkAEslVUlsuOcRoGrtm"
api_secret = "PbSWkno1meUckhmkLyz8jQ2RRG7KgmZyAWhIF0qPdCJrmDSFxoxGdMG5gZeYYCgy"
try:
# 创建Binance客户端
client = Client(api_key, api_secret)
# 获取所有USDT计价的现货交易对
ticker_prices = client.get_exchange_info()["symbols"]
usdt_ticker_prices = [
ticker_price for ticker_price in ticker_prices if ticker_price["quoteAsset"] == "USDT" and ("DOWN" not in ticker_price["symbol"]) and ("UP" not in ticker_price["symbol"])]
print(f"当前币安现货有{len(ticker_prices)}个交易对")
# 遍历所有现货交易对,并获取日K线数据
for ticker_price in usdt_ticker_prices:
symbol = ticker_price["symbol"]
data_list = []
# 找到该标的最新的时间戳
latest_data = collection.find_one(
{"代码": symbol}, {"timestamp": 1}, sort=[("timestamp", -1)])
latest_timestamp = latest_data["timestamp"] if latest_data else 0
klines = client.get_klines(
symbol=symbol,
interval=Client.KLINE_INTERVAL_1DAY,
limit=3
)
# 插入到集合中
for kline in klines:
timestamp = kline[0] / 1000
if timestamp < latest_timestamp: # 如果时间戳小于等于最后时间戳,直接跳过
continue
date = time.strftime("%Y-%m-%d %H:%M:%S",
time.gmtime(timestamp))
if timestamp == latest_timestamp:
update_data = {
"timestamp": timestamp,
"代码": symbol,
"日期": date,
"开盘": float(kline[1]),
"最高": float(kline[2]),
"最低": float(kline[3]),
"收盘": float(kline[4]),
"成交量": float(kline[5]),
"收盘timestamp": float(kline[6]/1000),
"成交额": float(kline[7]),
}
filter = {"代码": symbol, "timestamp": latest_timestamp}
collection.update_one(
filter, {"$set": update_data})
else:
data_list.append({"timestamp": timestamp,
"代码": symbol,
"日期": date,
"开盘": float(kline[1]),
"最高": float(kline[2]),
"最低": float(kline[3]),
"收盘": float(kline[4]),
"成交量": float(kline[5]),
"收盘timestamp": float(kline[6]/1000),
"成交额": float(kline[7]),
})
# 如果时间戳等于最新数据的时间戳,则执行更新操作,否则执行插入操作
if len(data_list) > 0:
collection.insert_many(data_list)
print("任务已经完成")
limit = 5000
if collection.count_documents({}) >= limit:
oldest_data = collection.find().sort([("日期", 1)]).limit(
collection.count_documents({})-limit)
ids_to_delete = [data["_id"] for data in oldest_data]
collection.delete_many({"_id": {"$in": ids_to_delete}})
print("数据清理成功")
except Exception as e:
print(e)
time.sleep(1)
try:
# 获取数据并转换为DataFrame格式
df = pd.DataFrame(list(collection.find()))
print(f"{name}数据读取成功")
# 按照“代码”列进行分组并计算技术指标
df = df.groupby(["代码"], group_keys=False).apply(technology)
# 分组并计算指标排名
df = df.groupby(["日期"], group_keys=False).apply(rank)
df.sort_values(by="日期") # 以日期列为索引,避免计算错误
# 获取最后一天的数据
last_day = df.iloc[-1]["日期"]
# 计算总共统计的股票数量
df = df[df[f"日期"] == last_day].copy()
code = df[df["日期"] == df["日期"].min()]["代码"] # 获取首日标的数量,杜绝未来函数
num = math.ceil(len(code)/100)
df = df[(df[f"开盘"] >= 0.00000200)].copy()
df = df[(df[f"过去{1}日资金波动_rank"] <= 0.01)].copy()
dfend = df.groupby(["日期"], group_keys=True).apply(
lambda x: x.nsmallest(1, f"开盘")).reset_index(drop=True)
print(df)
if len(df) < 200:
# 发布到钉钉机器人
df["市场"] = f"实盘{name}"
dfend["市场"] = f"实盘{name}"
message = df[["市场", "代码", "日期", "开盘"]].copy().to_markdown()
messageend = dfend[["市场", "代码", "日期", "开盘"]].copy().to_markdown()
print(type(message))
webhook = "https://oapi.dingtalk.com/robot/send?access_token=f5a623f7af0ae156047ef0be361a70de58aff83b7f6935f4a5671a626cf42165"
requests.post(webhook, json={"msgtype": "markdown", "markdown": {
"title": f"{name}", "text": message}})
requests.post(webhook, json={"msgtype": "markdown", "markdown": {
"title": f"{name}低价股", "text": messageend}})
except Exception as e:
print(f"发生bug: {e}")
buy_symbols = df["代码"].copy().drop_duplicates().tolist() # 获取所有不重复的交易标的
print(buy_symbols)