ford442 commited on
Commit
e52857f
·
verified ·
1 Parent(s): a9dfcca

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +86 -0
app.py CHANGED
@@ -75,6 +75,9 @@ import os
75
 
76
  import torch
77
  import paramiko
 
 
 
78
 
79
  torch.backends.cuda.matmul.allow_tf32 = False
80
  torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False
@@ -91,6 +94,8 @@ FTP_HOST = os.getenv("FTP_HOST")
91
  FTP_USER = os.getenv("FTP_USER")
92
  FTP_PASS = os.getenv("FTP_PASS")
93
  FTP_DIR = os.getenv("FTP_DIR")
 
 
94
 
95
  def scheduler_swap_callback(pipeline, step_index, timestep, callback_kwargs):
96
  # adjust the batch_size of prompt_embeds according to guidance_scale
@@ -129,7 +134,88 @@ def scheduler_swap_callback(pipeline, step_index, timestep, callback_kwargs):
129
  # pipeline.scheduler._step_index = pipeline.num_timesteps * 0.9
130
  return callback_kwargs
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  def upload_to_ftp(filename):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
  try:
134
  transport = paramiko.Transport((FTP_HOST, 22))
135
  destination_path=FTP_DIR+filename
 
75
 
76
  import torch
77
  import paramiko
78
+ import socket
79
+ import threading # NEW IMPORT
80
+ import queue # NEW IMPORT
81
 
82
  torch.backends.cuda.matmul.allow_tf32 = False
83
  torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False
 
94
  FTP_USER = os.getenv("FTP_USER")
95
  FTP_PASS = os.getenv("FTP_PASS")
96
  FTP_DIR = os.getenv("FTP_DIR")
97
+ FTP_HOST_FALLBACK = os.getenv("FTP_HOST_FALLBACK")
98
+ FTP_DIR_FALLBACK = os.getenv("FTP_DIR_FALLBACK")
99
 
100
  def scheduler_swap_callback(pipeline, step_index, timestep, callback_kwargs):
101
  # adjust the batch_size of prompt_embeds according to guidance_scale
 
134
  # pipeline.scheduler._step_index = pipeline.num_timesteps * 0.9
135
  return callback_kwargs
136
 
137
+
138
+ # --- WORKER FUNCTION FOR THREADING ---
139
+ # This function contains the logic to connect to a single host.
140
+ # It will be executed by each of our threads.
141
+ def connect_worker(host, result_queue):
142
+ """Tries to connect to a single host and puts the successful transport object into the queue."""
143
+ transport = None
144
+ try:
145
+ transport = paramiko.Transport((host, 22))
146
+ # We still use the 5-second timeout for the handshake
147
+ transport.start_client(timeout=5)
148
+ transport.auth_password(username=FTP_USER, password=FTP_PASS)
149
+
150
+ # If we reach here, the connection was successful.
151
+ # Put the result in the queue for the main thread to use.
152
+ print(f"✅ Connection to {host} succeeded first.")
153
+ result_queue.put(transport)
154
+
155
+ except (paramiko.SSHException, socket.timeout, EOFError) as e:
156
+ # This is an expected failure, just print a note.
157
+ print(f"ℹ️ Connection to {host} failed or was too slow: {e}")
158
+ if transport:
159
+ transport.close()
160
+ except Exception as e:
161
+ # Handle any other unexpected errors.
162
+ print(f"❌ Unexpected error connecting to {host}: {e}")
163
+ if transport:
164
+ transport.close()
165
+
166
  def upload_to_ftp(filename):
167
+ """
168
+ Attempts to connect to two FTP hosts simultaneously and uses the first one that responds.
169
+ It now uses a corresponding directory for the primary and fallback hosts.
170
+ """
171
+ hosts = [FTP_HOST]
172
+ if FTP_HOST_FALLBACK:
173
+ hosts.append(FTP_HOST_FALLBACK)
174
+
175
+ result_queue = queue.Queue()
176
+ threads = []
177
+
178
+ print(f"--> Racing connections to {hosts} for uploading {filename}...")
179
+
180
+ for host in hosts:
181
+ thread = threading.Thread(target=connect_worker, args=(host, result_queue))
182
+ thread.daemon = True
183
+ thread.start()
184
+ threads.append(thread)
185
+
186
+ try:
187
+ winning_transport = result_queue.get(timeout=7)
188
+
189
+ # --- THIS IS THE NEW LOGIC ---
190
+ # 1. Determine which host won the race.
191
+ winning_host = winning_transport.getpeername()[0]
192
+
193
+ # 2. Select the correct destination directory based on the winning host.
194
+ # If the fallback directory isn't specified, it safely defaults to the primary directory.
195
+ if winning_host == FTP_HOST:
196
+ destination_directory = FTP_DIR
197
+ else:
198
+ destination_directory = FTP_DIR_FALLBACK if FTP_DIR_FALLBACK else FTP_DIR
199
+
200
+ print(f"--> Proceeding with upload to {winning_host} in directory {destination_directory}...")
201
+
202
+ # 3. Construct the full destination path using the selected directory.
203
+ sftp = paramiko.SFTPClient.from_transport(winning_transport)
204
+ destination_path = os.path.join(destination_directory, os.path.basename(filename))
205
+ sftp.put(filename, destination_path)
206
+
207
+ print(f"✅ Successfully uploaded {filename}.")
208
+
209
+ sftp.close()
210
+ winning_transport.close()
211
+
212
+ except queue.Empty:
213
+ print("❌ Critical Error: Neither FTP host responded in time.")
214
+ except Exception as e:
215
+ print(f"❌ An unexpected error occurred during SFTP operation: {e}")
216
+
217
+
218
+ def upload_to_ftp_old(filename):
219
  try:
220
  transport = paramiko.Transport((FTP_HOST, 22))
221
  destination_path=FTP_DIR+filename