danhtran2mind commited on
Commit
204d5bf
·
verified ·
1 Parent(s): bce32c7

Update apps/gradio_app/processor.py

Browse files
Files changed (1) hide show
  1. apps/gradio_app/processor.py +208 -208
apps/gradio_app/processor.py CHANGED
@@ -1,209 +1,209 @@
1
- import os
2
- import sys
3
- import shutil
4
- import traceback
5
- import logging
6
- import gradio as gr
7
- import uuid
8
- import cv2
9
- import time
10
- from gradio_app.utils import convert_to_supported_format
11
-
12
- # Adjust sys.path to include the src directory
13
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src', 'license_plate_detector_ocr')))
14
- from infer import infer, is_image_file
15
-
16
- def gradio_process(input_file, input_type):
17
- """Process the input file (image or video) for license plate detection and OCR."""
18
- unique_id = str(uuid.uuid4())[:8]
19
- temp_input_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", unique_id))
20
- preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
21
- try:
22
- file_path = input_file.name if hasattr(input_file, 'name') else input_file
23
- logging.debug(f"Input file path: {file_path}")
24
- print(f"Input file path: {file_path}")
25
-
26
- # Verify source file exists and is readable
27
- if not os.path.exists(file_path):
28
- error_msg = f"Error: Source file {file_path} does not exist."
29
- logging.error(error_msg)
30
- return None, None, error_msg, None, None
31
- if not os.access(file_path, os.R_OK):
32
- error_msg = f"Error: Source file {file_path} is not readable."
33
- logging.error(error_msg)
34
- return None, None, error_msg, None, None
35
-
36
- # Create unique temp and preview directories
37
- os.makedirs(temp_input_dir, exist_ok=True)
38
- os.makedirs(preview_dir, exist_ok=True)
39
- temp_input_path = os.path.join(temp_input_dir, os.path.basename(file_path))
40
- preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))
41
-
42
- # Copy input file to temp and preview directories with retry
43
- max_retries = 3
44
- for attempt in range(max_retries):
45
- try:
46
- shutil.copy2(file_path, temp_input_path) # Copy to temp for processing
47
- shutil.copy2(file_path, preview_input_path) # Copy to preview for display
48
- os.chmod(temp_input_path, 0o644)
49
- os.chmod(preview_input_path, 0o644)
50
- logging.debug(f"Copied input file to: {temp_input_path} and {preview_input_path}")
51
- break
52
- except Exception as e:
53
- if attempt == max_retries - 1:
54
- error_msg = f"Error copying file {file_path} to {temp_input_path} or {preview_input_path} after {max_retries} attempts: {str(e)}"
55
- logging.error(error_msg)
56
- return None, None, error_msg, None, None
57
- time.sleep(0.5) # Brief delay before retry
58
-
59
- # Verify copied files
60
- for path in [temp_input_path, preview_input_path]:
61
- if not os.path.exists(path):
62
- error_msg = f"Error: Copied file {path} does not exist."
63
- logging.error(error_msg)
64
- return None, None, error_msg, None, None
65
- if not os.access(path, os.R_OK):
66
- error_msg = f"Error: Copied file {path} is not readable."
67
- logging.error(error_msg)
68
- return None, None, error_msg, None, None
69
- if os.path.getsize(path) == 0:
70
- error_msg = f"Error: Copied file {path} is empty."
71
- logging.error(error_msg)
72
- return None, None, error_msg, None, None
73
-
74
- # Validate image or video
75
- if is_image_file(temp_input_path):
76
- img = cv2.imread(temp_input_path)
77
- if img is None:
78
- error_msg = f"Error: Could not load image from {temp_input_path}."
79
- logging.error(error_msg)
80
- return None, None, error_msg, None, None
81
- # Check image properties
82
- height, width, channels = img.shape
83
- logging.debug(f"Image properties: {width}x{height}, {channels} channels")
84
- if channels not in (1, 3, 4):
85
- error_msg = f"Error: Unsupported number of channels ({channels}) in {temp_input_path}. Expected 1, 3, or 4."
86
- logging.error(error_msg)
87
- return None, None, error_msg, None, None
88
- if width == 0 or height == 0:
89
- error_msg = f"Error: Invalid image dimensions ({width}x{height}) in {temp_input_path}."
90
- logging.error(error_msg)
91
- return None, None, error_msg, None, None
92
- else:
93
- cap = cv2.VideoCapture(temp_input_path)
94
- if not cap.isOpened():
95
- error_msg = f"Error: Could not open video at {temp_input_path}."
96
- logging.error(error_msg)
97
- cap.release()
98
- return None, None, error_msg, None, None
99
- cap.release()
100
-
101
- # Set output path
102
- output_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", str(uuid.uuid4())[:8]))
103
- os.makedirs(output_dir, exist_ok=True)
104
- output_filename = f"{os.path.splitext(os.path.basename(temp_input_path))[0]}_{unique_id}_output{'_output.jpg' if is_image_file(temp_input_path) else '_output.mp4'}"
105
- output_path = os.path.join(output_dir, output_filename)
106
- logging.debug(f"Output path: {output_path}")
107
-
108
- # Call the infer function
109
- logging.debug(f"Calling infer with input: {temp_input_path}, output: {output_path}")
110
- result_array, plate_texts = infer(temp_input_path, output_path)
111
-
112
- if result_array is None and is_image_file(temp_input_path):
113
- error_msg = f"Error: Processing failed for {temp_input_path}. 'infer' returned None. Check infer.py logs for details."
114
- logging.error(error_msg)
115
- return None, None, error_msg, preview_input_path if is_image_file(temp_input_path) else None, preview_input_path if not is_image_file(temp_input_path) else None
116
-
117
- # Validate output file for videos
118
- if not is_image_file(temp_input_path):
119
- if not os.path.exists(output_path):
120
- error_msg = f"Error: Output video file {output_path} was not created."
121
- logging.error(error_msg)
122
- return None, None, error_msg, None, preview_input_path
123
- # Convert output video to supported format
124
- converted_output_path = os.path.join(output_dir, f"converted_{os.path.basename(output_path)}")
125
- converted_path = convert_to_supported_format(output_path, converted_output_path)
126
- if converted_path is None:
127
- error_msg = f"Error: Failed to convert output video {output_path} to supported format."
128
- logging.error(error_msg)
129
- return None, None, error_msg, None, preview_input_path
130
- output_path = converted_path
131
-
132
- # Format plate texts
133
- if is_image_file(temp_input_path):
134
- formatted_texts = "\n".join(plate_texts) if plate_texts else "No plates detected"
135
- logging.debug(f"Image processed successfully. Plate texts: {formatted_texts}")
136
- return result_array, None, formatted_texts, preview_input_path, None
137
- else:
138
- formatted_texts = []
139
- for i, texts in enumerate(plate_texts):
140
- if texts:
141
- formatted_texts.append(f"Frame {i+1}: {', '.join(texts)}")
142
- formatted_texts = "\n".join(formatted_texts) if formatted_texts else "No plates detected"
143
- logging.debug(f"Video processed successfully. Plate texts: {formatted_texts}")
144
- return None, output_path, formatted_texts, None, preview_input_path
145
- except Exception as e:
146
- error_message = f"Error processing {file_path}: {str(e)}\n{traceback.format_exc()}"
147
- logging.error(error_message)
148
- print(error_message)
149
- return None, None, error_message, preview_input_path if is_image_file(file_path) else None, preview_input_path if not is_image_file(file_path) else None
150
- finally:
151
- # Clean up temp directory after processing, but keep preview directory
152
- if os.path.exists(temp_input_dir):
153
- shutil.rmtree(temp_input_dir, ignore_errors=True)
154
- logging.debug(f"Cleaned up temporary directory: {temp_input_dir}")
155
-
156
- def update_preview(file, input_type):
157
- """Return file path for the appropriate preview component based on input type."""
158
- if not file:
159
- logging.debug("No file provided for preview.")
160
- return None, None
161
-
162
- # Handle both file objects and string paths
163
- file_path = file.name if hasattr(file, 'name') else file
164
- logging.debug(f"Updating preview for {input_type}: {file_path}")
165
-
166
- # Verify file exists
167
- if not os.path.exists(file_path):
168
- logging.error(f"Input file {file_path} does not exist.")
169
- return None, None
170
-
171
- # Check if video format is supported
172
- if input_type == "Video" and not file_path.lower().endswith(('.mp4', '.webm')):
173
- logging.error(f"Unsupported video format for {file_path}. Use MP4 or WebM.")
174
- return None, None
175
-
176
- # Copy to preview directory for persistent display
177
- unique_id = str(uuid.uuid4())[:8]
178
- preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
179
- os.makedirs(preview_dir, exist_ok=True)
180
- preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))
181
- try:
182
- shutil.copy2(file_path, preview_input_path)
183
- os.chmod(preview_input_path, 0o644)
184
- logging.debug(f"Copied preview file to: {preview_input_path}")
185
- except Exception as e:
186
- logging.error(f"Error copying preview file to {preview_input_path}: {str(e)}")
187
- return None, None
188
-
189
- return preview_input_path if input_type == "Image" else None, preview_input_path if input_type == "Video" else None
190
-
191
- def update_visibility(input_type):
192
- """Update visibility of input/output components based on input type."""
193
- logging.debug(f"Updating visibility for input type: {input_type}")
194
- is_image = input_type == "Image"
195
- is_video = input_type == "Video"
196
- return (
197
- gr.update(visible=is_image),
198
- gr.update(visible=is_video),
199
- gr.update(visible=is_image),
200
- gr.update(visible=is_video)
201
- )
202
-
203
- def clear_preview_data():
204
- """Clear all files in the preview_data directory."""
205
- preview_data_dir = os.path.abspath("apps/gradio_app/preview_data")
206
- if os.path.exists(preview_data_dir):
207
- shutil.rmtree(preview_data_dir, ignore_errors=True)
208
- logging.debug(f"Cleared preview_data directory: {preview_data_dir}")
209
  os.makedirs(preview_data_dir, exist_ok=True)
 
