File size: 4,816 Bytes
7ec53ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# 安装币安的python库
# pip install python-binance
from binance.client import Client
from pymongo import MongoClient
import time
import pandas as pd

# 币安的api配置
api_key = "0jmNVvNZusoXKGkwnGLBghPh8Kmc0klh096VxNS9kn8P0nkAEslVUlsuOcRoGrtm"
api_secret = "PbSWkno1meUckhmkLyz8jQ2RRG7KgmZyAWhIF0qPdCJrmDSFxoxGdMG5gZeYYCgy"

# 需要写入的数据库配置
client = MongoClient(
    "mongodb://wth000:[email protected]:27017/dbname?authSource=wth000")
db = client["wth000"]
# 设置参数
name = "COIN30分钟"
collection = db[f"{name}"]
# 创建Binance客户端
client = Client(api_key, api_secret)
# 获取所有USDT计价的现货交易对
codes = client.get_exchange_info()["symbols"]
usdt_codes = [code for code in codes if code["quoteAsset"] == "USDT" and (
    "DOWN" not in code["symbol"]) and ("UP" not in code["symbol"])]
print(f"当前币安现货USDT有{len(usdt_codes)}个交易对")

# 遍历所有现货交易对,并获取日K线数据
for code in usdt_codes:
    symbol = code["symbol"]
    data_list = []
    # 找到该标的最新的时间戳
    latest_data = collection.find_one(
        {"代码": symbol}, {"timestamp": 1}, sort=[("timestamp", -1)])
    latest_timestamp = latest_data["timestamp"] if latest_data else 0
    klines = client.get_klines(
        symbol=symbol,
        interval=Client.KLINE_INTERVAL_30MINUTE,
        limit=1000
    )
    # 实际上实盘的时候,这里应该改成八小时
    # KLINE_INTERVAL_15MINUTE="15m"
    # KLINE_INTERVAL_8HOUR="8h"
    # KLINE_INTERVAL_1DAY ="1d"
    # 插入到集合中
    for kline in klines:
        timestamp = kline[0] / 1000
        if timestamp < latest_timestamp:  # 如果时间戳小于等于最后时间戳,直接跳过
            continue
        date = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(timestamp))
        if timestamp == latest_timestamp:
            update_data = {
                "timestamp": timestamp,
                "代码": symbol,
                "日期": date,
                "开盘": float(kline[1]),
                "最高": float(kline[2]),
                "最低": float(kline[3]),
                "收盘": float(kline[4]),
                "成交量": float(kline[5]),
                "收盘timestamp": float(kline[6]/1000),
                "成交额": float(kline[7]),
                "成交笔数": float(kline[8]),
                "主动买入成交量": float(kline[9]),
                "主动买入成交额":  float(kline[10])
            }
            filter = {"代码": symbol, "timestamp": latest_timestamp}
            collection.update_one(
                filter, {"$set": update_data})
        else:
            data_list.append({"timestamp": timestamp,
                              "代码": symbol,
                              "日期": date,
                              "开盘": float(kline[1]),
                              "最高": float(kline[2]),
                              "最低": float(kline[3]),
                              "收盘": float(kline[4]),
                              "成交量": float(kline[5]),
                              "收盘timestamp": float(kline[6]/1000),
                              "成交额": float(kline[7]),
                              "成交笔数": float(kline[8]),
                              "主动买入成交量": float(kline[9]),
                              "主动买入成交额":  float(kline[10])
                              })
    # 如果时间戳等于最新数据的时间戳,则执行更新操作,否则执行插入操作
    if len(data_list) > 0:
        collection.insert_many(data_list)
print("原始数据获取完成")

# limit = 600000
# if collection.count_documents({}) >= limit:
#     oldest_data = collection.find().sort([("日期", 1)]).limit(
#         collection.count_documents({})-limit)
#     ids_to_delete = [data["_id"] for data in oldest_data]
#     collection.delete_many({"_id": {"$in": ids_to_delete}})
# print("数据清理成功")

# # 数据拼接及指标计算
# time.sleep(1)
# df = pd.DataFrame(list(collection.find()))
# try:
#     dfbase = pd.DataFrame(list(db[f"COIN基本面"].find()))
#     # 仅保留共有代码的数据行
#     common_codes = set(df["代码"]).intersection(set(dfbase["代码"]))
#     df = df[df["代码"].isin(common_codes)]
#     dfbase = dfbase[dfbase["代码"].isin(common_codes)]
#     df = pd.merge(df, dfbase[["代码", "发行量"]], on="代码")
#     df["总市值"] = df["开盘"]*df["发行量"]
#     df.drop('_id', axis=1, inplace=True)  # 删掉目标列
#     db[f"{name}拼接"].drop()
#     time.sleep(1)
#     db[f"{name}拼接"].insert_many(df.to_dict("records"))
#     print("拼接数据插入完成")
# except Exception as e:
#     print(e, "拼接基本面数据失败")