lamhieu committed
Commit 2860cbc · 1 Parent(s): 20fe670

chore: update something

Files changed (1):
  1. lightweight_embeddings/analytics.py (+66 −64)
lightweight_embeddings/analytics.py CHANGED
@@ -1,3 +1,5 @@
+# filename: analytics.py
+
 import logging
 import asyncio
 from upstash_redis import Redis as UpstashRedis
@@ -31,8 +33,14 @@ class Analytics:
         # Upstash Redis client (synchronous over HTTP)
         self.redis_client = self._create_redis_client()
 
-        # Local buffer stores cumulative data for two-way sync
-        self.local_buffer = {
+        # current_totals holds the absolute counters (loaded from Redis)
+        self.current_totals = {
+            "access": defaultdict(lambda: defaultdict(int)),
+            "tokens": defaultdict(lambda: defaultdict(int)),
+        }
+
+        # new_increments holds only the new usage since last sync
+        self.new_increments = {
             "access": defaultdict(lambda: defaultdict(int)),
             "tokens": defaultdict(lambda: defaultdict(int)),
         }
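
The substantive change in this commit: the single cumulative local_buffer is split into current_totals (absolute counters, for reads) and new_increments (deltas pending sync). A minimal standalone sketch of the pattern, with plain dicts standing in for the Redis hashes and record/flush as illustrative names, not module code:

from collections import defaultdict

current_totals = defaultdict(int)   # absolute counters, safe to read any time
new_increments = defaultdict(int)   # deltas accumulated since the last flush

def record(model_id: str, tokens: int) -> None:
    # Every event updates both views: totals for reads, increments for sync.
    current_totals[model_id] += tokens
    new_increments[model_id] += tokens

def flush(backend: dict) -> None:
    # Push only the deltas, then reset them; totals stay intact locally.
    for model_id, delta in new_increments.items():
        backend[model_id] = backend.get(model_id, 0) + delta
    new_increments.clear()

backend = {"m": 5}         # pretend Redis already holds 5
record("m", 3)
flush(backend)
assert backend["m"] == 8   # only the delta (3) was added, no double count

Flushing deltas rather than absolute values is what lets several replicas share the same Redis keys without overwriting one another.
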
@@ -53,7 +61,7 @@ class Analytics:
 
     async def _initialize(self):
         """
-        Fetches existing data from Redis into the local buffer,
+        Fetches existing data from Redis into the current_totals buffer,
         then starts the periodic synchronization task.
         """
        try:
@@ -68,13 +76,16 @@ class Analytics:
     def _get_period_keys(self) -> tuple:
         """
         Returns day, week, month, and year keys based on the current UTC date.
+        Also includes "total" as a key for all-time tracking.
         """
         now = datetime.utcnow()
         day_key = now.strftime("%Y-%m-%d")
+        # %U is the week number of year, with Sunday as the first day of the week
+        # If you prefer ISO week, consider using %V or something else.
         week_key = f"{now.year}-W{now.strftime('%U')}"
         month_key = now.strftime("%Y-%m")
         year_key = now.strftime("%Y")
-        return day_key, week_key, month_key, year_key
+        return day_key, week_key, month_key, year_key, "total"
 
     async def access(self, model_id: str, tokens: int):
         """
@@ -84,58 +95,56 @@ class Analytics:
         - model_id (str): The ID of the accessed model.
         - tokens (int): Number of tokens used in this access event.
         """
-        day_key, week_key, month_key, year_key = self._get_period_keys()
+        keys = self._get_period_keys()
 
         async with self.lock:
-            # Access counts
-            self.local_buffer["access"][day_key][model_id] += 1
-            self.local_buffer["access"][week_key][model_id] += 1
-            self.local_buffer["access"][month_key][model_id] += 1
-            self.local_buffer["access"][year_key][model_id] += 1
-            self.local_buffer["access"]["total"][model_id] += 1
-
-            # Token usage
-            self.local_buffer["tokens"][day_key][model_id] += tokens
-            self.local_buffer["tokens"][week_key][model_id] += tokens
-            self.local_buffer["tokens"][month_key][model_id] += tokens
-            self.local_buffer["tokens"][year_key][model_id] += tokens
-            self.local_buffer["tokens"]["total"][model_id] += tokens
+            for period_key in keys:
+                # Increase new increments by the usage
+                self.new_increments["access"][period_key][model_id] += 1
+                self.new_increments["tokens"][period_key][model_id] += tokens
+
+                # Also update current_totals so that stats() are immediately up to date
+                self.current_totals["access"][period_key][model_id] += 1
+                self.current_totals["tokens"][period_key][model_id] += tokens
 
     async def stats(self) -> Dict[str, Dict[str, Dict[str, int]]]:
         """
-        Returns a copy of current statistics from the local buffer.
+        Returns a copy of current statistics from the local buffer (absolute totals).
         """
         async with self.lock:
+            # Return the current_totals, which includes everything loaded from Redis
+            # plus all increments since the last sync.
             return {
                 "access": {
                     period: dict(models)
-                    for period, models in self.local_buffer["access"].items()
+                    for period, models in self.current_totals["access"].items()
                 },
                 "tokens": {
                     period: dict(models)
-                    for period, models in self.local_buffer["tokens"].items()
+                    for period, models in self.current_totals["tokens"].items()
                 },
             }
 
     async def _sync_from_redis(self):
         """
-        Pulls existing analytics data from Upstash Redis into the local buffer.
+        Pulls existing analytics data from Upstash Redis into current_totals.
         Uses run_in_executor to avoid blocking the event loop.
+        Also resets new_increments to avoid double counting after a restart.
         """
         loop = asyncio.get_running_loop()
-
         async with self.lock:
-            # Scan 'access' keys
+            # Reset local structures
+            self.current_totals = {
+                "access": defaultdict(lambda: defaultdict(int)),
+                "tokens": defaultdict(lambda: defaultdict(int)),
+            }
+            self.new_increments = {
+                "access": defaultdict(lambda: defaultdict(int)),
+                "tokens": defaultdict(lambda: defaultdict(int)),
+            }
+
             cursor = 0
             while True:
-                # Upstash doesn't provide a typical 'SCAN' the same way as standard Redis?
-                # We'll mimic it by searching for keys, or we can store a list of known periods if needed.
-                # If you only store certain known patterns, adapt accordingly.
-                # For demonstration, we do a naive approach, or assume we have a method that lists keys.
-                # Upstash doesn't always support the standard SCAN. We might store known keys in a set in Redis.
-
-                # If Upstash doesn't support SCAN at all, you need another approach (like maintaining a separate index).
-                # For now, let's assume it can handle the SCAN command similarly:
                 scan_result = await loop.run_in_executor(
                     None,
                     partial(
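
Both buffers are mutated under the same asyncio.Lock, so stats() can never observe a half-applied update across them. A self-contained sketch of that locking discipline (illustrative names, not the module itself):

import asyncio
from collections import defaultdict

lock = asyncio.Lock()
counts = defaultdict(int)

async def bump(model_id: str) -> None:
    # Writers and readers take the same lock the class uses in access()/stats().
    async with lock:
        counts[model_id] += 1

async def main() -> None:
    await asyncio.gather(*(bump("m") for _ in range(100)))
    async with lock:
        print(counts["m"])  # 100, no lost updates

asyncio.run(main())
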
@@ -146,6 +155,7 @@ class Analytics:
                     ),
                 )
                 cursor, keys = scan_result[0], scan_result[1]
+
                 for key in keys:
                     # key is "analytics:access:<period>"
                     period = key.replace("analytics:access:", "")
@@ -153,10 +163,11 @@ class Analytics:
                         None, partial(self.redis_client.hgetall, key)
                     )
                     for model_id, count_str in data.items():
-                        self.local_buffer["access"][period][model_id] = int(count_str)
-                break
+                        self.current_totals["access"][period][model_id] = int(count_str)
+
+                if cursor == 0:
+                    break
 
-            # Scan 'tokens' keys
             cursor = 0
             while True:
                 scan_result = await loop.run_in_executor(
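
The unconditional break removed here exited after the first SCAN page, silently dropping any periods beyond it; the replacement follows the standard cursor contract. A generic sketch, assuming a client whose scan() mirrors redis-py's (next_cursor, keys) return shape:

def scan_all(client, match: str) -> list:
    """Collect every key matching `match` by walking the cursor to exhaustion."""
    keys, cursor = [], 0
    while True:
        cursor, page = client.scan(cursor=cursor, match=match)
        keys.extend(page)
        if cursor == 0:  # SCAN signals completion by returning cursor 0
            return keys
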
@@ -169,6 +180,7 @@ class Analytics:
                     ),
                 )
                 cursor, keys = scan_result[0], scan_result[1]
+
                 for key in keys:
                     # key is "analytics:tokens:<period>"
                     period = key.replace("analytics:tokens:", "")
@@ -176,24 +188,24 @@ class Analytics:
                         None, partial(self.redis_client.hgetall, key)
                     )
                     for model_id, count_str in data.items():
-                        self.local_buffer["tokens"][period][model_id] = int(count_str)
-                break
+                        self.current_totals["tokens"][period][model_id] = int(count_str)
+
+                if cursor == 0:
+                    break
 
     async def _sync_to_redis(self):
         """
-        Pushes the local buffer data to Upstash Redis (local -> Redis).
-        Since Upstash does not support pipelining, we increment each field individually.
+        Pushes only the new_increments to Upstash Redis (local -> Redis).
+        We use HINCRBY to avoid double counting, ensuring we only add the difference.
         """
         loop = asyncio.get_running_loop()
-
         async with self.lock:
             try:
-                # For each (period, model_id, count), call hincrby
-                for period, models in self.local_buffer["access"].items():
+                # For each (period, model_id, count) in new_increments, call HINCRBY
+                for period, models in self.new_increments["access"].items():
                     redis_key = f"analytics:access:{period}"
                     for model_id, count in models.items():
                         if count != 0:
-                            # hincrby(key, field, amount)
                             await loop.run_in_executor(
                                 None,
                                 partial(
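
HINCRBY atomically adds to a single hash field server-side, which is why pushing deltas stays correct even if several workers sync into the same period keys. A pure-Python model of the command's effect (key and field names are illustrative):

def hincrby(store: dict, key: str, field: str, amount: int) -> int:
    # Model: read-modify-write of one hash field, returning the new value.
    h = store.setdefault(key, {})
    h[field] = h.get(field, 0) + amount
    return h[field]

store = {}
hincrby(store, "analytics:access:2025-01", "model-a", 2)  # worker 1's delta
hincrby(store, "analytics:access:2025-01", "model-a", 3)  # worker 2's delta
assert store["analytics:access:2025-01"]["model-a"] == 5  # increments merge
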
@@ -204,7 +216,7 @@ class Analytics:
                                ),
                            )
 
-                for period, models in self.local_buffer["tokens"].items():
+                for period, models in self.new_increments["tokens"].items():
                    redis_key = f"analytics:tokens:{period}"
                    for model_id, count in models.items():
                        if count != 0:
@@ -218,6 +230,12 @@ class Analytics:
                                ),
                            )
 
+                # Reset new_increments after successful sync
+                self.new_increments = {
+                    "access": defaultdict(lambda: defaultdict(int)),
+                    "tokens": defaultdict(lambda: defaultdict(int)),
+                }
+
                logger.info("Analytics data successfully synced to Upstash Redis.")
            except Exception as e:
                logger.error("Unexpected error during Upstash Redis sync: %s", e)
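
Note the ordering in this hunk: new_increments is cleared only after every HINCRBY call has succeeded, still inside the try block. If the sync raises partway through, the deltas are kept and retried on the next interval; that trades possible double counting after a mid-push failure for never losing counts outright.
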
@@ -226,15 +244,14 @@ class Analytics:
     async def _start_sync_task(self):
         """
         Periodically runs _sync_to_redis at a configurable interval.
-        Also attempts reconnection on any errors (though Upstash typically won't
-        behave exactly like a persistent TCP connection).
+        Also attempts reconnection on any errors (though Upstash is HTTP-based,
+        so it's stateless).
         """
         while True:
             await asyncio.sleep(self.sync_interval)
             try:
                 await self._sync_to_redis()
             except Exception as e:
-                # Upstash might fail differently than standard Redis if there's a network issue
                 logger.error("Error during scheduled sync to Upstash Redis: %s", e)
                 await self._handle_redis_reconnection()
 
@@ -257,8 +274,7 @@ class Analytics:
                # Recreate the client
                await loop.run_in_executor(None, self.redis_client.close)
                self.redis_client = self._create_redis_client()
-                # Upstash doesn't necessarily have a direct 'PING' command, so optional:
-                # If you want to test, you could do e.g. redis_client.get("some_known_key") as a check
+                # Optionally, do a test command if desired (Upstash has limited support).
                logger.info("Successfully reconnected to Upstash Redis.")
                return
            except Exception as e:
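
The removed comment suggested probing with a plain GET after recreating the client. A hedged sketch of such a probe (the key name is illustrative; any cheap read works):

def redis_alive(client) -> bool:
    # A successful round trip is the signal; the value itself is irrelevant.
    try:
        client.get("analytics:healthcheck")
        return True
    except Exception:
        return False
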
@@ -270,21 +286,7 @@ class Analytics:
         logger.critical(
             "Max reconnection attempts reached. Unable to reconnect to Upstash Redis."
         )
-
-        # Optionally, keep trying indefinitely
-        # while True:
-        #     try:
-        #         logger.info("Retrying to reconnect to Upstash Redis...")
-        #         await loop.run_in_executor(None, self.redis_client.close)
-        #         self.redis_client = self._create_redis_client()
-        #         logger.info(
-        #             "Successfully reconnected to Upstash Redis after extended retry."
-        #         )
-        #         break
-        #     except Exception as e:
-        #         logger.error("Extended reconnection attempt failed: %s", e)
-        #         await asyncio.sleep(delay)
-        #         delay = min(delay * 2, 60)  # cap at 60s or choose another max
+        # Optionally, you can keep trying indefinitely here.
 
     async def close(self):
         """
 