1
+ import os
2
+ import sys
3
+ import shutil
4
+ import traceback
5
+ import logging
6
+ import gradio as gr
7
+ import uuid
8
+ import cv2
9
+ import time
10
+ from gradio_app.utils import convert_to_supported_format
11
+
12
+ # Adjust sys.path to include the src directory
13
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'src', 'license_plate_detector_ocr')))
14
+ from infer import infer, is_image_file
15
+
16
+ def gradio_process(model_path, input_file, input_type):
17
+ """Process the input file (image or video) for license plate detection and OCR."""
18
+ unique_id = str(uuid.uuid4())[:8]
19
+ temp_input_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", unique_id))
20
+ preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
21
+ try:
22
+ file_path = input_file.name if hasattr(input_file, 'name') else input_file
23
+ logging.debug(f"Input file path: {file_path}")
24
+ print(f"Input file path: {file_path}")
25
+
26
+ # Verify source file exists and is readable
27
+ if not os.path.exists(file_path):
28
+ error_msg = f"Error: Source file {file_path} does not exist."
29
+ logging.error(error_msg)
30
+ return None, None, error_msg, None, None
31
+ if not os.access(file_path, os.R_OK):
32
+ error_msg = f"Error: Source file {file_path} is not readable."
33
+ logging.error(error_msg)
34
+ return None, None, error_msg, None, None
35
+
36
+ # Create unique temp and preview directories
37
+ os.makedirs(temp_input_dir, exist_ok=True)
38
+ os.makedirs(preview_dir, exist_ok=True)
39
+ temp_input_path = os.path.join(temp_input_dir, os.path.basename(file_path))
40
+ preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))
41
+
42
+ # Copy input file to temp and preview directories with retry
43
+ max_retries = 3
44
+ for attempt in range(max_retries):
45
+ try:
46
+ shutil.copy2(file_path, temp_input_path) # Copy to temp for processing
47
+ shutil.copy2(file_path, preview_input_path) # Copy to preview for display
48
+ os.chmod(temp_input_path, 0o644)
49
+ os.chmod(preview_input_path, 0o644)
50
+ logging.debug(f"Copied input file to: {temp_input_path} and {preview_input_path}")
51
+ break
52
+ except Exception as e:
53
+ if attempt == max_retries - 1:
54
+ error_msg = f"Error copying file {file_path} to {temp_input_path} or {preview_input_path} after {max_retries} attempts: {str(e)}"
55
+ logging.error(error_msg)
56
+ return None, None, error_msg, None, None
57
+ time.sleep(0.5) # Brief delay before retry
58
+
59
+ # Verify copied files
60
+ for path in [temp_input_path, preview_input_path]:
61
+ if not os.path.exists(path):
62
+ error_msg = f"Error: Copied file {path} does not exist."
63
+ logging.error(error_msg)
64
+ return None, None, error_msg, None, None
65
+ if not os.access(path, os.R_OK):
66
+ error_msg = f"Error: Copied file {path} is not readable."
67
+ logging.error(error_msg)
68
+ return None, None, error_msg, None, None
69
+ if os.path.getsize(path) == 0:
70
+ error_msg = f"Error: Copied file {path} is empty."
71
+ logging.error(error_msg)
72
+ return None, None, error_msg, None, None
73
+
74
+ # Validate image or video
75
+ if is_image_file(temp_input_path):
76
+ img = cv2.imread(temp_input_path)
77
+ if img is None:
78
+ error_msg = f"Error: Could not load image from {temp_input_path}."
79
+ logging.error(error_msg)
80
+ return None, None, error_msg, None, None
81
+ # Check image properties
82
+ height, width, channels = img.shape
83
+ logging.debug(f"Image properties: {width}x{height}, {channels} channels")
84
+ if channels not in (1, 3, 4):
85
+ error_msg = f"Error: Unsupported number of channels ({channels}) in {temp_input_path}. Expected 1, 3, or 4."
86
+ logging.error(error_msg)
87
+ return None, None, error_msg, None, None
88
+ if width == 0 or height == 0:
89
+ error_msg = f"Error: Invalid image dimensions ({width}x{height}) in {temp_input_path}."
90
+ logging.error(error_msg)
91
+ return None, None, error_msg, None, None
92
+ else:
93
+ cap = cv2.VideoCapture(temp_input_path)
94
+ if not cap.isOpened():
95
+ error_msg = f"Error: Could not open video at {temp_input_path}."
96
+ logging.error(error_msg)
97
+ cap.release()
98
+ return None, None, error_msg, None, None
99
+ cap.release()
100
+
101
+ # Set output path
102
+ output_dir = os.path.abspath(os.path.join("apps/gradio_app/temp_data", str(uuid.uuid4())[:8]))
103
+ os.makedirs(output_dir, exist_ok=True)
104
+ output_filename = f"{os.path.splitext(os.path.basename(temp_input_path))[0]}_{unique_id}_output{'_output.jpg' if is_image_file(temp_input_path) else '_output.mp4'}"
105
+ output_path = os.path.join(output_dir, output_filename)
106
+ logging.debug(f"Output path: {output_path}")
107
+
108
+ # Call the infer function
109
+ logging.debug(f"Calling infer with input: {temp_input_path}, output: {output_path}")
110
+ result_array, plate_texts = infer(model_path, temp_input_path, output_path)
111
+
112
+ if result_array is None and is_image_file(temp_input_path):
113
+ error_msg = f"Error: Processing failed for {temp_input_path}. 'infer' returned None. Check infer.py logs for details."
114
+ logging.error(error_msg)
115
+ return None, None, error_msg, preview_input_path if is_image_file(temp_input_path) else None, preview_input_path if not is_image_file(temp_input_path) else None
116
+
117
+ # Validate output file for videos
118
+ if not is_image_file(temp_input_path):
119
+ if not os.path.exists(output_path):
120
+ error_msg = f"Error: Output video file {output_path} was not created."
121
+ logging.error(error_msg)
122
+ return None, None, error_msg, None, preview_input_path
123
+ # Convert output video to supported format
124
+ converted_output_path = os.path.join(output_dir, f"converted_{os.path.basename(output_path)}")
125
+ converted_path = convert_to_supported_format(output_path, converted_output_path)
126
+ if converted_path is None:
127
+ error_msg = f"Error: Failed to convert output video {output_path} to supported format."
128
+ logging.error(error_msg)
129
+ return None, None, error_msg, None, preview_input_path
130
+ output_path = converted_path
131
+
132
+ # Format plate texts
133
+ if is_image_file(temp_input_path):
134
+ formatted_texts = "\n".join(plate_texts) if plate_texts else "No plates detected"
135
+ logging.debug(f"Image processed successfully. Plate texts: {formatted_texts}")
136
+ return result_array, None, formatted_texts, preview_input_path, None
137
+ else:
138
+ formatted_texts = []
139
+ for i, texts in enumerate(plate_texts):
140
+ if texts:
141
+ formatted_texts.append(f"Frame {i+1}: {', '.join(texts)}")
142
+ formatted_texts = "\n".join(formatted_texts) if formatted_texts else "No plates detected"
143
+ logging.debug(f"Video processed successfully. Plate texts: {formatted_texts}")
144
+ return None, output_path, formatted_texts, None, preview_input_path
145
+ except Exception as e:
146
+ error_message = f"Error processing {file_path}: {str(e)}\n{traceback.format_exc()}"
147
+ logging.error(error_message)
148
+ print(error_message)
149
+ return None, None, error_message, preview_input_path if is_image_file(file_path) else None, preview_input_path if not is_image_file(file_path) else None
150
+ finally:
151
+ # Clean up temp directory after processing, but keep preview directory
152
+ if os.path.exists(temp_input_dir):
153
+ shutil.rmtree(temp_input_dir, ignore_errors=True)
154
+ logging.debug(f"Cleaned up temporary directory: {temp_input_dir}")
155
+
156
+ def update_preview(file, input_type):
157
+ """Return file path for the appropriate preview component based on input type."""
158
+ if not file:
159
+ logging.debug("No file provided for preview.")
160
+ return None, None
161
+
162
+ # Handle both file objects and string paths
163
+ file_path = file.name if hasattr(file, 'name') else file
164
+ logging.debug(f"Updating preview for {input_type}: {file_path}")
165
+
166
+ # Verify file exists
167
+ if not os.path.exists(file_path):
168
+ logging.error(f"Input file {file_path} does not exist.")
169
+ return None, None
170
+
171
+ # Check if video format is supported
172
+ if input_type == "Video" and not file_path.lower().endswith(('.mp4', '.webm')):
173
+ logging.error(f"Unsupported video format for {file_path}. Use MP4 or WebM.")
174
+ return None, None
175
+
176
+ # Copy to preview directory for persistent display
177
+ unique_id = str(uuid.uuid4())[:8]
178
+ preview_dir = os.path.abspath(os.path.join("apps/gradio_app/preview_data", unique_id))
179
+ os.makedirs(preview_dir, exist_ok=True)
180
+ preview_input_path = os.path.join(preview_dir, os.path.basename(file_path))
181
+ try:
182
+ shutil.copy2(file_path, preview_input_path)
183
+ os.chmod(preview_input_path, 0o644)
184
+ logging.debug(f"Copied preview file to: {preview_input_path}")
185
+ except Exception as e:
186
+ logging.error(f"Error copying preview file to {preview_input_path}: {str(e)}")
187
+ return None, None
188
+
189
+ return preview_input_path if input_type == "Image" else None, preview_input_path if input_type == "Video" else None
190
+
191
+ def update_visibility(input_type):
192
+ """Update visibility of input/output components based on input type."""
193
+ logging.debug(f"Updating visibility for input type: {input_type}")
194
+ is_image = input_type == "Image"
195
+ is_video = input_type == "Video"
196
+ return (
197
+ gr.update(visible=is_image),
198
+ gr.update(visible=is_video),
199
+ gr.update(visible=is_image),
200
+ gr.update(visible=is_video)
201
+ )
202
+
203
+ def clear_preview_data():
204
+ """Clear all files in the preview_data directory."""
205
+ preview_data_dir = os.path.abspath("apps/gradio_app/preview_data")
206
+ if os.path.exists(preview_data_dir):
207
+ shutil.rmtree(preview_data_dir, ignore_errors=True)
208
+ logging.debug(f"Cleared preview_data directory: {preview_data_dir}")
209
  os.makedirs(preview_data_dir, exist_ok=True)