.gitattributes CHANGED
@@ -34,4 +34,3 @@ saved_model/**/* filter=lfs diff=lfs merge=lfs -text
34
  *.zst filter=lfs diff=lfs merge=lfs -text
35
  *tfevents* filter=lfs diff=lfs merge=lfs -text
36
  *.png filter=lfs diff=lfs merge=lfs -text
37
- *.pdf filter=lfs diff=lfs merge=lfs -text
 
34
  *.zst filter=lfs diff=lfs merge=lfs -text
35
  *tfevents* filter=lfs diff=lfs merge=lfs -text
36
  *.png filter=lfs diff=lfs merge=lfs -text
 
.gitignore CHANGED
@@ -20,4 +20,3 @@ Desktop.ini
20
  *.bak
21
  *.tmp
22
  *.old
23
- assets/tmp/
 
20
  *.bak
21
  *.tmp
22
  *.old
 
Dockerfile CHANGED
@@ -19,19 +19,14 @@ RUN pip install -r requirements.txt
19
  COPY --chown=user:user main.py .
20
  COPY --chown=user:user modules/ ./modules/
21
  COPY --chown=user:user config.cfg .
 
22
 
23
  # ---------- Copy examples folder explicitly ----------
24
  COPY --chown=user:user examples/ ./examples/
25
 
26
- # ---------- Copy assets folder ----------
27
- COPY --chown=user:user assets/ ./assets/
28
-
29
  # ---------- Ensure proper permissions for examples folder ----------
30
  RUN chmod -R 755 examples/
31
 
32
- # ---------- Ensure proper permissions for assets folder ----------
33
- RUN chmod -R 755 assets/
34
-
35
  # Switch to non-root user
36
  USER user
37
 
 
19
  COPY --chown=user:user main.py .
20
  COPY --chown=user:user modules/ ./modules/
21
  COPY --chown=user:user config.cfg .
22
+ COPY --chown=user:user product_classification.csv .
23
 
24
  # ---------- Copy examples folder explicitly ----------
25
  COPY --chown=user:user examples/ ./examples/
26
 
 
 
 
27
  # ---------- Ensure proper permissions for examples folder ----------
28
  RUN chmod -R 755 examples/
29
 
 
 
 
30
  # Switch to non-root user
31
  USER user
32
 
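Note on the Dockerfile change: the new COPY line ships product_classification.csv next to main.py because the app now loads the taxonomy at startup (see the [scope] TAXONOMY_CSV key in config.cfg). A minimal sketch of a fail-fast check inside the image, assuming pandas is installed; this helper is hypothetical and not part of the app:

# Hypothetical sanity check: verify the taxonomy CSV copied by the Dockerfile
# is present and non-empty before the Gradio app starts.
from pathlib import Path
import pandas as pd

TAXONOMY_CSV = Path("product_classification.csv")  # copied into WORKDIR by the COPY line above
if not TAXONOMY_CSV.exists() or pd.read_csv(TAXONOMY_CSV).empty:
    raise SystemExit("product_classification.csv is missing or empty in the image")
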
assets/fsc-logo-black-and-white.jpg DELETED
Binary file (35.5 kB)
 
assets/label_title_mapping.csv DELETED
@@ -1,18 +0,0 @@
1
- language,category_100,category_mix,category_recycled
2
- german,100%,MIX,RECYCLED
3
- french,100%,MIXTE,RECYCLÉ
4
- spanish,100%,MIXTO,RECICLADO
5
- turkish,100%,KARMA,GERİKAZANILMIŞ
6
- romanian,100%,MIX,RECICLAT
7
- polish,100%,MIESZANY,RECYKLING
8
- portugese,100%,MISTO,RECICLADO
9
- norwegian,100%,MIX,RECYCLED
10
- swedish,100%,MIX,Återvunnet
11
- danish,100%,MIX,RECYCLED
12
- finnish,100%,MIX,KIERRÄTETTY
13
- chinese_simplified,100%,混合产品,再生产品
14
- chinese_traditional,100%,混合產品,再生產品
15
- russian,100%,МИКС,ВТОРИЧНЫЙ
16
- greek,100%,ΜΕΙΓΜΑ,ΑΝΑΚΥΚΛΩΜΕΝΟ
17
- japanese,100%,ミックス,リサイクル
18
- serbian,100%,МЕШАВИНА,РЕЦИКЛИРАНО
 
config.cfg CHANGED
@@ -1,8 +1,10 @@
1
  [ocr]
2
- MODEL = o4-mini
 
3
 
4
  [scope]
5
- MODEL = o4-mini
 
6
  TEMPERATURE = 0
7
 
8
  [gradio]
 
1
  [ocr]
2
+ MODEL = gpt-4.1
3
+ GUIDANCE_IMAGE = guidance.png
4
 
5
  [scope]
6
+ TAXONOMY_CSV = product_classification.csv
7
+ MODEL = gpt-5
8
  TEMPERATURE = 0
9
 
10
  [gradio]
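
The new keys are consumed in main.py via configparser. A minimal sketch of how the updated [ocr] and [scope] sections are read, mirroring the reads shown in main.py below (the TEMPERATURE read is included for completeness and is an assumption, as main.py does not show it):

import configparser

config = configparser.ConfigParser()
config.read("config.cfg")

OCR_MODEL = config.get("ocr", "MODEL")                  # gpt-4.1
GUIDANCE_IMAGE = config.get("ocr", "GUIDANCE_IMAGE")    # guidance.png
SCOPE_MODEL = config.get("scope", "MODEL")              # gpt-5
TAXONOMY_CSV = config.get("scope", "TAXONOMY_CSV")      # product_classification.csv
TEMPERATURE = config.getfloat("scope", "TEMPERATURE")   # 0 (assumption: read the same way)
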
examples/{Zeit_ dass sich was dreht-Gruene 2025-2026.pdf → 001_Poessneck_SV_48_4C+INK 5.pdf_7dde9edfbdb943ba83ff79ca0a1255ab_extracted_logo_page_1.png} RENAMED
File without changes
examples/{FSC NUTZUNG WEBSITE.pdf → 11232900 Ningbo Guanyi_FSC Hangtag 20250526_extracted_logo_page_2.png} RENAMED
File without changes
examples/{151897_rating label preview.pdf → Bottom of the bags casa 12x36x12_extracted_logo_page_1.png} RENAMED
File without changes
assets/guidance.png → guidance.png RENAMED
File without changes
main.py CHANGED
@@ -1,32 +1,22 @@
1
  import os
2
  import logging
 
 
3
  import dotenv
4
  import gradio as gr
 
 
5
  import configparser
6
  from pathlib import Path
 
 
7
 
8
  # Import from modules
 
 
 
9
  from modules.image_processing import pdf_first_page_to_image
10
- from modules.pipeline import process_image_progressive
11
-
12
- # Load environment variables
13
- dotenv.load_dotenv()
14
-
15
- # ============================ Setup ============================================
16
- # Configure logging
17
- logging.basicConfig(
18
- level=logging.INFO,
19
- format='%(asctime)s - %(levelname)s - %(message)s',
20
- datefmt='%H:%M:%S'
21
- )
22
- logger = logging.getLogger(__name__)
23
-
24
- # Load configuration
25
- config = configparser.ConfigParser()
26
- config.read('config.cfg')
27
-
28
-
29
- # ============================ Gradio setup ============================================
30
 
31
  APP_DIR = Path(__file__).resolve().parent
32
  EXAMPLES_DIR = Path(os.environ.get("EXAMPLES_DIR", APP_DIR / "examples")).resolve()
@@ -47,25 +37,281 @@ def load_example_images():
47
  return files
48
 
49
  # when updating components programmatically in Gradio 4, prefer .update(...)
50
- def show_image_preview(file_path):
51
- """Show preview of any image file (example or user upload)."""
52
- if file_path:
53
- # Handle PDF files by converting to image for thumbnail display
54
- if file_path.lower().endswith('.pdf'):
55
  try:
56
- # Convert PDF to image for thumbnail display
57
- thumbnail_path = pdf_first_page_to_image(file_path, max_long_edge=400, fmt="PNG")
58
- return file_path, thumbnail_path # Return (original_file, thumbnail_image)
59
  except Exception as e:
60
- logger.error(f"Failed to create thumbnail for PDF {file_path}: {e}")
61
- return file_path, None # Return original file, no thumbnail
62
  else:
63
- # For image files, use the file directly for both
64
- return file_path, file_path
65
- else:
66
- return None, None
67
 
68
- # ============================ Gradio UI ============================================
69
 
70
  def create_gradio_interface():
71
  """Create and return the Gradio interface with progressive updates."""
@@ -85,7 +331,7 @@ def create_gradio_interface():
85
  gr.Markdown("### Upload Your Own Image")
86
  image_input = gr.File(
87
  label="Upload Product Image",
88
- file_types=["image", ".pdf", ".PDF", ".Pdf", ".pDF"],
89
  type="filepath"
90
  )
91
 
@@ -126,23 +372,9 @@ def create_gradio_interface():
126
  # Output components with formatted section headers
127
  gr.Markdown("## Analysis Results")
128
 
129
- gr.Markdown("### FSC Logo Data Extraction")
130
  fsc_license_output = gr.Textbox(
131
- label="FSC License Code",
132
- lines=1,
133
- interactive=False,
134
- container=True
135
- )
136
-
137
- label_title_output = gr.Textbox(
138
- label="Label Title",
139
- lines=1,
140
- interactive=False,
141
- container=True
142
- )
143
-
144
- product_type_output = gr.Textbox(
145
- label="Product Type",
146
  lines=1,
147
  interactive=False,
148
  container=True
@@ -155,7 +387,7 @@ def create_gradio_interface():
155
  container=True
156
  )
157
 
158
- gr.Markdown("### Image Analysis")
159
  product_summary_output = gr.Textbox(
160
  label="Product Description",
161
  lines=3,
@@ -171,68 +403,14 @@ def create_gradio_interface():
171
  )
172
 
173
  taxonomy_output = gr.Textbox(
174
- label="Taxonomy Matches",
175
- lines=10,
176
- interactive=False,
177
- container=True
178
- )
179
- image_text_output = gr.Textbox(
180
- label="Image Text",
181
- lines=15,
182
- interactive=False,
183
- container=True
184
- )
185
- fsc_text_strings_output = gr.Textbox(
186
- label="FSC Text Strings Identified (searching for FSC or Forest Stewardship Council)",
187
  lines=10,
188
  interactive=False,
189
  container=True
190
  )
191
- gr.Markdown("### Scope Check")
192
-
193
- overall_scope_check_output = gr.Textbox(
194
- label="Overall Compliance:",
195
- lines=2,
196
- interactive=False,
197
- container=True
198
- )
199
- materials_check_output = gr.Textbox(
200
- label="Product Type: Materials Check",
201
- lines=2,
202
- interactive=False,
203
- container=True
204
- )
205
-
206
- taxonomy_check_output = gr.Textbox(
207
- label="Product Type: Taxonomy Check",
208
- lines=2,
209
- interactive=False,
210
- container=True
211
- )
212
-
213
- label_title_check_output = gr.Textbox(
214
- label="Label Title Check",
215
- lines=2,
216
- interactive=False,
217
- container=True
218
- )
219
-
220
- materials_check_rationale_output = gr.Textbox(
221
- label="Rationale: Materials Check",
222
- lines=5,
223
- interactive=False,
224
- container=True
225
- )
226
-
227
- taxonomy_check_rationale_output = gr.Textbox(
228
- label="Rationale: Taxonomy Check",
229
- lines=5,
230
- interactive=False,
231
- container=True
232
- )
233
 
234
  gr.Markdown("### Processing Information")
235
- cost_info_output = gr.Textbox(
236
  label="Resources",
237
  lines=3,
238
  interactive=False,
@@ -243,22 +421,15 @@ def create_gradio_interface():
243
  if example_images:
244
  # Update thumbnail and load image when dropdown selection changes
245
  example_dropdown.change(
246
- fn=show_image_preview,
247
  inputs=[example_dropdown],
248
  outputs=[image_input, example_thumbnail]
249
  )
250
 
251
- # Update thumbnail when user uploads a file
252
- image_input.change(
253
- fn=show_image_preview,
254
- inputs=[image_input],
255
- outputs=[image_input, example_thumbnail]
256
- )
257
-
258
  process_btn.click(
259
- fn=process_image_progressive,
260
  inputs=[image_input],
261
- outputs=[fsc_license_output, label_title_output, product_type_output, license_status_output, product_summary_output, material_output, taxonomy_output, image_text_output, fsc_text_strings_output, overall_scope_check_output, materials_check_output, taxonomy_check_output, label_title_check_output, materials_check_rationale_output, taxonomy_check_rationale_output, cost_info_output]
262
  )
263
 
264
  return demo
@@ -267,6 +438,10 @@ def main():
267
  """Main function to launch the Gradio interface."""
268
  logger.info("Starting FSC Product Classification Tool with Gradio interface")
269
 
270
  if not os.getenv("OPENAI_API_KEY"):
271
  logger.error("OPENAI_API_KEY environment variable not set. Please set your OpenAI API key.")
272
  return
 
1
  import os
2
  import logging
3
+ import pandas as pd
4
+ import openai
5
  import dotenv
6
  import gradio as gr
7
+ import tempfile
8
+ import json
9
  import configparser
10
  from pathlib import Path
11
+ from typing import Optional, Dict, List, Tuple
12
+ import time
13
 
14
  # Import from modules
15
+ from modules.pdf_processing import get_crop_from_pdf
16
+ from modules.llm_pipeline import extract_fsc_logo_and_taxonomy, build_taxonomy_block_from_dataframe
17
+ from modules.fsc_api import get_fsc_products_for_license, filter_taxonomy_by_fsc_products
18
  from modules.image_processing import pdf_first_page_to_image
19
+ from modules.ocr_extraction import extract_fsc_logo_data_single
20
 
21
  APP_DIR = Path(__file__).resolve().parent
22
  EXAMPLES_DIR = Path(os.environ.get("EXAMPLES_DIR", APP_DIR / "examples")).resolve()
 
37
  return files
38
 
39
  # when updating components programmatically in Gradio 4, prefer .update(...)
40
+ def select_example_image(example_path):
41
+ return example_path if example_path else None
42
+
43
+ def show_example_thumbnail(example_path):
44
+ return example_path if example_path else None
45
+
46
+ def select_and_show_example(example_path):
47
+ """Select example image and show thumbnail in one function."""
48
+ if example_path:
49
+ return example_path, example_path # Return (image_input, thumbnail)
50
+ else:
51
+ return None, None
52
+
53
+ # Load environment variables
54
+ dotenv.load_dotenv()
55
+
56
+ # ============================ Setup ============================================
57
+ # Configure logging
58
+ logging.basicConfig(
59
+ level=logging.INFO,
60
+ format='%(asctime)s - %(levelname)s - %(message)s',
61
+ datefmt='%H:%M:%S'
62
+ )
63
+ logger = logging.getLogger(__name__)
64
+
65
+ # Load configuration
66
+ config = configparser.ConfigParser()
67
+ config.read('config.cfg')
68
+
69
+ # Expect your API key in environment: OPENAI_API_KEY
70
+ client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
71
+
72
+ # Model configuration
73
+ OCR_MODEL = config.get('ocr', 'MODEL')
74
+ SCOPE_MODEL = config.get('scope', 'MODEL')
75
+
76
+ # File paths
77
+ GUIDANCE_IMAGE = config.get('ocr', 'GUIDANCE_IMAGE')
78
+ TAXONOMY_CSV = config.get('scope', 'TAXONOMY_CSV')
79
+
80
+ # Load taxonomy once at startup
81
+ try:
82
+ taxonomy_df = pd.read_csv(TAXONOMY_CSV)
83
+ logger.info(f"Loaded taxonomy with {len(taxonomy_df)} categories")
84
+ except Exception as e:
85
+ logger.error(f"Failed to load taxonomy: {e}")
86
+ taxonomy_df = pd.DataFrame()
87
+
88
+ def step1_extract_fsc_code(image_file) -> Tuple[str, str, str, str, str, str]:
89
+ """Step 1: Extract FSC license code using OCR."""
90
+ if image_file is None:
91
+ return "Please upload an image file.", "⏳ Waiting for image...", "⏳ Waiting for image...", "⏳ Waiting for image...", "⏳ Waiting for image...", ""
92
+
93
+ try:
94
+ # Handle the uploaded file
95
+ if isinstance(image_file, str):
96
+ image_path = image_file
97
+ else:
98
+ image_path = image_file.name
99
+
100
+ logger.info(f"Processing image: {image_path}")
101
+
102
+ # Convert PDF to image if needed
103
+ cleanup_png = False
104
+ try:
105
+ if image_path.lower().endswith(".pdf"):
106
+ img_path = pdf_first_page_to_image(image_path)
107
+ cleanup_png = True
108
+ logger.info("Converted PDF to image")
109
+ elif image_path.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
110
+ img_path = image_path
111
+ else:
112
+ return "Unsupported file format. Please upload PDF, PNG, JPG, JPEG, or WEBP files.", "", "", "", "", ""
113
+
114
+ # Step 1: Extract FSC license code using OCR extraction
115
+ logger.info("Step 1: Extracting FSC license code using OCR...")
116
+
117
+ # Initialize token and cost tracking
118
+ total_tokens = {"input": 0, "output": 0}
119
+ total_cost = [0.0]
120
+
121
  try:
122
+ # Use the single-file OCR extraction function
123
+ ocr_data = extract_fsc_logo_data_single(
124
+ img_path,
125
+ client,
126
+ model_name=OCR_MODEL,
127
+ total_tokens=total_tokens,
128
+ total_cost=total_cost
129
+ )
130
+
131
+ # Extract FSC license code from OCR results
132
+ extracted_fsc_license_code = None
133
+ if ocr_data.get("fsc_data") and len(ocr_data["fsc_data"]) > 0:
134
+ fsc_entry = ocr_data["fsc_data"][0]
135
+ extracted_fsc_license_code = fsc_entry.get("fsc_license_code")
136
+ if extracted_fsc_license_code:
137
+ # Ensure it has the FSC- prefix
138
+ if not extracted_fsc_license_code.startswith("FSC-"):
139
+ extracted_fsc_license_code = f"FSC-{extracted_fsc_license_code}"
140
+
141
+ logger.info(f"OCR extracted FSC license code: {extracted_fsc_license_code}")
142
+
143
+ fsc_code_result = extracted_fsc_license_code or "Not found"
144
+
145
+ return fsc_code_result, "⏳ Step 2: Looking up FSC license...", "⏳ Step 2: Looking up FSC license...", "⏳ Step 2: Looking up FSC license...", "⏳ Step 2: Looking up FSC license...", ""
146
+
147
  except Exception as e:
148
+ logger.error(f"OCR extraction failed: {e}")
149
+ return f"Error in OCR extraction: {str(e)}", "", "", "", "", ""
150
+
151
+ finally:
152
+ # Clean up temporary PNG if created
153
+ if cleanup_png and 'img_path' in locals():
154
+ try:
155
+ os.remove(img_path)
156
+ logger.debug(f"Cleaned up temporary PNG: {img_path}")
157
+ except OSError:
158
+ logger.warning(f"Failed to clean up temporary PNG: {img_path}")
159
+
160
+ except Exception as e:
161
+ logger.error(f"Error processing image: {e}")
162
+ return f"Error processing image: {str(e)}", "", "", "", "", ""
163
+
164
+ def step2_fsc_lookup(image_file, fsc_code) -> Tuple[str, str, str, str, str, str]:
165
+ """Step 2: Lookup FSC license in API and get scope."""
166
+ if image_file is None:
167
+ return fsc_code, "Please upload an image file.", "", "", "", ""
168
+
169
+ if not fsc_code or fsc_code == "Not found":
170
+ return fsc_code, "No FSC code found - skipping API lookup", "", "", "", ""
171
+
172
+ try:
173
+ logger.info(f"Step 2: Looking up FSC license: {fsc_code}")
174
+
175
+ fsc_products, license_status = get_fsc_products_for_license(fsc_code)
176
+
177
+ return fsc_code, license_status or "Unknown", "⏳ Step 3: Analyzing product...", "⏳ Step 3: Analyzing product...", "⏳ Step 3: Analyzing product...", ""
178
+
179
+ except Exception as e:
180
+ logger.error(f"Error in FSC lookup: {e}")
181
+ return fsc_code, "Error", "⏳ Step 3: Analyzing product...", "⏳ Step 3: Analyzing product...", "⏳ Step 3: Analyzing product...", ""
182
+
183
+ def step3_analyze_with_filtered_taxonomy(image_file, fsc_code, fsc_products, license_status) -> Tuple[str, str, str, str, str, str]:
184
+ """Step 3: Analyze product with FSC-filtered taxonomy."""
185
+ if image_file is None:
186
+ return fsc_code, "Please upload an image file.", "", "", "", ""
187
+
188
+ try:
189
+ # Handle the uploaded file
190
+ if isinstance(image_file, str):
191
+ image_path = image_file
192
  else:
193
+ image_path = image_file.name
194
+
195
+ # Convert PDF to image if needed
196
+ cleanup_png = False
197
+ try:
198
+ if image_path.lower().endswith(".pdf"):
199
+ img_path = pdf_first_page_to_image(image_path)
200
+ cleanup_png = True
201
+ elif image_path.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
202
+ img_path = image_path
203
+ else:
204
+ return fsc_code, license_status or "Unknown", "Unsupported file format.", "", "", ""
205
+
206
+ # Step 3: Filter taxonomy and analyze
207
+ logger.info("Step 3: Analyzing product with filtered taxonomy...")
208
+
209
+ # Determine which taxonomy to use
210
+ if fsc_code and fsc_code != "Not found" and license_status == 'Valid' and fsc_products:
211
+ # Use filtered taxonomy based on FSC products
212
+ filtered_taxonomy = filter_taxonomy_by_fsc_products(taxonomy_df, fsc_products)
213
+ if not filtered_taxonomy.empty:
214
+ taxonomy_block = build_taxonomy_block_from_dataframe(filtered_taxonomy, include_level3=True)
215
+ taxonomy_source = "FSC-filtered"
216
+ else:
217
+ taxonomy_block = build_taxonomy_block_from_dataframe(taxonomy_df, include_level3=True)
218
+ taxonomy_source = "full (no FSC matches found)"
219
+ else:
220
+ # Use full taxonomy
221
+ taxonomy_block = build_taxonomy_block_from_dataframe(taxonomy_df, include_level3=True)
222
+ taxonomy_source = "full"
223
+
224
+ logger.info(f"Using {taxonomy_source} taxonomy for analysis")
225
+
226
+ # Initialize token and cost tracking
227
+ total_tokens = {"input": 0, "output": 0}
228
+ total_cost = [0.0]
229
+
230
+ result = extract_fsc_logo_and_taxonomy(
231
+ img_path,
232
+ taxonomy_block,
233
+ original_filename=os.path.basename(image_path),
234
+ client=client,
235
+ model_name=SCOPE_MODEL,
236
+ total_tokens=total_tokens,
237
+ total_cost=total_cost
238
+ )
239
+
240
+ # Format results
241
+ product_summary = result.get("product_summary", "No product summary available")
242
+ material = result.get("inferred_material", "No material information available")
243
+
244
+ # Format taxonomy results
245
+ taxonomy_matches = result.get("taxonomy_matches", [])
246
+ has_match = result.get("has_taxonomy_match", False)
247
+
248
+ taxonomy_results = ""
249
+ if has_match and taxonomy_matches:
250
+ for i, match in enumerate(taxonomy_matches[:3], 1): # Show top 3 matches
251
+ taxonomy_results += f"Match {i}:\n"
252
+ if match.get("level1_code") and match.get("level1_name"):
253
+ taxonomy_results += f"Level 1: {match['level1_code']} - {match['level1_name']}\n"
254
+ if match.get("level2_code") and match.get("level2_name"):
255
+ taxonomy_results += f"Level 2: {match['level2_code']} - {match['level2_name']}\n"
256
+ if match.get("confidence"):
257
+ taxonomy_results += f"Confidence: {match['confidence']:.2f}\n"
258
+ if match.get("rationale"):
259
+ taxonomy_results += f"Rationale: {match['rationale']}\n"
260
+ taxonomy_results += "\n"
261
+ else:
262
+ taxonomy_results += "Taxonomy Classification: No matching categories found"
263
+
264
+ cost_info = f"""
265
+ Total Cost: ${total_cost[0]:.4f}
266
+ Models Used: {OCR_MODEL} (OCR), {SCOPE_MODEL} (Analysis)
267
+ Token Usage: {total_tokens['input']} input + {total_tokens['output']} output tokens"""
268
+
269
+ return fsc_code, license_status or "Unknown", product_summary, material, taxonomy_results, cost_info
270
+
271
+ finally:
272
+ # Clean up temporary PNG if created
273
+ if cleanup_png and 'img_path' in locals():
274
+ try:
275
+ os.remove(img_path)
276
+ logger.debug(f"Cleaned up temporary PNG: {img_path}")
277
+ except OSError:
278
+ logger.warning(f"Failed to clean up temporary PNG: {img_path}")
279
+
280
+ except Exception as e:
281
+ logger.error(f"Error in step 3: {e}")
282
+ return fsc_code, license_status or "Unknown", f"Error in analysis: {str(e)}", "", "", ""
283
+
284
+ def process_single_image_progressive(image_file):
285
+ """Process image with progressive updates showing each step."""
286
+ if image_file is None:
287
+ return "Please upload an image file.", "⏳ Waiting for image...", "⏳ Waiting for image...", "⏳ Waiting for image...", "⏳ Waiting for image...", ""
288
+ if image_file.lower().endswith(".pdf"):
289
+ image_file = get_crop_from_pdf(image_file)
290
+
291
+ if image_file is None:
292
+ return "Please upload an image file.", "⏳ Waiting for image...", "⏳ Waiting for image...", "⏳ Waiting for image...", "⏳ Waiting for image...", ""
293
+
294
+ # Step 1: Extract FSC code
295
+ fsc_code, _, _, _, _, _ = step1_extract_fsc_code(image_file)
296
+ yield fsc_code, "⏳ Step 2: Looking up FSC license...", "⏳ Step 2: Looking up FSC license...", "⏳ Step 2: Looking up FSC license...", "⏳ Step 2: Looking up FSC license...", ""
297
+
298
+ # Step 2: FSC lookup
299
+ fsc_code, license_status, _, _, _, _ = step2_fsc_lookup(image_file, fsc_code)
300
+ yield fsc_code, license_status, "⏳ Step 3: Analyzing product...", "⏳ Step 3: Analyzing product...", "⏳ Step 3: Analyzing product...", ""
301
+
302
+ # Check if license status is valid before proceeding to step 3
303
+ if license_status != 'Valid':
304
+ # Skip step 3 if license is not valid
305
+ error_message = f"Analysis stopped: License status is '{license_status}', not 'Valid'. Cannot proceed with product analysis."
306
+ yield fsc_code, license_status, error_message, "", "Analysis skipped due to invalid license status", ""
307
+ return
308
 
309
+ # Step 3: Analyze with filtered taxonomy
310
+ # We need to get the FSC products and license status from step 2
311
+ fsc_products, license_status = get_fsc_products_for_license(fsc_code) if fsc_code and fsc_code != "Not found" else ([], None)
312
+
313
+ fsc_code, license_status, product_summary, material, taxonomy_results, cost_info = step3_analyze_with_filtered_taxonomy(image_file, fsc_code, fsc_products, license_status)
314
+ yield fsc_code, license_status, product_summary, material, taxonomy_results, cost_info
315
 
316
  def create_gradio_interface():
317
  """Create and return the Gradio interface with progressive updates."""
 
331
  gr.Markdown("### Upload Your Own Image")
332
  image_input = gr.File(
333
  label="Upload Product Image",
334
+ file_types=["image", ".pdf"],
335
  type="filepath"
336
  )
337
 
 
372
  # Output components with formatted section headers
373
  gr.Markdown("## Analysis Results")
374
 
375
+ gr.Markdown("### FSC License Information")
376
  fsc_license_output = gr.Textbox(
377
+ label="Extracted FSC License Code",
378
  lines=1,
379
  interactive=False,
380
  container=True
 
387
  container=True
388
  )
389
 
390
+ gr.Markdown("### Product Analysis")
391
  product_summary_output = gr.Textbox(
392
  label="Product Description",
393
  lines=3,
 
403
  )
404
 
405
  taxonomy_output = gr.Textbox(
406
+ label="Scope Check",
407
  lines=10,
408
  interactive=False,
409
  container=True
410
  )
411
 
412
  gr.Markdown("### Processing Information")
413
+ fsc_info_output = gr.Textbox(
414
  label="Resources",
415
  lines=3,
416
  interactive=False,
 
421
  if example_images:
422
  # Update thumbnail and load image when dropdown selection changes
423
  example_dropdown.change(
424
+ fn=select_and_show_example,
425
  inputs=[example_dropdown],
426
  outputs=[image_input, example_thumbnail]
427
  )
428
 
 
429
  process_btn.click(
430
+ fn=process_single_image_progressive,
431
  inputs=[image_input],
432
+ outputs=[fsc_license_output, license_status_output, product_summary_output, material_output, taxonomy_output, fsc_info_output]
433
  )
434
 
435
  return demo
 
438
  """Main function to launch the Gradio interface."""
439
  logger.info("Starting FSC Product Classification Tool with Gradio interface")
440
 
441
+ if taxonomy_df.empty:
442
+ logger.error("Failed to load taxonomy. Please check that product_classification.csv exists.")
443
+ return
444
+
445
  if not os.getenv("OPENAI_API_KEY"):
446
  logger.error("OPENAI_API_KEY environment variable not set. Please set your OpenAI API key.")
447
  return
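
For context on the progressive UI above: process_single_image_progressive is a generator, and Gradio streams each yield to the components bound in process_btn.click, which is how the intermediate "⏳ Step …" placeholders appear before the final results. A minimal, self-contained sketch of that pattern; the component names and example values here are illustrative, not the app's:

import time
import gradio as gr

def stepwise(_file):
    # Each yield immediately updates the two bound output components.
    yield "⏳ Step 1: extracting FSC code...", ""
    time.sleep(1)  # stand-in for the OCR / API work
    yield "FSC-C012345", "Valid"  # illustrative values only

with gr.Blocks() as demo:
    inp = gr.File(type="filepath")
    code_out = gr.Textbox(label="FSC License Code")
    status_out = gr.Textbox(label="License Status")
    gr.Button("Process").click(fn=stepwise, inputs=[inp], outputs=[code_out, status_out])
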
modules/{database_lookup.py → fsc_api.py} RENAMED
@@ -1,5 +1,6 @@
1
  import os
2
  import json
 
3
  import logging
4
  from typing import List, Optional, Dict
5
  import pandas as pd
@@ -60,6 +61,20 @@ def call_fsc_api(license_code: str, api_base_url: str = None, api_key: str = Non
60
  logger.error(f"Error parsing JSON response for {license_code}: {e}")
61
  return None
62
 
63
 
64
  def get_fsc_products_for_license(license_code: str, api_base_url: str = None, api_key: str = None) -> tuple[List[Dict], Optional[str]]:
65
  """
@@ -75,18 +90,18 @@ def get_fsc_products_for_license(license_code: str, api_base_url: str = None, ap
75
  """
76
  api_response = call_fsc_api(license_code, api_base_url, api_key)
77
 
78
- # Check if API call failed
79
- if api_response is None:
80
- logger.warning(f"API call failed for {license_code} - license not found in FSC database")
81
- return [], "Not Found"
 
82
 
83
- # Get license and certificate status
84
- license_status = api_response.get("LicenseStatus")
85
- certificate_status = api_response.get("CertificateStatus")
86
- logger.info(f"Status for {license_code} - License: {license_status}; Certificate: {certificate_status}")
87
- products = api_response.get("Products", [])
88
-
89
- return products, license_status, certificate_status
90
 
91
  # ============================ Taxonomy Filtering ================================
92
 
 
1
  import os
2
  import json
3
+ import time
4
  import logging
5
  from typing import List, Optional, Dict
6
  import pandas as pd
 
61
  logger.error(f"Error parsing JSON response for {license_code}: {e}")
62
  return None
63
 
64
+ def extract_products_from_response(api_response: Dict) -> List[Dict]:
65
+ """
66
+ Extract the "Products" list from the API response.
67
+
68
+ Args:
69
+ api_response (Dict): The API response dictionary
70
+
71
+ Returns:
72
+ List[Dict]: List of product dictionaries, or empty list if not found
73
+ """
74
+ if not api_response:
75
+ return []
76
+
77
+ return api_response.get("Products", [])
78
 
79
  def get_fsc_products_for_license(license_code: str, api_base_url: str = None, api_key: str = None) -> tuple[List[Dict], Optional[str]]:
80
  """
 
90
  """
91
  api_response = call_fsc_api(license_code, api_base_url, api_key)
92
 
93
+ # Check license status first
94
+ license_status = None
95
+ if api_response:
96
+ license_status = api_response.get("LicenseStatus")
97
+ logger.info(f"License status for {license_code}: {license_status}")
98
 
99
+ products = extract_products_from_response(api_response)
100
+
101
+ # Add a small delay to be respectful to the API
102
+ time.sleep(0.5)
103
+
104
+ return products, license_status
 
105
 
106
  # ============================ Taxonomy Filtering ================================
107
 
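Usage note for the renamed module: get_fsc_products_for_license now returns a (products, license_status) pair instead of the previous three-value return, and main.py branches on the status before filtering the taxonomy. A minimal sketch of that call path (the license code and variable names are illustrative):

import pandas as pd
from modules.fsc_api import get_fsc_products_for_license, filter_taxonomy_by_fsc_products

taxonomy_df = pd.read_csv("product_classification.csv")
products, license_status = get_fsc_products_for_license("FSC-C012345")  # illustrative code

if license_status == "Valid" and products:
    taxonomy = filter_taxonomy_by_fsc_products(taxonomy_df, products)
else:
    taxonomy = taxonomy_df  # fall back to the full taxonomy, as main.py does
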
modules/homography_functions.py DELETED
@@ -1,333 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Helper module for extracting logo_extraction.py - get logo from PDF pages using feature matching
4
- (SIFT/ORB) and homography estimation.
5
-
6
- Key steps:
7
- - Compute keypoints/descriptors on page and template images.
8
- - Match descriptors with BFMatcher + Lowe's ratio test.
9
- - Estimate homography with RANSAC.
10
- - Return the homography matrix (M) if sufficient inliers are found.
11
- - Multiprocessing is used to run feature extraction with a timeout.
12
-
13
- Optimizations:
14
- - Removed expensive per-block multiprocessing overhead
15
- - Simplified feature extraction (no timeout handling per block)
16
- - Reduced default SIFT features for better performance
17
- - Maintained all validation logic
18
- - Added template inversion strategy
19
- - Dual matching approach (original + inverted)
20
- - Automatic selection of best match
21
- - Backward compatible with existing code
22
- """
23
-
24
- import cv2
25
- import numpy as np
26
- from typing import Any, List, Optional, Tuple
27
- import logging
28
-
29
- logger = logging.getLogger(__name__)
30
-
31
- # ==== Global Constants ====
32
- MATCH_THRESHOLD: float = 0.65
33
- RANSAC_REPROJ_THRESHOLD: float = 5.0
34
- MIN_MATCHES: int = 8
35
- # ==== End Global Constants ====
36
-
37
-
38
- def serialize_keypoints(kps: List[cv2.KeyPoint]) -> List[Tuple]:
39
- """Convert OpenCV KeyPoint objects into serializable format."""
40
- return [
41
- (kp.pt, kp.size, kp.angle, kp.response, kp.octave, kp.class_id)
42
- for kp in kps
43
- ]
44
-
45
-
46
- def deserialize_keypoints(serialized_kps: List[Tuple]) -> List[cv2.KeyPoint]:
47
- """Reconstruct OpenCV KeyPoint objects from serialized tuples."""
48
- return [
49
- cv2.KeyPoint(
50
- x=pt[0][0],
51
- y=pt[0][1],
52
- size=pt[1],
53
- angle=pt[2],
54
- response=pt[3],
55
- octave=pt[4],
56
- class_id=pt[5],
57
- )
58
- for pt in serialized_kps
59
- ]
60
-
61
-
62
- def validate_homography(M: np.ndarray, template_shape: Tuple[int, int],
63
- block_shape: Tuple[int, int]) -> bool:
64
- """Validate that homography represents a reasonable transformation."""
65
- if M is None:
66
- return False
67
-
68
- h, w = template_shape
69
- pts = np.float32([[0, 0], [0, h], [w, h], [w, 0]]).reshape(-1, 1, 2)
70
-
71
- try:
72
- dst = cv2.perspectiveTransform(pts, M)
73
- except:
74
- logger.debug("Validation failed: perspectiveTransform error")
75
- return False
76
-
77
- # Calculate transformed area
78
- transformed_area = cv2.contourArea(dst)
79
- original_area = w * h
80
-
81
- # Check area ratio
82
- area_ratio = transformed_area / original_area
83
- logger.debug(f"Area ratio: {area_ratio:.3f}")
84
- if area_ratio < 0.01 or area_ratio > 100:
85
- logger.debug(f"Area ratio {area_ratio:.3f} outside bounds")
86
- return False
87
-
88
- # Check aspect ratio preservation
89
- transformed_width = np.linalg.norm(dst[2] - dst[3])
90
- transformed_height = np.linalg.norm(dst[1] - dst[0])
91
- transformed_aspect = transformed_width / (transformed_height + 1e-6)
92
- original_aspect = w / h
93
-
94
- aspect_ratio_change = transformed_aspect / original_aspect
95
- logger.debug(f"Aspect ratio change: {aspect_ratio_change:.3f}")
96
- if aspect_ratio_change < 0.5 or aspect_ratio_change > 2.0:
97
- logger.debug(f"Aspect ratio change {aspect_ratio_change:.3f} outside bounds")
98
- return False
99
-
100
- # Check for excessive skew
101
- edge1 = dst[1] - dst[0]
102
- edge2 = dst[3] - dst[0]
103
- angle = np.abs(np.dot(edge1.flatten(), edge2.flatten()) /
104
- (np.linalg.norm(edge1) * np.linalg.norm(edge2) + 1e-6))
105
- logger.debug(f"Edge angle dot product: {angle:.3f}")
106
- if angle > 0.5:
107
- logger.debug(f"Excessive skew: {angle:.3f}")
108
- return False
109
-
110
- logger.debug("Validation passed")
111
- return True
112
-
113
-
114
- def try_match_with_template(
115
- gray_page: np.ndarray,
116
- gray_template: np.ndarray,
117
- sift_or_orb: Any,
118
- kp1: List[cv2.KeyPoint],
119
- des1: np.ndarray,
120
- inverted: bool = False
121
- ) -> Tuple[Optional[np.ndarray], int, int]:
122
- """
123
- Try matching with a template (original or inverted).
124
-
125
- Returns
126
- -------
127
- Tuple[Optional[np.ndarray], int, int]
128
- (Homography matrix, number of good matches, number of inliers)
129
- """
130
- # Extract features from page
131
- try:
132
- kp2, des2 = sift_or_orb.detectAndCompute(gray_page, None)
133
- except Exception as e:
134
- logger.debug(f"Feature extraction failed: {e}")
135
- return None, 0, 0
136
-
137
- if kp2 is None or des2 is None or len(des1) == 0 or len(des2) == 0:
138
- return None, 0, 0
139
-
140
- # Match descriptors
141
- bf = cv2.BFMatcher()
142
- matches = bf.knnMatch(des1, des2, k=2)
143
-
144
- # Apply Lowe's ratio test
145
- good_matches = [
146
- m for m_n in matches if len(m_n) == 2
147
- for m, n in [m_n] if m.distance < MATCH_THRESHOLD * n.distance
148
- ]
149
-
150
- n_good = len(good_matches)
151
- logger.debug(f"{'Inverted' if inverted else 'Original'} template: {n_good} good matches (need {MIN_MATCHES})")
152
-
153
- if n_good < MIN_MATCHES:
154
- return None, n_good, 0
155
-
156
- # Prepare points and estimate homography
157
- src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
158
- dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
159
-
160
- M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, RANSAC_REPROJ_THRESHOLD)
161
-
162
- if M is None:
163
- logger.debug("Homography estimation failed")
164
- return None, n_good, 0
165
-
166
- # Validate inliers
167
- inliers = np.sum(mask)
168
- logger.debug(f"Inliers: {inliers}/{n_good}")
169
-
170
- if inliers < 10 or inliers / n_good < 0.6:
171
- logger.debug("Insufficient inliers")
172
- return None, n_good, inliers
173
-
174
- # Validate geometric properties
175
- if not validate_homography(M, gray_template.shape, gray_page.shape):
176
- logger.debug("Homography failed geometric validation")
177
- return None, n_good, inliers
178
-
179
- return M, n_good, inliers
180
-
181
-
182
- def extract_logo_from_pdf_page(
183
- page_cv: np.ndarray,
184
- gray_template: np.ndarray,
185
- sift_or_orb: Any,
186
- kp1: List[cv2.KeyPoint],
187
- des1: np.ndarray,
188
- ) -> Optional[np.ndarray]:
189
- """
190
- Extract logo with polarity-invariant feature matching.
191
-
192
- This function tries both the original template and an inverted version,
193
- then returns the homography from whichever gives better results.
194
-
195
- Parameters
196
- ----------
197
- page_cv : np.ndarray
198
- Page image (BGR)
199
- gray_template : np.ndarray
200
- Template image (grayscale)
201
- sift_or_orb : Any
202
- Feature detector/descriptor
203
- kp1 : List[cv2.KeyPoint]
204
- Template keypoints (from original template)
205
- des1 : np.ndarray
206
- Template descriptors (from original template)
207
-
208
- Returns
209
- -------
210
- Optional[np.ndarray]
211
- Homography matrix if logo found, None otherwise
212
- """
213
- gray_page = cv2.cvtColor(page_cv, cv2.COLOR_BGR2GRAY)
214
-
215
- # Strategy 1: Try with original template
216
- logger.debug("Trying original template...")
217
- M_orig, n_good_orig, inliers_orig = try_match_with_template(
218
- gray_page, gray_template, sift_or_orb, kp1, des1, inverted=False
219
- )
220
-
221
- # Strategy 2: Try with inverted template (for white-on-dark logos)
222
- logger.debug("Trying inverted template...")
223
- gray_template_inv = 255 - gray_template
224
- kp1_inv, des1_inv = sift_or_orb.detectAndCompute(gray_template_inv, None)
225
-
226
- M_inv, n_good_inv, inliers_inv = try_match_with_template(
227
- gray_page, gray_template_inv, sift_or_orb, kp1_inv, des1_inv, inverted=True
228
- )
229
-
230
- # Select the best match based on quality metrics
231
- best_M = None
232
- best_score = 0
233
-
234
- if M_orig is not None:
235
- # Quality score: weighted combination of matches and inliers
236
- score_orig = n_good_orig + (inliers_orig * 2)
237
- if score_orig > best_score:
238
- best_score = score_orig
239
- best_M = M_orig
240
- logger.info(f"Original template selected (score: {score_orig}, matches: {n_good_orig}, inliers: {inliers_orig})")
241
-
242
- if M_inv is not None:
243
- # Quality score: weighted combination of matches and inliers
244
- score_inv = n_good_inv + (inliers_inv * 2)
245
- if score_inv > best_score:
246
- best_score = score_inv
247
- best_M = M_inv
248
- logger.info(f"Inverted template selected (score: {score_inv}, matches: {n_good_inv}, inliers: {inliers_inv})")
249
-
250
- if best_M is not None:
251
- logger.info("Logo detected with valid homography")
252
- return best_M
253
- else:
254
- logger.debug("No valid match found with either template")
255
- return None
256
-
257
-
258
- def extract_logo_from_pdf_page_dual_features(
259
- page_cv: np.ndarray,
260
- gray_template: np.ndarray,
261
- sift_or_orb: Any,
262
- kp1_orig: List[cv2.KeyPoint],
263
- des1_orig: np.ndarray,
264
- kp1_inv: List[cv2.KeyPoint],
265
- des1_inv: np.ndarray,
266
- ) -> Optional[np.ndarray]:
267
- """
268
- Extract logo with pre-computed inverted template features.
269
-
270
- This version is more efficient when processing multiple pages, as the
271
- inverted template features are computed once and reused.
272
-
273
- Parameters
274
- ----------
275
- page_cv : np.ndarray
276
- Page image (BGR)
277
- gray_template : np.ndarray
278
- Template image (grayscale, original)
279
- sift_or_orb : Any
280
- Feature detector/descriptor
281
- kp1_orig : List[cv2.KeyPoint]
282
- Template keypoints (from original template)
283
- des1_orig : np.ndarray
284
- Template descriptors (from original template)
285
- kp1_inv : List[cv2.KeyPoint]
286
- Template keypoints (from inverted template)
287
- des1_inv : np.ndarray
288
- Template descriptors (from inverted template)
289
-
290
- Returns
291
- -------
292
- Optional[np.ndarray]
293
- Homography matrix if logo found, None otherwise
294
- """
295
- gray_page = cv2.cvtColor(page_cv, cv2.COLOR_BGR2GRAY)
296
- gray_template_inv = 255 - gray_template
297
-
298
- # Try with original template
299
- logger.debug("Trying original template...")
300
- M_orig, n_good_orig, inliers_orig = try_match_with_template(
301
- gray_page, gray_template, sift_or_orb, kp1_orig, des1_orig, inverted=False
302
- )
303
-
304
- # Try with inverted template
305
- logger.debug("Trying inverted template...")
306
- M_inv, n_good_inv, inliers_inv = try_match_with_template(
307
- gray_page, gray_template_inv, sift_or_orb, kp1_inv, des1_inv, inverted=True
308
- )
309
-
310
- # Select the best match
311
- best_M = None
312
- best_score = 0
313
-
314
- if M_orig is not None:
315
- score_orig = n_good_orig + (inliers_orig * 2)
316
- if score_orig > best_score:
317
- best_score = score_orig
318
- best_M = M_orig
319
- logger.info(f"Original template selected (score: {score_orig})")
320
-
321
- if M_inv is not None:
322
- score_inv = n_good_inv + (inliers_inv * 2)
323
- if score_inv > best_score:
324
- best_score = score_inv
325
- best_M = M_inv
326
- logger.info(f"Inverted template selected (score: {score_inv})")
327
-
328
- if best_M is not None:
329
- logger.info("Logo detected with valid homography")
330
- return best_M
331
- else:
332
- logger.debug("No valid match found with either template")
333
- return None
 
modules/image_analysis.py DELETED
@@ -1,157 +0,0 @@
1
- import os
2
- import logging
3
- from typing import Dict
4
- import pandas as pd
5
- import openai
6
- import re
7
-
8
- # import from modules
9
- from modules.models import FSCExtractionAndTaxonomy
10
- from modules.prompts import prompt_image_analysis
11
- from modules.utils import get_token_costs
12
- from modules.image_processing import file_to_b64, pdf_multi_page_to_images
13
-
14
- # Configure logging
15
- logger = logging.getLogger(__name__)
16
-
17
- # ============================ Taxonomy Preparation ===================================
18
- def build_taxonomy_block_from_dataframe(taxonomy_df: pd.DataFrame, include_level3: bool = False) -> str:
19
- """
20
- Produces compact lines from a filtered taxonomy dataframe such as:
21
- W12 Indoor furniture | W12.1 Cabinet
22
- P5 Packaging and wrappings of paper | P5.1 Cardboard packaging | P5.2 Corrugated paper packaging
23
- """
24
- lines, seen = [], set()
25
-
26
- for _, row in taxonomy_df.iterrows():
27
- # Handle NaN values properly
28
- main_category = str(row.get("Main Category") or "").strip() if pd.notna(row.get("Main Category")) else ""
29
- l1c = str(row.get("Level 1 Code") or "").strip() if pd.notna(row.get("Level 1 Code")) else ""
30
- l1n = str(row.get("Level 1 Name") or "").strip() if pd.notna(row.get("Level 1 Name")) else ""
31
- l2c = str(row.get("Level 2 Code") or "").strip() if pd.notna(row.get("Level 2 Code")) else ""
32
- l2n = str(row.get("Level 2 Name") or "").strip() if pd.notna(row.get("Level 2 Name")) else ""
33
- l3c = str(row.get("Level 3 Code") or "").strip() if pd.notna(row.get("Level 3 Code")) else ""
34
- l3n = str(row.get("Level 3 Name") or "").strip() if pd.notna(row.get("Level 3 Name")) else ""
35
-
36
- if not l1c or not l1n:
37
- continue
38
-
39
- parts = [f"{main_category} | {l1c} {l1n}"]
40
- if l2c:
41
- parts.append(f"{l2c} {l2n}")
42
- if include_level3 and l3c:
43
- parts.append(f"{l3c} {l3n}")
44
- line = " | ".join(parts)
45
- if line not in seen:
46
- seen.add(line)
47
- lines.append(line)
48
-
49
- return "\n".join(lines)
50
-
51
-
52
- # ============================ Image Analysis =====================================
53
- def extract_fsc_logo_and_taxonomy(image_path: str, product_type: str, taxonomy_block: str,
54
- client: openai.OpenAI = None, model_name: str = "gpt-4.1") -> Dict:
55
- """
56
- Describes image and material, matches taxonomy items, gets text
57
- """
58
- if client is None:
59
- raise ValueError("OpenAI client must be provided")
60
-
61
- INPUT_TOKEN_COST, OUTPUT_TOKEN_COST = get_token_costs(model_name)
62
-
63
- content = [{"type": "input_text", "text": prompt_image_analysis(product_type, taxonomy_block)}]
64
-
65
- if image_path.lower().endswith(".pdf"):
66
- image_paths = pdf_multi_page_to_images(image_path)
67
- else:
68
- return "Unsupported file format."
69
-
70
- for image_path in image_paths:
71
- img_b64 = file_to_b64(image_path)
72
- content.append({"type": "input_image", "image_url": f"data:image/png;base64,{img_b64}", "detail": "high"})
73
-
74
- logger.info(f"Image analysis content: {len(image_paths)} pages")
75
-
76
- try:
77
- response = client.responses.parse(
78
- model=model_name,
79
- input=[{"role": "user", "content": content}],
80
- text_format=FSCExtractionAndTaxonomy,
81
- )
82
-
83
- usage = response.usage
84
- input_tokens = usage.input_tokens
85
- output_tokens = usage.output_tokens
86
- input_cost = (input_tokens / 1000.0) * INPUT_TOKEN_COST
87
- output_cost = (output_tokens / 1000.0) * OUTPUT_TOKEN_COST
88
- call_cost = input_cost + output_cost
89
-
90
- logger.info(f"Image analysis: {input_tokens} in + {output_tokens} out = ${call_cost:.4f}")
91
-
92
- if response.output_parsed:
93
- result = response.output_parsed
94
- result_dict = result.model_dump()
95
-
96
- # flatten top-1 taxonomy into columns
97
- tx = result_dict.get("taxonomy_matches", [])[:1]
98
- if tx:
99
- t0 = tx[0]
100
- result_dict["main_category"] = t0.get("main_category")
101
- result_dict["level1_code"] = t0.get("level1_code")
102
- result_dict["level1_name"] = t0.get("level1_name")
103
- result_dict["level2_code"] = t0.get("level2_code")
104
- result_dict["level2_name"] = t0.get("level2_name")
105
- result_dict["taxonomy_confidence"] = t0.get("confidence")
106
- else:
107
- # No taxonomy matches found
108
- result_dict["main_category"] = None
109
- result_dict["level1_code"] = None
110
- result_dict["level1_name"] = None
111
- result_dict["level2_code"] = None
112
- result_dict["level2_name"] = None
113
- result_dict["taxonomy_confidence"] = None
114
-
115
- return {
116
- "image_data": result_dict,
117
- "usage": {
118
- "input_tokens": input_tokens,
119
- "output_tokens": output_tokens,
120
- "total_cost": call_cost
121
- }
122
- }
123
-
124
- except Exception as e:
125
- logger.error(f"Error processing Step 3: {str(e)}")
126
- raise
127
-
128
- finally:
129
- # Clean up temporary PNG if created for single and multi page pdfs
130
- for image_path in image_paths:
131
- try:
132
- os.remove(image_path)
133
- logger.debug(f"Cleaned up temporary PNG: {image_path} for {len(image_paths)} pages")
134
- except OSError:
135
- logger.warning(f"Failed to clean up temporary PNG: {image_path} for {len(image_paths)} pages")
136
-
137
-
138
-
139
- # ============================ FSC Text Analysis ======================================
140
-
141
- # Analyze text from image and return all instances of the text "FSC" or "Forest Stewardship Council"
142
- def analyze_fsc_text(image_text: str) -> str:
143
- """
144
- Return all instances of "FSC" or "Forest Stewardship Council" with up to 3 words of context on each side.
145
- Stops at newline characters.
146
- """
147
- if not image_text:
148
- return ""
149
-
150
- word = r"\b\w+(?:[-'']\w+)*\b"
151
- pattern = rf"(?:{word}[^\w\n]+){{0,5}}(?:FSC|Forest Stewardship Council)(?:[^\w\n]+{word}){{0,5}}"
152
- matches = re.findall(pattern, image_text, flags=re.IGNORECASE)
153
- if not matches:
154
- return "No FSC text strings identified"
155
-
156
- return "\n".join(f"{i}: {match}" for i, match in enumerate(matches, start=1))
157
-
 
modules/image_processing.py CHANGED
@@ -4,7 +4,6 @@ import logging
4
  from pathlib import Path
5
  import pypdfium2 as pdfium
6
  from PIL import Image, ImageOps, ImageChops, ImageFilter
7
- from typing import List
8
 
9
  # Configure logging
10
  logger = logging.getLogger(__name__)
@@ -21,21 +20,15 @@ def _autocrop_white(im, tol=3):
21
  bbox = gray.point(lambda p: 255 if p > tol else 0).getbbox()
22
  return im.crop(bbox) if bbox else im
23
 
24
-
25
- #
26
  def pdf_first_page_to_image(
27
  pdf_path: str,
28
- max_long_edge: int = 2000, # 1600–2400 is a good sweet spot
29
  fmt: str = "WEBP", # "WEBP" | "JPEG" | "PNG"
30
  quality: int = 82, # for WEBP/JPEG
31
  supersample: float = 1.7, # render a bit larger, then downsample with LANCZOS
32
  crop_margins: bool = True
33
  ) -> str:
34
- """
35
- OpenAI API will not accept pdf as image files.
36
- Needs to be intelligently converted.
37
- Using WEBP as default (offers the best tradeoff between compression and quality for this use case)
38
- """
39
  pdf = pdfium.PdfDocument(pdf_path)
40
  page = pdf.get_page(0)
41
  w_pt, h_pt = page.get_size() # PDF points
@@ -72,75 +65,4 @@ def pdf_first_page_to_image(
72
  img.save(out, "PNG", optimize=True, compress_level=9)
73
  else:
74
  raise ValueError("fmt must be one of: WEBP, JPEG, PNG")
75
- return out
76
-
77
- def pdf_multi_page_to_images(
78
- pdf_path: str,
79
- max_pages: int = 3, # Limit to first 3 pages to avoid excessive processing
80
- max_long_edge: int = 2000,
81
- fmt: str = "WEBP",
82
- quality: int = 82,
83
- supersample: float = 1.7,
84
- crop_margins: bool = True
85
- ) -> List[str]:
86
- """
87
- Convert PDF to multiple images (first N pages).
88
-
89
- Returns:
90
- List[str]: List of temporary image file paths
91
- """
92
- pdf = pdfium.PdfDocument(pdf_path)
93
- total_pages = len(pdf)
94
- pages_to_process = min(max_pages, total_pages)
95
-
96
- image_paths = []
97
- pdf_name = Path(pdf_path).stem
98
-
99
- for page_num in range(pages_to_process):
100
- try:
101
- page = pdf.get_page(page_num)
102
- w_pt, h_pt = page.get_size()
103
- long_pt = max(w_pt, h_pt)
104
-
105
- # Render bigger, then downsample
106
- scale = supersample * (max_long_edge / long_pt)
107
- pil = page.render(scale=scale).to_pil()
108
- img = pil.convert("RGB")
109
-
110
- # Downsample to exact target
111
- if max(img.size) > max_long_edge:
112
- ratio = max_long_edge / max(img.size)
113
- new_size = (int(img.width * ratio), int(img.height * ratio))
114
- img = img.resize(new_size, Image.LANCZOS).filter(
115
- ImageFilter.UnsharpMask(radius=0.6, percent=120, threshold=3)
116
- )
117
-
118
- if crop_margins:
119
- img = _autocrop_white(img)
120
-
121
- # Create temporary file
122
- with tempfile.NamedTemporaryFile(
123
- suffix=f".{fmt.lower()}",
124
- delete=False,
125
- prefix=f"{pdf_name}_page{page_num+1}_"
126
- ) as tmp_file:
127
- out_path = tmp_file.name
128
-
129
- # Save with appropriate format
130
- if fmt.upper() == "WEBP":
131
- img.save(out_path, "WEBP", quality=quality, method=6)
132
- elif fmt.upper() == "JPEG":
133
- img.save(out_path, "JPEG", quality=quality, optimize=True, progressive=True, subsampling=0)
134
- elif fmt.upper() == "PNG":
135
- img = img.quantize(colors=256)
136
- img.save(out_path, "PNG", optimize=True, compress_level=9)
137
- else:
138
- raise ValueError("fmt must be one of: WEBP, JPEG, PNG")
139
-
140
- image_paths.append(out_path)
141
-
142
- except Exception as e:
143
- logger.error(f"Error processing page {page_num + 1}: {e}")
144
- continue
145
-
146
- return image_paths
 
4
  from pathlib import Path
5
  import pypdfium2 as pdfium
6
  from PIL import Image, ImageOps, ImageChops, ImageFilter
 
7
 
8
  # Configure logging
9
  logger = logging.getLogger(__name__)
 
20
  bbox = gray.point(lambda p: 255 if p > tol else 0).getbbox()
21
  return im.crop(bbox) if bbox else im
22
 
23
+ # TODO: fix this for multiple pages
 
24
  def pdf_first_page_to_image(
25
  pdf_path: str,
26
+ max_long_edge: int = 2000, # 1600–2400 is a good sweet spot for LLM vision
27
  fmt: str = "WEBP", # "WEBP" | "JPEG" | "PNG"
28
  quality: int = 82, # for WEBP/JPEG
29
  supersample: float = 1.7, # render a bit larger, then downsample with LANCZOS
30
  crop_margins: bool = True
31
  ) -> str:
32
  pdf = pdfium.PdfDocument(pdf_path)
33
  page = pdf.get_page(0)
34
  w_pt, h_pt = page.get_size() # PDF points
 
65
  img.save(out, "PNG", optimize=True, compress_level=9)
66
  else:
67
  raise ValueError("fmt must be one of: WEBP, JPEG, PNG")
68
+ return out
 
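Usage note: pdf_first_page_to_image still returns the path of a temporary image (WEBP by default), and the caller remains responsible for deleting it, as the step functions in main.py do in their finally blocks. A minimal sketch; the PDF filename is illustrative:

import os
from modules.image_processing import pdf_first_page_to_image

img_path = pdf_first_page_to_image("label_artwork.pdf", fmt="PNG")  # illustrative input file
try:
    pass  # pass img_path to the OCR / analysis calls
finally:
    os.remove(img_path)  # clean up the temporary render
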
modules/llm_pipeline.py ADDED
@@ -0,0 +1,433 @@
1
+ import os
2
+ import json
3
+ import logging
4
+ import re
5
+ from typing import List, Optional, Dict
6
+ import pandas as pd
7
+ import openai
8
+ from .models import FSCExtractionAndTaxonomy
9
+ from .utils import get_token_costs
10
+ from .image_processing import file_to_b64, pdf_first_page_to_image
11
+ from .fsc_api import get_fsc_products_for_license, filter_taxonomy_by_fsc_products
12
+
13
+ # Configure logging
14
+ logger = logging.getLogger(__name__)
15
+
16
+ # ============================ Prompt constructors ===================================
17
+ def build_taxonomy_block_from_dataframe(taxonomy_df: pd.DataFrame, include_level3: bool = False) -> str:
18
+ """
19
+ Produces compact lines from a filtered taxonomy dataframe such as:
20
+ W12 Indoor furniture | W12.1 Cabinet
21
+ P5 Packaging and wrappings of paper | P5.1 Cardboard packaging | P5.2 Corrugated paper packaging
22
+ """
23
+ lines, seen = [], set()
24
+
25
+ for _, row in taxonomy_df.iterrows():
26
+ # Handle NaN values properly
27
+ l1c = str(row.get("Level 1 Code") or "").strip() if pd.notna(row.get("Level 1 Code")) else ""
28
+ l1n = str(row.get("Level 1 Name") or "").strip() if pd.notna(row.get("Level 1 Name")) else ""
29
+ l2c = str(row.get("Level 2 Code") or "").strip() if pd.notna(row.get("Level 2 Code")) else ""
30
+ l2n = str(row.get("Level 2 Name") or "").strip() if pd.notna(row.get("Level 2 Name")) else ""
31
+ l3c = str(row.get("Level 3 Code") or "").strip() if pd.notna(row.get("Level 3 Code")) else ""
32
+ l3n = str(row.get("Level 3 Name") or "").strip() if pd.notna(row.get("Level 3 Name")) else ""
33
+
34
+ if not l1c or not l1n:
35
+ continue
36
+
37
+ parts = [f"{l1c} {l1n}"]
38
+ if l2c:
39
+ parts.append(f"{l2c} {l2n}")
40
+ if include_level3 and l3c:
41
+ parts.append(f"{l3c} {l3n}")
42
+ line = " | ".join(parts)
43
+ if line not in seen:
44
+ seen.add(line)
45
+ lines.append(line)
46
+
47
+ return "\n".join(lines)
48
+
49
+ def build_prompt(taxonomy_block: str) -> str:
50
+ return f"""
51
+ You are a compliance analyst assessing products for alignment with the Forest Stewardship Council (FSC) product taxonomy.
52
+
53
+ TASKS:
54
+ 1. Describe what the product appears to be
55
+ 2. Infer the substrate/material
56
+ 3. Assess if the product matches any category in the provided taxonomy
57
+ 4. If there's a match, select 1–3 closest taxonomy alignments from the TAXONOMY below
58
+ 5. If there's NO match, set has_taxonomy_match to false and leave taxonomy_matches empty
59
+ 6. If you can't interpret what the product is from the image, set has_taxonomy_match to false and leave taxonomy_matches empty
60
+
61
+ IMPORTANT: Only choose codes/names that appear in TAXONOMY. If the product doesn't clearly match any category in the taxonomy, indicate this by setting has_taxonomy_match to false.
62
+
63
+ Heuristics / Few-shot hints:
64
+ - Retail cereal boxes are folding cartons/cardboard → prefer P5.1 over P5.2 unless corrugation is explicit
65
+ - Chests/commodes/cupboards/drawers → W12 Indoor furniture → W12.1 Cabinet
66
+ - If the product appears to be something not covered by the taxonomy (e.g., electronics, clothing, food items), set has_taxonomy_match to false
67
+
68
+ Often the image will feature picture(s) of other non-relevant products (e.g. bicycles). In such cases the actual FSC product can be a printed material or other substrate (e.g. packaging, electronic media). Carefully assess what the product is, taking this into account.
69
+ When in doubt, carefully read all text in the image to help infer what the depicted product is vs. what the actual FSC product is.
70
+ A major red flag is a schematic that indicates a packaging design document; in that case, the product shown on the packaging may differ from the actual FSC product.
71
+ Also be careful: some images are screenshots of a website. In that case, read all the text, as it may indicate the FSC product type.
72
+
73
+ Return JSON that matches the provided schema.
74
+
75
+ ## TAXONOMY (codes and names only; choose from these if there's a match)
76
+ {taxonomy_block}
77
+
78
+ Now analyze this image:
79
+ """.strip()
80
+
81
+ # ============================ Core extractor =====================================
82
+ def extract_fsc_logo_and_taxonomy(image_path: str, taxonomy_block: str, original_filename: str = None,
83
+ client: openai.OpenAI = None, model_name: str = "gpt-5",
84
+ total_tokens: Dict[str, int] = None, total_cost: List[float] = None):
85
+ """
86
+ Single vision call that performs both:
87
+ - FSC OCR extraction
88
+ - Product taxonomy classification
89
+ """
90
+ if client is None:
91
+ raise ValueError("OpenAI client must be provided")
92
+ if total_tokens is None:
93
+ total_tokens = {"input": 0, "output": 0}
94
+ if total_cost is None:
95
+ total_cost = [0.0]
96
+
97
+ INPUT_TOKEN_COST, OUTPUT_TOKEN_COST = get_token_costs(model_name)
98
+
99
+ # Use original filename if provided, otherwise fall back to image_path basename
100
+ display_name = original_filename if original_filename else os.path.basename(image_path)
101
+ logger.info(f"Processing: {display_name}")
102
+
103
+ img_b64 = file_to_b64(image_path)
104
+ content = [{"type": "text", "text": build_prompt(taxonomy_block)}]
105
+
106
+ content += [{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}", "detail": "high"}}]
107
+
108
+ try:
109
+ response = client.beta.chat.completions.parse(
110
+ model=model_name,
111
+ messages=[{"role": "user", "content": content}],
112
+ response_format=FSCExtractionAndTaxonomy,
113
+ )
114
+
115
+ usage = response.usage
116
+ input_tokens = usage.prompt_tokens
117
+ output_tokens = usage.completion_tokens
118
+ input_cost = (input_tokens / 1000.0) * INPUT_TOKEN_COST
119
+ output_cost = (output_tokens / 1000.0) * OUTPUT_TOKEN_COST
120
+ call_cost = input_cost + output_cost
121
+
122
+ total_tokens["input"] += input_tokens
123
+ total_tokens["output"] += output_tokens
124
+ total_cost[0] += call_cost
125
+
126
+ logger.info(f"✓ {display_name}: {input_tokens} in + {output_tokens} out = ${call_cost:.4f}")
127
+
128
+ if response.choices and response.choices[0].message and response.choices[0].message.parsed:
129
+ result = response.choices[0].message.parsed
130
+ result_dict = result.model_dump()
131
+ result_dict["source_file"] = display_name
132
+
133
+ # Convenience: flatten top-1 taxonomy into columns
134
+ tx = result_dict.get("taxonomy_matches", [])[:1]
135
+ if tx:
136
+ t0 = tx[0]
137
+ result_dict["level1_code"] = t0.get("level1_code")
138
+ result_dict["level1_name"] = t0.get("level1_name")
139
+ result_dict["level2_code"] = t0.get("level2_code")
140
+ result_dict["level2_name"] = t0.get("level2_name")
141
+ result_dict["taxonomy_confidence"] = t0.get("confidence")
142
+ else:
143
+ # No taxonomy matches found
144
+ result_dict["level1_code"] = None
145
+ result_dict["level1_name"] = None
146
+ result_dict["level2_code"] = None
147
+ result_dict["level2_name"] = None
148
+ result_dict["taxonomy_confidence"] = None
149
+
150
+ return result_dict
151
+
152
+ # Fallback if no parseable output returned
153
+ logger.warning(f"⚠ No parseable output for {display_name}")
154
+ return {
155
+ "source_file": display_name,
156
+ "product_summary": None,
157
+ "inferred_material": None,
158
+ "taxonomy_matches": [],
159
+ "has_taxonomy_match": False,
160
+ "level1_code": None,
161
+ "level1_name": None,
162
+ "level2_code": None,
163
+ "level2_name": None,
164
+ "taxonomy_confidence": None,
165
+ }
166
+ except Exception as e:
167
+ logger.error(f"✗ Error processing {display_name}: {str(e)}")
168
+ raise
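As a worked example of the cost bookkeeping above: with hypothetical per-1K rates of $0.005 (input) and $0.015 (output) returned by get_token_costs, a call consuming 2,000 input and 500 output tokens is booked as (2000/1000) * 0.005 + (500/1000) * 0.015 = $0.0175, and those figures are accumulated into total_tokens and total_cost.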
169
+
170
+ # ============================ Main / Batch ======================================
171
+ def load_fsc_license_mapping(fsc_logo_extracted_csv: str) -> Dict[str, str]:
172
+ """
173
+ Load FSC license codes from the extracted CSV file.
174
+
175
+ Args:
176
+ fsc_logo_extracted_csv (str): Path to the FSC extraction CSV file
177
+
178
+ Returns:
179
+ Dict[str, str]: Mapping of source_file to fsc_license_code
180
+ """
181
+ try:
182
+ df = pd.read_csv(fsc_logo_extracted_csv)
183
+ # Create mapping from source_file to fsc_license_code
184
+ mapping = {}
185
+ for _, row in df.iterrows():
186
+ source_file = row.get('source_file', '')
187
+ license_code = row.get('fsc_license_code', '')
188
+ if pd.notna(license_code) and license_code:
189
+ mapping[source_file] = license_code
190
+
191
+ logger.info(f"Loaded {len(mapping)} FSC license mappings")
192
+
193
+ # Debug: Show some examples of the mapping
194
+ sample_keys = list(mapping.keys())[:5]
195
+ logger.info(f"Sample mapping keys: {sample_keys}")
196
+
197
+ return mapping
198
+ except Exception as e:
199
+ logger.error(f"Error loading FSC license mapping: {e}")
200
+ return {}
201
+
202
+ def get_files_to_process(directory: str, fsc_license_mapping: Dict[str, str]) -> List[str]:
203
+ """
204
+ Get list of files to process by matching CSV entries with actual files in directory.
205
+
206
+ Args:
207
+ directory (str): Directory containing image files
208
+ fsc_license_mapping (Dict[str, str]): Mapping of source_file to fsc_license_code
209
+
210
+ Returns:
211
+ List[str]: List of filenames to process
212
+ """
213
+ # Get all files in directory
214
+ all_files = [f for f in os.listdir(directory) if f.lower().endswith(('.pdf', '.png', '.jpg', '.jpeg', '.webp')) and not f.endswith('.page1.webp')]
215
+
216
+ files_to_process = []
217
+ unmatched_csv_entries = []
218
+
219
+ # For each CSV entry, try to find a matching file
220
+ for csv_filename in fsc_license_mapping.keys():
221
+ matching_file = find_matching_license_key(csv_filename, {f: f for f in all_files})
222
+ if matching_file:
223
+ files_to_process.append(matching_file)
224
+ logger.debug(f"Matched CSV entry '{csv_filename}' to file '{matching_file}'")
225
+ else:
226
+ unmatched_csv_entries.append(csv_filename)
227
+ logger.warning(f"No matching file found for CSV entry: {csv_filename}")
228
+
229
+ # Remove duplicates (in case multiple CSV entries match the same file)
230
+ files_to_process = list(set(files_to_process))
231
+
232
+ logger.info(f"Found {len(files_to_process)} files to process out of {len(fsc_license_mapping)} CSV entries")
233
+ logger.info(f"Unmatched CSV entries: {len(unmatched_csv_entries)}")
234
+
235
+ if unmatched_csv_entries:
236
+ logger.info(f"Sample unmatched entries: {unmatched_csv_entries[:5]}")
237
+
238
+ return files_to_process
239
+
240
+ def find_matching_license_key(filename: str, license_mapping: Dict[str, str]) -> Optional[str]:
241
+ """
242
+ Find a matching license key for a given filename by trying different variations.
243
+
244
+ Args:
245
+ filename (str): The filename with extension (e.g., "KM548644-fsc.pdf")
246
+ license_mapping (Dict[str, str]): The license mapping dictionary
247
+
248
+ Returns:
249
+ Optional[str]: The matching license key or None if not found
250
+ """
251
+ # Try exact match first
252
+ if filename in license_mapping:
253
+ return filename
254
+
255
+ # Try without extension
256
+ filename_no_ext = os.path.splitext(filename)[0]
257
+ if filename_no_ext in license_mapping:
258
+ return filename_no_ext
259
+
260
+ # Try with different common extensions
261
+ for ext in ['.pdf', '.png', '.jpg', '.jpeg', '.webp']:
262
+ test_filename = filename_no_ext + ext
263
+ if test_filename in license_mapping:
264
+ return test_filename
265
+
266
+ # Try removing common suffixes that might have been added during processing
267
+ # Remove hash suffixes like "_2d4b076e5c1f4db6893970e23a14809c"
268
+ filename_clean = re.sub(r'_[a-f0-9]{32}$', '', filename_no_ext)
269
+ if filename_clean in license_mapping:
270
+ return filename_clean
271
+
272
+ # Try removing hash suffixes with different patterns
273
+ filename_clean2 = re.sub(r'_[a-f0-9]{8,}$', '', filename_no_ext)
274
+ if filename_clean2 in license_mapping:
275
+ return filename_clean2
276
+
277
+ return None
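A quick sketch of how the fallback matching behaves (the mapping and license code are hypothetical, not taken from a real extraction CSV):

    # hypothetical extraction-CSV mapping keyed by source_file
    mapping = {"KM548644-fsc": "C075521"}
    find_matching_license_key("KM548644-fsc_2d4b076e5c1f4db6893970e23a14809c.png", mapping)
    # -> "KM548644-fsc": exact and extension lookups fail, then the 32-char hex suffix is stripped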
278
+
279
+ def process_directory_with_fsc_filtering(directory: str, taxonomy_df: pd.DataFrame,
280
+ fsc_license_mapping: Dict[str, str],
281
+ client: openai.OpenAI, model_name: str = "gpt-5") -> pd.DataFrame:
282
+ logger.info(f"Starting processing of directory: {directory}")
283
+
284
+ # Get files to process based on CSV entries
285
+ files_to_process = get_files_to_process(directory, fsc_license_mapping)
286
+
287
+ if not files_to_process:
288
+ logger.error("No files to process found!")
289
+ return pd.DataFrame()
290
+
291
+ # Debug: Show some examples of files to process
292
+ sample_files = files_to_process[:5]
293
+ logger.info(f"Sample files to process: {sample_files}")
294
+
295
+ rows = []
296
+ total_tokens = {"input": 0, "output": 0}
297
+ total_cost = [0.0]
298
+
299
+ for i, fname in enumerate(files_to_process, 1):
300
+ fpath = os.path.join(directory, fname)
301
+ lower = fname.lower()
302
+
303
+ logger.info(f"Processing file {i}/{len(files_to_process)}: {fname}")
304
+
305
+ # Get FSC license code for this file using flexible matching
306
+ matching_key = find_matching_license_key(fname, fsc_license_mapping)
307
+ fsc_license_code = fsc_license_mapping.get(matching_key) if matching_key else None
308
+
309
+ if not fsc_license_code:
310
+ logger.warning(f"No FSC license code found for {fname} (tried matching key: {matching_key})")
311
+ # Debug: Show what keys are available for similar filenames
312
+ filename_no_ext = os.path.splitext(fname)[0]
313
+ similar_keys = [k for k in fsc_license_mapping.keys() if filename_no_ext in k or k in filename_no_ext]
314
+ if similar_keys:
315
+ logger.info(f"Similar keys found: {similar_keys[:3]}") # Show first 3
316
+ continue
317
+
318
+ logger.info(f"Found FSC license code: {fsc_license_code} for file: {fname}")
319
+
320
+ # Get allowed products from FSC API
321
+ logger.info(f"Fetching FSC products for license: {fsc_license_code}")
322
+ fsc_products, license_status = get_fsc_products_for_license(fsc_license_code)
323
+
324
+ # Check if license status is not 'Valid'
325
+ if license_status and license_status != 'Valid':
326
+ logger.warning(f"License status is '{license_status}' for {fsc_license_code}, skipping taxonomy processing")
327
+ # Add a row with license status and skip taxonomy processing
328
+ rows.append({
329
+ "source_file": fname,
330
+ "fsc_license_code": fsc_license_code,
331
+ "license_status": license_status,
332
+ "product_summary": None,
333
+ "inferred_material": None,
334
+ "has_taxonomy_match": False,
335
+ "level1_code": None,
336
+ "level1_name": None,
337
+ "level2_code": None,
338
+ "level2_name": None,
339
+ "taxonomy_confidence": None,
340
+ "all_taxonomy_matches": "[]",
341
+ "fsc_products_count": len(fsc_products),
342
+ "fsc_products": json.dumps(fsc_products, ensure_ascii=False),
343
+ })
344
+ continue
345
+
346
+ if not fsc_products:
347
+ logger.warning(f"No FSC products found for license {fsc_license_code}, using full taxonomy")
348
+ filtered_taxonomy = taxonomy_df
349
+ else:
350
+ # Filter taxonomy based on FSC products
351
+ filtered_taxonomy = filter_taxonomy_by_fsc_products(taxonomy_df, fsc_products)
352
+
353
+ if filtered_taxonomy.empty:
354
+ logger.warning(f"No matching taxonomy found for FSC products, using full taxonomy")
355
+ filtered_taxonomy = taxonomy_df
356
+
357
+ # Build taxonomy block from filtered taxonomy
358
+ taxonomy_block = build_taxonomy_block_from_dataframe(filtered_taxonomy, include_level3=True)
359
+
360
+ if not taxonomy_block.strip():
361
+ logger.warning(f"Empty taxonomy block for {fname}, using full taxonomy")
362
+ taxonomy_block = build_taxonomy_block_from_dataframe(taxonomy_df, include_level3=True)
363
+
364
+ # Convert PDF to temporary PNG (first page), otherwise use image as-is
365
+ cleanup_png = False
366
+ try:
367
+ if lower.endswith(".pdf"):
368
+ img_path = pdf_first_page_to_image(fpath)
369
+ cleanup_png = True
370
+ # Log the size of the converted PNG file
371
+ converted_file_size = os.path.getsize(img_path) / (1024 * 1024) # Convert to MB
372
+ logger.info(f"Converted PNG size: {converted_file_size:.2f} MB")
373
+ elif lower.endswith((".png", ".jpg", ".jpeg", ".webp")):
374
+ img_path = fpath
375
+ else:
376
+ continue # skip unsupported
377
+
378
+ # Pass the original filename to ensure it's used in the results
379
+ out = extract_fsc_logo_and_taxonomy(img_path, taxonomy_block, original_filename=fname,
380
+ client=client, model_name=model_name,
381
+ total_tokens=total_tokens, total_cost=total_cost)
382
+
383
+ # Create a single row for each file
384
+ rows.append({
385
+ "source_file": out.get("source_file"),
386
+ "fsc_license_code": fsc_license_code,
387
+ "license_status": license_status,
388
+ "product_summary": out.get("product_summary"),
389
+ "inferred_material": out.get("inferred_material"),
390
+ "has_taxonomy_match": out.get("has_taxonomy_match", False),
391
+ "level1_code": out.get("level1_code"),
392
+ "level1_name": out.get("level1_name"),
393
+ "level2_code": out.get("level2_code"),
394
+ "level2_name": out.get("level2_name"),
395
+ "taxonomy_confidence": out.get("taxonomy_confidence"),
396
+ "all_taxonomy_matches": json.dumps(out.get("taxonomy_matches", []), ensure_ascii=False),
397
+ "fsc_products_count": len(fsc_products),
398
+ "fsc_products": json.dumps(fsc_products, ensure_ascii=False),
399
+ })
400
+
401
+ except Exception as e:
402
+ logger.error(f"Failed to process {fname}: {e}")
403
+ # Add a row with error information
404
+ rows.append({
405
+ "source_file": fname,
406
+ "fsc_license_code": fsc_license_code,
407
+ "license_status": license_status,
408
+ "product_summary": f"ERROR: {str(e)}",
409
+ "inferred_material": None,
410
+ "has_taxonomy_match": False,
411
+ "level1_code": None,
412
+ "level1_name": None,
413
+ "level2_code": None,
414
+ "level2_name": None,
415
+ "taxonomy_confidence": None,
416
+ "all_taxonomy_matches": "[]",
417
+ "fsc_products_count": len(fsc_products),
418
+ "fsc_products": json.dumps(fsc_products, ensure_ascii=False),
419
+ })
420
+ finally:
421
+ # Clean up temp PNG
422
+ if cleanup_png and 'img_path' in locals():
423
+ try:
424
+ os.remove(img_path)
425
+ logger.debug(f"Cleaned up temporary PNG: {img_path}")
426
+ except OSError:
427
+ logger.warning(f"Failed to clean up temporary PNG: {img_path}")
428
+
429
+ df = pd.DataFrame(rows)
430
+ logger.info(f"Completed processing. Generated {len(df)} rows")
431
+ logger.info(f"Total tokens in/out: {total_tokens['input']}/{total_tokens['output']}")
432
+ logger.info(f"Total cost: ${total_cost[0]:.4f}")
433
+ return df
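A minimal driver for this batch path might look like the following sketch; the directory, CSV filenames, and output path are placeholders, not files defined in the repo:

    import os
    import openai
    import pandas as pd

    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    taxonomy_df = pd.read_csv("product_classification.csv")
    license_map = load_fsc_license_mapping("fsc_logo_extracted.csv")  # placeholder path
    results = process_directory_with_fsc_filtering(
        "images/", taxonomy_df, license_map, client=client, model_name="gpt-5"
    )
    results.to_csv("taxonomy_results.csv", index=False)  # placeholder output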
modules/logo_extraction.py DELETED
@@ -1,434 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Searches for FSC logo inside PDF pages using block-wise template matching + feature matching (SIFT/ORB).
4
-
5
- Behavior notes:
6
- - The code will use fitz (PyMuPDF) or pdf2image depending on PDF_2_IMG flag.
7
- - If a logo is found it is written to: OUTPUT_FOLDER/{pdf_filename}_logo.png
8
- - If OUTPUT_FOLDER does not exist, behavior is unchanged (file write will fail as before).
9
- - Print messages remain the same and continue to use flush=True.
10
-
11
- Optimizations:
12
- - Parallel block processing using ProcessPoolExecutor
13
- - Template feature caching across PDFs
14
- - Configurable detector (ORB default for speed, SIFT optional)
15
- - Coarse-to-fine search strategy
16
- - Reduced feature counts for better performance
17
- - Early termination on first match
18
- - Retry if failure
19
-
20
- Handles white-on-dark vs black-on-white logo problem by:
21
- 1. Caching both original AND inverted template features
22
- 2. Trying both during matching
23
- 3. Selecting the best match automatically
24
- """
25
-
26
- import os
27
- from typing import Any, Optional, Tuple, List
28
- from functools import partial
29
- from concurrent.futures import ProcessPoolExecutor, as_completed
30
- import cv2
31
- import fitz # PyMuPDF
32
- import numpy as np
33
- from pdf2image import convert_from_path
34
- from PIL import Image
35
- import logging
36
-
37
- # import from modules
38
- from modules.homography_functions import serialize_keypoints, deserialize_keypoints, extract_logo_from_pdf_page_dual_features
39
-
40
- logger = logging.getLogger(__name__)
41
-
42
- Image.MAX_IMAGE_PIXELS = 933120000
43
-
44
- # ==== Global Constants ====
45
- TEMPLATE_PATH = os.path.join("assets","fsc-logo-black-and-white.jpg")
46
- PDF_FOLDER = os.path.join("template_matching","pdfs")
47
- OUTPUT_FOLDER = os.path.join("template_matching","extracted")
48
- PADDING_SCALE = 4
49
- DPI = 300
50
- PDF_2_IMG = False
51
-
52
- # Optimized detector settings
53
- USE_SIFT = True # Changed to False - ORB is much faster and often sufficient
54
- SIFT_FEATURES = 2500 # Reduced from 5000 for better performance
55
- ORB_FEATURES = 1500 # Increased from 1000 for better matching
56
-
57
- # Parallelization settings
58
- MAX_WORKERS = 4 # Adjust based on your CPU cores
59
- USE_COARSE_TO_FINE = False # Enable coarse-to-fine search for large pages
60
-
61
- # Block processing settings
62
- OVERLAP = 0.2
63
- MIN_BLOCK_SIZE = 300
64
- MAX_BLOCK_SIZE = 1000
65
- # ==== End Global Constants ====
66
-
67
- # Global template cache
68
- _template_cache = {}
69
-
70
-
71
-
72
- def calculate_adaptive_block_size(page_shape: Tuple[int, int],
73
- min_size: int = MIN_BLOCK_SIZE,
74
- max_size: int = MAX_BLOCK_SIZE) -> Tuple[int, int]:
75
- """Calculate block size as a fraction of image dimensions."""
76
- H, W = page_shape[:2]
77
- size = int(min(H, W) * 0.25)
78
- size = max(min_size, min(size, max_size))
79
- return (size, size)
80
-
81
-
82
- def get_template_features(template_path: str, use_sift: bool = USE_SIFT):
83
- """
84
- Load and cache BOTH original and inverted template features.
85
-
86
- This allows the system to match logos regardless of polarity
87
- (black-on-white vs white-on-black).
88
-
89
- Returns
90
- -------
91
- Tuple containing:
92
- - gray_template: grayscale template (original)
93
- - kp_orig: keypoints from original
94
- - des_orig: descriptors from original
95
- - kp_inv: keypoints from inverted
96
- - des_inv: descriptors from inverted
97
- - detector: the feature detector used
98
- """
99
- cache_key = (template_path, use_sift)
100
-
101
- if cache_key not in _template_cache:
102
- logger.info(f"Computing template features (SIFT={use_sift}) for BOTH polarities...")
103
-
104
- # Load and resize template
105
- template = cv2.imread(template_path)
106
- gray_template = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
107
-
108
- h, w = gray_template.shape
109
- max_dim = 300
110
- if max(h, w) > max_dim:
111
- scale = max_dim / max(h, w)
112
- new_w = int(w * scale)
113
- new_h = int(h * scale)
114
- gray_template = cv2.resize(gray_template, (new_w, new_h),
115
- interpolation=cv2.INTER_AREA)
116
- logger.info(f"Resized template from {w}x{h} to {new_w}x{new_h}")
117
-
118
- # Create detector
119
- if use_sift:
120
- detector = cv2.SIFT_create(nfeatures=SIFT_FEATURES)
121
- else:
122
- detector = cv2.ORB_create(nfeatures=ORB_FEATURES)
123
-
124
- # Extract features from ORIGINAL template
125
- kp_orig, des_orig = detector.detectAndCompute(gray_template, None)
126
- logger.info(f"Original template keypoints: {len(kp_orig)}")
127
-
128
- # Extract features from INVERTED template
129
- gray_template_inv = 255 - gray_template
130
- kp_inv, des_inv = detector.detectAndCompute(gray_template_inv, None)
131
- logger.info(f"Inverted template keypoints: {len(kp_inv)}")
132
-
133
- # Cache both versions
134
- _template_cache[cache_key] = (
135
- gray_template,
136
- kp_orig, des_orig,
137
- kp_inv, des_inv,
138
- detector
139
- )
140
-
141
- return _template_cache[cache_key]
142
-
143
-
144
- def process_single_block(block_data: Tuple,
145
- gray_template: np.ndarray,
146
- serialized_kp_orig: List[Tuple],
147
- des_orig: np.ndarray,
148
- serialized_kp_inv: List[Tuple],
149
- des_inv: np.ndarray,
150
- use_sift: bool) -> Optional[Tuple]:
151
- """
152
- Process a single block with dual-template matching.
153
-
154
- Returns
155
- -------
156
- Optional[Tuple]
157
- (M_block, x_offset, y_offset, block_num) if logo found, None otherwise
158
- """
159
- block, x_offset, y_offset, block_num, total_blocks = block_data
160
-
161
- logger.debug(f"Processing block {block_num}/{total_blocks} at ({x_offset},{y_offset})")
162
-
163
- # Create detector inside worker (can't pickle cv2 objects)
164
- if use_sift:
165
- detector = cv2.SIFT_create(nfeatures=SIFT_FEATURES)
166
- else:
167
- detector = cv2.ORB_create(nfeatures=ORB_FEATURES)
168
-
169
- # Deserialize keypoints for both templates
170
- kp_orig = deserialize_keypoints(serialized_kp_orig)
171
- kp_inv = deserialize_keypoints(serialized_kp_inv)
172
-
173
- try:
174
- # Use the improved dual-template matching function
175
- M_block = extract_logo_from_pdf_page_dual_features(
176
- block, gray_template, detector,
177
- kp_orig, des_orig,
178
- kp_inv, des_inv
179
- )
180
-
181
- if M_block is not None:
182
- logger.info(f"✓ Logo found in block {block_num} at ({x_offset}, {y_offset})")
183
- return (M_block, x_offset, y_offset, block_num)
184
-
185
- except Exception as e:
186
- logger.warning(f"Block {block_num} error: {e}")
187
-
188
- return None
189
-
190
-
191
- def extract_with_homography(M_block: np.ndarray,
192
- x_offset: int,
193
- y_offset: int,
194
- page_cv: np.ndarray,
195
- gray_template: np.ndarray) -> np.ndarray:
196
- """Extract logo region using homography matrix."""
197
- # Transform block coordinates to page coordinates
198
- T = np.array([[1, 0, x_offset], [0, 1, y_offset], [0, 0, 1]], dtype=np.float32)
199
- M_page = T @ M_block
200
-
201
- # Warp template corners to page coords
202
- h, w = gray_template.shape
203
- pts = np.float32([[0, 0], [0, h], [w, h], [w, 0]]).reshape(-1, 1, 2)
204
- dst = cv2.perspectiveTransform(pts, M_page)
205
-
206
- # Compute bounding box with padding
207
- x_min, y_min = np.int32(dst.min(axis=0).ravel())
208
- x_max, y_max = np.int32(dst.max(axis=0).ravel())
209
-
210
- x_pad = int((x_max - x_min) * PADDING_SCALE)
211
- y_pad = int((y_max - y_min) * PADDING_SCALE)
212
-
213
- x1_crop = max(0, x_min - x_pad)
214
- y1_crop = max(0, y_min - y_pad)
215
- x2_crop = min(page_cv.shape[1], x_max + x_pad)
216
- y2_crop = min(page_cv.shape[0], y_max + y_pad)
217
-
218
- return page_cv[y1_crop:y2_crop, x1_crop:x2_crop]
219
-
220
-
221
- def search_logo_in_blocks_parallel(page_cv: np.ndarray,
222
- gray_template: np.ndarray,
223
- detector: Any,
224
- kp_orig: Any,
225
- des_orig: Any,
226
- kp_inv: Any,
227
- des_inv: Any,
228
- max_workers: int = MAX_WORKERS) -> Optional[np.ndarray]:
229
- """
230
- Parallel block-wise logo search with dual-template features.
231
- """
232
- H, W = page_cv.shape[:2]
233
- bh, bw = calculate_adaptive_block_size(page_cv.shape)
234
- step_h = int(bh * (1 - OVERLAP))
235
- step_w = int(bw * (1 - OVERLAP))
236
-
237
- logger.info(f"Page: {W}x{H}, Block: {bw}x{bh}, Template: {gray_template.shape[1]}x{gray_template.shape[0]}")
238
-
239
- # Prepare all blocks upfront
240
- blocks_data = []
241
- block_num = 0
242
- for y in range(0, H, step_h):
243
- for x in range(0, W, step_w):
244
- y1, y2 = y, min(y + bh, H)
245
- x1, x2 = x, min(x + bw, W)
246
- block = page_cv[y1:y2, x1:x2].copy()
247
- block_num += 1
248
- blocks_data.append((block, x1, y1, block_num, len(blocks_data)))
249
-
250
- total_blocks = len(blocks_data)
251
- logger.info(f"Processing {total_blocks} blocks in parallel (workers={max_workers})")
252
-
253
- # Serialize template features for BOTH polarities
254
- serialized_kp_orig = serialize_keypoints(kp_orig)
255
- serialized_kp_inv = serialize_keypoints(kp_inv)
256
- use_sift = isinstance(detector, cv2.SIFT)
257
-
258
- # Process blocks in parallel
259
- with ProcessPoolExecutor(max_workers=max_workers) as executor:
260
- process_func = partial(
261
- process_single_block,
262
- gray_template=gray_template,
263
- serialized_kp_orig=serialized_kp_orig,
264
- des_orig=des_orig,
265
- serialized_kp_inv=serialized_kp_inv,
266
- des_inv=des_inv,
267
- use_sift=use_sift
268
- )
269
-
270
- # Submit all blocks
271
- futures = {executor.submit(process_func, bd): bd for bd in blocks_data}
272
-
273
- # Process results as they complete (early termination)
274
- for future in as_completed(futures):
275
- result = future.result()
276
-
277
- if result is not None:
278
- M_block, x_offset, y_offset, block_num = result
279
-
280
- # Cancel remaining futures
281
- for f in futures:
282
- if not f.done():
283
- f.cancel()
284
-
285
- logger.info(f"Early termination - logo found, cancelled remaining blocks")
286
-
287
- # Extract logo with homography
288
- return extract_with_homography(
289
- M_block, x_offset, y_offset, page_cv, gray_template
290
- )
291
-
292
- logger.info("No logo found in any block")
293
- return None
294
-
295
-
296
- def search_logo_with_retry(page_cv: np.ndarray,
297
- gray_template: np.ndarray,
298
- detector: Any,
299
- kp_orig: Any,
300
- des_orig: Any,
301
- kp_inv: Any,
302
- des_inv: Any,
303
- max_retries: int = 3) -> Optional[np.ndarray]:
304
- """Search for logo with retry mechanism using dual templates."""
305
- for attempt in range(max_retries):
306
- logger.info(f"Logo search attempt {attempt + 1}/{max_retries}")
307
-
308
- result = search_logo_in_blocks_parallel(
309
- page_cv, gray_template, detector,
310
- kp_orig, des_orig,
311
- kp_inv, des_inv
312
- )
313
-
314
- if result is not None:
315
- logger.info(f"Logo found on attempt {attempt + 1}")
316
- return result
317
-
318
- if attempt < max_retries - 1:
319
- logger.info(f"Attempt {attempt + 1} failed, retrying...")
320
-
321
- logger.info("All retry attempts failed")
322
- return None
323
-
324
-
325
- def process_pdf(pdf_path: str,
326
- gray_template: np.ndarray,
327
- detector: Any,
328
- kp_orig: Any,
329
- des_orig: Any,
330
- kp_inv: Any,
331
- des_inv: Any) -> Optional[np.ndarray]:
332
- """
333
- Process a PDF file with dual-template matching.
334
-
335
- Returns
336
- -------
337
- Optional[np.ndarray]
338
- Cropped logo image if found, None otherwise
339
- """
340
- if PDF_2_IMG:
341
- pages = convert_from_path(pdf_path, dpi=DPI)
342
- else:
343
- pages = fitz.open(pdf_path)
344
-
345
- filename = os.path.splitext(os.path.basename(pdf_path))[0]
346
-
347
- for page_num, page in enumerate(pages, 1):
348
- try:
349
- logger.info(f"Processing page {page_num} of {filename}")
350
-
351
- if PDF_2_IMG:
352
- page_cv = cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR)
353
- else:
354
- pix = page.get_pixmap(dpi=DPI)
355
- img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
356
- pix.height, pix.width, pix.n
357
- )
358
- if pix.n == 4:
359
- page_cv = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
360
- else:
361
- page_cv = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
362
-
363
- # Search with dual-template retry
364
- logo = search_logo_with_retry(
365
- page_cv, gray_template, detector,
366
- kp_orig, des_orig,
367
- kp_inv, des_inv,
368
- max_retries=3
369
- )
370
-
371
- if logo is not None:
372
- logger.info(f"Logo found on page {page_num}")
373
- return logo
374
- else:
375
- logger.info(f"No logo on page {page_num}")
376
-
377
- except Exception as e:
378
- logger.error(f"Error processing page {page_num} of {filename}: {e}")
379
-
380
- return None
381
-
382
-
383
- def get_crop_from_pdf(pdf: Optional[str]) -> Optional[str]:
384
- """
385
- Main entry point with polarity-invariant processing.
386
-
387
- Parameters
388
- ----------
389
- pdf : Optional[str]
390
- Path to PDF file
391
-
392
- Returns
393
- -------
394
- Optional[str]
395
- Path to extracted logo image, or None if not found
396
- """
397
- if pdf is None:
398
- return (
399
- "Please upload a pdf file.",
400
- "⏳ Waiting for pdf...",
401
- "⏳ Waiting for pdf...",
402
- "⏳ Waiting for pdf...",
403
- "⏳ Waiting for pdf...",
404
- "",
405
- )
406
-
407
- # Get cached template features (both polarities)
408
- gray_template, kp_orig, des_orig, kp_inv, des_inv, detector = get_template_features(
409
- TEMPLATE_PATH, USE_SIFT
410
- )
411
-
412
- if pdf.lower().endswith(".pdf"):
413
- logger.info(f"Processing PDF: {pdf}")
414
- logger.info(f"Using {'SIFT' if USE_SIFT else 'ORB'} detector with {MAX_WORKERS} workers")
415
- logger.info("Polarity-invariant matching enabled (white-on-dark logos supported)")
416
-
417
- cropped_logo = process_pdf(
418
- pdf, gray_template, detector,
419
- kp_orig, des_orig,
420
- kp_inv, des_inv
421
- )
422
-
423
- if cropped_logo is not None:
424
- os.makedirs("assets/tmp", exist_ok=True)
425
- output_path = "assets/tmp/cropped_logo.png"
426
- cv2.imwrite(output_path, cropped_logo)
427
- logger.info(f"✓ Logo saved to {output_path}")
428
- return output_path
429
- else:
430
- logger.info("No logo found")
431
- return None
432
- else:
433
- logger.warning("File is not a PDF")
434
- return None
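For readers skimming the diff: the heart of the removed module's polarity handling was the dual feature cache, roughly as below (condensed from the deleted get_template_features; constants as defined in that file):

    import cv2

    gray = cv2.cvtColor(cv2.imread("fsc-logo-black-and-white.jpg"), cv2.COLOR_BGR2GRAY)
    detector = cv2.SIFT_create(nfeatures=2500)
    kp_orig, des_orig = detector.detectAndCompute(gray, None)      # black-on-white template
    kp_inv, des_inv = detector.detectAndCompute(255 - gray, None)  # white-on-dark variant
    # each page block is matched against both descriptor sets and the stronger
    # homography wins (see extract_logo_from_pdf_page_dual_features)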
 
 
 
 
 
 
modules/models.py CHANGED
@@ -1,18 +1,7 @@
1
  from pydantic import BaseModel, Field
2
- from typing import List, Optional
3
 
4
- # OCR Extraction
5
- class FSCLogoData(BaseModel):
6
- label_title: Optional[str] = Field(description="Short text from the logo's Label Title field, e.g. 'MIX', 'RECYCLED', '100%'")
7
- product_type: Optional[str] = Field(description="Descriptive text from the logo's 'Product Type' field")
8
- fsc_license_code: Optional[str] = Field(description="The 7 character FSC license code from the logo. The code always starts with a C followed by 6 digits. Return null if you can't see a full 6 digit code.")
9
-
10
- class FSCExtractionResult(BaseModel):
11
- fsc_data: List[FSCLogoData]
12
-
13
- # Image analysis
14
  class TaxonomyMatch(BaseModel):
15
- main_category: Optional[str] = None
16
  level1_code: Optional[str] = None
17
  level1_name: Optional[str] = None
18
  level2_code: Optional[str] = None
@@ -21,17 +10,16 @@ class TaxonomyMatch(BaseModel):
21
  rationale: Optional[str] = None
22
 
23
  class FSCExtractionAndTaxonomy(BaseModel):
24
- product_summary: str = Field(description="Describe the product")
25
- inferred_material: str = Field(description="Describe the material/substrate of the product")
26
  taxonomy_matches: List[TaxonomyMatch]
27
  has_taxonomy_match: bool = Field(description="Whether the product matches any category in the provided taxonomy")
28
- image_text: Optional[str] = Field(description="Get all text from the image")
29
 
 
 
 
 
 
30
 
31
- # Product type check
32
- class ProductTypeMatch(BaseModel):
33
- has_material_check: bool = Field(description="Whether the material approximately aligns with the product type")
34
- material_rationale: Optional[str] = Field(description="The rationale for the material match")
35
- has_taxonomy_check: bool = Field(description="Whether one or more of the taxonomy items approximately align with the product type")
36
- taxonomy_rationale: Optional[str] = Field(description="The rationale for the taxonomy match")
37
- taxonomy_matches: List[TaxonomyMatch] = Field(description="The taxonomy items that approximately align with the product type")
 
1
  from pydantic import BaseModel, Field
2
+ from typing import List, Optional, Dict
3
 
 
 
 
 
 
 
 
 
 
 
4
  class TaxonomyMatch(BaseModel):
 
5
  level1_code: Optional[str] = None
6
  level1_name: Optional[str] = None
7
  level2_code: Optional[str] = None
 
10
  rationale: Optional[str] = None
11
 
12
  class FSCExtractionAndTaxonomy(BaseModel):
13
+ product_summary: Optional[str] = None
14
+ inferred_material: Optional[str] = None
15
  taxonomy_matches: List[TaxonomyMatch]
16
  has_taxonomy_match: bool = Field(description="Whether the product matches any category in the provided taxonomy")
 
17
 
18
+ # OCR Extraction Models
19
+ class FSCLogoData(BaseModel):
20
+ label_title: Optional[str] = Field(description="The title of the label, e.g. 'MIX', 'RECYCLED', etc.")
21
+ product_type: Optional[str] = Field(description="The type of product, e.g. 'Paper', 'Wood', 'Packaging'")
22
+ fsc_license_code: Optional[str] = Field(description="The 7 character FSC license code. The code always starts with a C followed by 6 digits")
23
 
24
+ class FSCExtractionResult(BaseModel):
25
+ fsc_data: List[FSCLogoData]
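Downstream code consumes these models as plain dicts via model_dump(); a parsed FSCExtractionAndTaxonomy might therefore look like the following (values are illustrative, and the category names are not taken from the real taxonomy CSV):

    {
        "product_summary": "Retail cereal box",
        "inferred_material": "Folding carton / cardboard",
        "has_taxonomy_match": True,
        "taxonomy_matches": [
            {"level1_code": "P5", "level1_name": "Packaging",
             "level2_code": "P5.1", "level2_name": "Folding carton",
             "confidence": 0.9, "rationale": "Cereal box with no visible corrugation"}
        ]
    }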
 
 
 
 
 
modules/ocr_extraction.py CHANGED
@@ -1,77 +1,17 @@
1
  import os
2
  import base64
 
3
  import logging
 
4
  import openai
5
- from rapidfuzz import fuzz, process
6
-
7
- # import from modules
8
- from modules.models import FSCExtractionResult
9
- from modules.prompts import prompt_ocr
10
- from modules.utils import get_token_costs
11
 
 
12
  logger = logging.getLogger(__name__)
13
 
14
- # Valid FSC label titles for fuzzy matching
15
- VALID_LABEL_TITLES = [
16
- "100%",
17
- "Mix",
18
- "Recycled",
19
- ]
20
-
21
- def correct_label_title(extracted_title: str, threshold: int = 85) -> str:
22
- """
23
- Correct OCR errors in label titles using fuzzy matching.
24
-
25
- Args:
26
- extracted_title: The label title extracted by OCR
27
- threshold: Minimum similarity score (0-100) to accept a match
28
-
29
- Returns:
30
- Corrected label title or original if no good match found
31
- """
32
- if not extracted_title or extracted_title == "Not Found":
33
- return extracted_title
34
-
35
- # Normalize the input
36
- normalized_title = extracted_title.strip()
37
-
38
- # Check for exact match first (case-insensitive)
39
- for valid_title in VALID_LABEL_TITLES:
40
- if normalized_title.upper() == valid_title.upper():
41
- return valid_title
42
-
43
- # Use fuzzy matching to find the best match (case-insensitive)
44
- # Convert to uppercase for comparison
45
- normalized_upper = normalized_title.upper()
46
- valid_titles_upper = [title.upper() for title in VALID_LABEL_TITLES]
47
-
48
- result = process.extractOne(
49
- normalized_upper,
50
- valid_titles_upper,
51
- scorer=fuzz.ratio,
52
- score_cutoff=threshold
53
- )
54
-
55
- if result:
56
- matched_title_upper, score, matched_index = result
57
- # Return the original casing from VALID_LABEL_TITLES
58
- matched_title = VALID_LABEL_TITLES[matched_index]
59
-
60
- if normalized_title != matched_title:
61
- logger.info(
62
- f"OCR correction: '{normalized_title}' → '{matched_title}' "
63
- f"(similarity: {score}%)"
64
- )
65
- return matched_title
66
-
67
- # No good match found, return original
68
- logger.warning(
69
- f"No fuzzy match found for label title: '{normalized_title}' "
70
- f"(threshold: {threshold}%)"
71
- )
72
- return extracted_title
73
-
74
- def extract_fsc_logo_data_single(image_path: str, client: openai.OpenAI, model_name: str = "gpt-4.1") -> dict:
75
  """
76
  Extract FSC logo data from a single image file.
77
 
@@ -79,73 +19,188 @@ def extract_fsc_logo_data_single(image_path: str, client: openai.OpenAI, model_n
79
  image_path: Path to the image file
80
  client: OpenAI client instance
81
  model_name: Model to use for extraction
 
 
82
 
83
  Returns:
84
- Dictionary with extracted FSC data and usage information
85
  """
86
-
87
- INPUT_TOKEN_COST, OUTPUT_TOKEN_COST = get_token_costs(model_name)
 
 
 
 
 
 
 
 
 
88
 
89
  # Read and encode the target image file
90
  with open(image_path, "rb") as img_file:
91
  img_bytes = img_file.read()
92
  img_base64 = base64.b64encode(img_bytes).decode('utf-8')
93
 
94
- # Try to read example image if it exists
95
- example_image_path = os.path.join("assets","guidance.png")
96
- if not os.path.exists(example_image_path):
97
- logger.error("Can't find OCR guidance image")
98
- return None
99
-
100
- # Read and encode example image
101
- with open(example_image_path, "rb") as example_file:
102
- example_bytes = example_file.read()
103
- example_base64 = base64.b64encode(example_bytes).decode('utf-8')
104
-
105
- content = [{"type": "input_text", "text": prompt_ocr()},
106
- {"type": "input_image", "image_url": f"data:image/png;base64,{example_base64}", "detail": "high"},
107
- {"type": "input_text", "text": "Now analyze this image and extract all FSC logo data:"},
108
- {"type": "input_image", "image_url": f"data:image/png;base64,{img_base64}", "detail": "high"}]
109
-
110
- response = client.responses.parse(
111
- model=model_name,
112
- input=[{"role": "user", "content": content}],
113
- text_format=FSCExtractionResult,
114
- )
115
-
116
- # Calculate usage and cost
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  usage = response.usage
118
- input_tokens = usage.input_tokens
119
- output_tokens = usage.output_tokens
 
 
120
  input_cost = (input_tokens / 1000) * INPUT_TOKEN_COST
121
  output_cost = (output_tokens / 1000) * OUTPUT_TOKEN_COST
122
  call_cost = input_cost + output_cost
123
 
 
 
 
 
 
124
  logger.info(f"OCR extraction: {input_tokens} input + {output_tokens} output = ${call_cost:.4f}")
125
 
126
- # Extract and return the result
127
- result = response.output_parsed
128
- result_dict = result.model_dump()
129
-
130
- for item in result_dict["fsc_data"]:
131
- for key, value in item.items():
132
- if value is None:
133
- item[key] = "Not Found"
134
-
135
- # Apply fuzzy matching correction to label_title
136
- if "label_title" in item:
137
- original_title = item["label_title"]
138
- corrected_title = correct_label_title(original_title)
139
- item["label_title"] = corrected_title
140
-
141
- return {
142
- "fsc_data": result_dict["fsc_data"],
143
- "usage": {
144
- "input_tokens": input_tokens,
145
- "output_tokens": output_tokens,
146
- "total_cost": call_cost
147
  }
148
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
149
 
150
 
151
 
 
1
  import os
2
  import base64
3
+ import json
4
  import logging
5
+ from typing import Dict, Optional, List
6
  import openai
7
+ from .models import FSCExtractionResult
8
+ from .utils import get_token_costs
 
 
 
 
9
 
10
+ # Configure logging
11
  logger = logging.getLogger(__name__)
12
 
13
+ def extract_fsc_logo_data_single(image_path: str, client: openai.OpenAI, model_name: str = "gpt-4o",
14
+ total_tokens: Dict[str, int] = None, total_cost: List[float] = None) -> dict:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  """
16
  Extract FSC logo data from a single image file.
17
 
 
19
  image_path: Path to the image file
20
  client: OpenAI client instance
21
  model_name: Model to use for extraction
22
+ total_tokens: Dictionary to track token usage
23
+ total_cost: List to track total cost
24
 
25
  Returns:
26
+ Dictionary with extracted FSC data
27
  """
28
+ if total_tokens is None:
29
+ total_tokens = {"input": 0, "output": 0}
30
+ if total_cost is None:
31
+ total_cost = [0.0]
32
+
33
+ try:
34
+ INPUT_TOKEN_COST, OUTPUT_TOKEN_COST = get_token_costs(model_name)
35
+ except ValueError:
36
+ # Fallback to gpt-4o pricing if model not found
37
+ logger.warning(f"Model {model_name} not found in pricing table, using gpt-4o pricing")
38
+ INPUT_TOKEN_COST, OUTPUT_TOKEN_COST = get_token_costs("gpt-4o")
39
 
40
  # Read and encode the target image file
41
  with open(image_path, "rb") as img_file:
42
  img_bytes = img_file.read()
43
  img_base64 = base64.b64encode(img_bytes).decode('utf-8')
44
 
45
+ # Try to read example image if it exists, otherwise use a simpler prompt
46
+ example_image_path = "guidance.png"
47
+ if os.path.exists(example_image_path):
48
+ # Read and encode an example image (one-shot prompting works better)
49
+ with open(example_image_path, "rb") as example_file:
50
+ example_bytes = example_file.read()
51
+ example_base64 = base64.b64encode(example_bytes).decode('utf-8')
52
+
53
+ # Enhanced prompt with example
54
+ prompt = """
55
+ You are an expert at detecting and extracting data from FSC (Forest Stewardship Council) certification logos in images.
56
+
57
+ Here's an example of what an FSC logo looks like and the data you should extract:
58
+
59
+ [EXAMPLE IMAGE WILL BE SHOWN BELOW]
60
+
61
+ From this example FSC logo, you would extract:
62
+ - Label Title: "RECYCLED"
63
+ - Product Type: "Papier aus Recyclingmaterial"
64
+ - FSC License Code: "C075521"
65
+
66
+ Your Task:
67
+ Extract three specific data fields:
68
+ - Label Title
69
+ - Product Type
70
+ - FSC License Code
71
+
72
+ If no FSC logos are found, return all null values.
73
+
74
+ Now analyze the following image and extract all FSC logo data:
75
+ """
76
+
77
+ # API call (includes example and target images)
78
+ response = client.beta.chat.completions.parse(
79
+ model=model_name,
80
+ messages=[
81
+ {
82
+ "role": "user",
83
+ "content": [
84
+ {"type": "text", "text": prompt},
85
+ {
86
+ "type": "image_url",
87
+ "image_url": {
88
+ "url": f"data:image/png;base64,{example_base64}",
89
+ "detail": "high"
90
+ }
91
+ },
92
+ {"type": "text", "text": "Now analyze this image:"},
93
+ {
94
+ "type": "image_url",
95
+ "image_url": {
96
+ "url": f"data:image/png;base64,{img_base64}",
97
+ "detail": "high"
98
+ }
99
+ }
100
+ ]
101
+ }
102
+ ],
103
+ response_format=FSCExtractionResult,
104
+ )
105
+ else:
106
+ # Simplified prompt without example image
107
+ prompt = """
108
+ You are an expert at detecting and extracting data from FSC (Forest Stewardship Council) certification logos in images.
109
+
110
+ Your Task:
111
+ Extract three specific data fields from FSC logos in the image:
112
+ - Label Title (e.g., "MIX", "RECYCLED", "100%")
113
+ - Product Type (e.g., "Paper", "Wood", "Packaging")
114
+ - FSC License Code (7 character code starting with C followed by 6 digits, e.g., "C075521")
115
+
116
+ Look for FSC certification logos, labels, or text in the image. If no FSC logos are found, return all null values.
117
+
118
+ Now analyze the following image and extract all FSC logo data:
119
+ """
120
+
121
+ # API call (target image only)
122
+ response = client.beta.chat.completions.parse(
123
+ model=model_name,
124
+ messages=[
125
+ {
126
+ "role": "user",
127
+ "content": [
128
+ {"type": "text", "text": prompt},
129
+ {
130
+ "type": "image_url",
131
+ "image_url": {
132
+ "url": f"data:image/png;base64,{img_base64}",
133
+ "detail": "high"
134
+ }
135
+ }
136
+ ]
137
+ }
138
+ ],
139
+ response_format=FSCExtractionResult,
140
+ )
141
+
142
+ # Track token usage and cost
143
  usage = response.usage
144
+ input_tokens = usage.prompt_tokens
145
+ output_tokens = usage.completion_tokens
146
+
147
+ # Calculate costs
148
  input_cost = (input_tokens / 1000) * INPUT_TOKEN_COST
149
  output_cost = (output_tokens / 1000) * OUTPUT_TOKEN_COST
150
  call_cost = input_cost + output_cost
151
 
152
+ # Update totals
153
+ total_tokens["input"] += input_tokens
154
+ total_tokens["output"] += output_tokens
155
+ total_cost[0] += call_cost
156
+
157
  logger.info(f"OCR extraction: {input_tokens} input + {output_tokens} output = ${call_cost:.4f}")
158
 
159
+ # Extract the parsed result
160
+ if response.choices[0].message.parsed:
161
+ result = response.choices[0].message.parsed
162
+
163
+ # Add source_file deterministically to each item
164
+ result_dict = result.model_dump()
165
+ for item in result_dict["fsc_data"]:
166
+ item["source_file"] = os.path.basename(image_path)
167
+
168
+ return result_dict
169
+ else:
170
+ # Fallback case - create structure with source_file
171
+ fallback_result = {
172
+ "fsc_data": [{
173
+ "source_file": os.path.basename(image_path),
174
+ "label_title": None,
175
+ "product_type": None,
176
+ "fsc_license_code": None,
177
+ }]
 
 
178
  }
179
+ return fallback_result
180
+
181
+ # Keep the original function for backward compatibility
182
+ def extract_fsc_logo_data(directory: str, image_file: str, client: openai.OpenAI,
183
+ model_name: str = "gpt-4o", total_tokens: Dict[str, int] = None,
184
+ total_cost: List[float] = None) -> str:
185
+ """
186
+ Original function for batch processing - kept for backward compatibility.
187
+
188
+ Args:
189
+ directory: Directory containing the image file
190
+ image_file: Name of the image file
191
+ client: OpenAI client instance
192
+ model_name: Model to use for extraction
193
+ total_tokens: Dictionary to track token usage
194
+ total_cost: List to track total cost
195
+
196
+ Returns:
197
+ JSON string with extracted FSC data
198
+ """
199
+ image_path = os.path.join(directory, image_file)
200
+ result = extract_fsc_logo_data_single(image_path, client, model_name, total_tokens, total_cost)
201
+ return json.dumps(result)
202
+
203
+
204
 
205
 
206
 
modules/pipeline.py DELETED
@@ -1,305 +0,0 @@
1
- import os
2
- import logging
3
- from typing import List, Dict, Tuple
4
- import openai
5
- import configparser
6
- import pandas as pd
7
-
8
- # import from modules
9
- from modules.logo_extraction import get_crop_from_pdf
10
- from modules.ocr_extraction import extract_fsc_logo_data_single
11
- from modules.database_lookup import get_fsc_products_for_license, filter_taxonomy_by_fsc_products
12
- from modules.image_analysis import extract_fsc_logo_and_taxonomy, build_taxonomy_block_from_dataframe, analyze_fsc_text
13
- from modules.scope_check import check_product_type_match, step5_label_title_check
14
- from modules.utils import extract_cost
15
-
16
-
17
- # ============================ Config ============================================
18
-
19
- # Configure logging
20
- logger = logging.getLogger(__name__)
21
-
22
- # Load configuration
23
- config = configparser.ConfigParser()
24
- config.read('config.cfg')
25
-
26
- # Expect your API key in environment: OPENAI_API_KEY
27
- client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
28
-
29
- # Model configuration
30
- OCR_MODEL = config.get('ocr', 'MODEL')
31
- SCOPE_MODEL = config.get('scope', 'MODEL')
32
-
33
- # Load taxonomy
34
- try:
35
- taxonomy_df = pd.read_csv(os.path.join("assets","product_classification.csv"))
36
- logger.info(f"Loaded taxonomy with {len(taxonomy_df)} categories")
37
- except Exception as e:
38
- logger.error(f"Failed to load taxonomy: {e}")
39
- taxonomy_df = pd.DataFrame()
40
-
41
- # Import dictionary of label_title translations and package as dictionary for mix and recycled (100% is the same in all languages so not needed))
42
- try:
43
- translations_df = pd.read_csv(os.path.join("assets","label_title_mapping.csv"))
44
- logger.info(f"Loaded translations dictionary")
45
- mix = set(translations_df['category_mix'].to_list())
46
- recycled = set(translations_df['category_recycled'].to_list())
47
- translations_dict = {"mix" : mix, "recycled" : recycled}
48
- except:
49
- logger.error(f"Failed to load translations dictionary: {e}")
50
-
51
- # ============================ Pipeline ============================================
52
-
53
- def step1_extract_fsc_code(image_file) -> Tuple[str, str, str, str]:
54
- """Step 1: Extract FSC license code using OCR."""
55
-
56
- try:
57
- image_path = image_file
58
- # Step 1: Extract FSC license code using OCR extraction
59
- logger.info("Step 1: Extracting FSC license code using OCR...")
60
-
61
- # Use the updated OCR extraction function
62
- ocr_data = extract_fsc_logo_data_single(
63
- image_path,
64
- client,
65
- model_name=OCR_MODEL
66
- )
67
-
68
- fsc_entry = ocr_data["fsc_data"][0]
69
- fsc_code = fsc_entry.get("fsc_license_code")
70
- label_title = fsc_entry.get("label_title")
71
- product_type = fsc_entry.get("product_type")
72
-
73
- logger.info(f"OCR extracted FSC license code: {fsc_code}")
74
-
75
- # Get usage information from the response
76
- usage = ocr_data.get("usage", {})
77
- total_cost = usage.get("total_cost", 0.0)
78
- input_tokens = usage.get("input_tokens", 0)
79
- output_tokens = usage.get("output_tokens", 0)
80
-
81
- # Format cost information
82
- cost_info = f"""
83
- Total Cost: ${total_cost:.4f}
84
- Model Used: {OCR_MODEL}
85
- Token Usage: {input_tokens} input + {output_tokens} output tokens"""
86
-
87
- if fsc_code == "Not Found":
88
- return fsc_code, label_title, product_type, cost_info
89
-
90
- # Ensure FSC- prefix
91
- if not fsc_code.startswith("FSC-"):
92
- fsc_code = f"FSC-{fsc_code}"
93
-
94
- return fsc_code, label_title, product_type, cost_info
95
-
96
- except Exception as e:
97
- logger.error(f"OCR extraction failed: {e}")
98
- return f"Error in OCR extraction: {str(e)}"
99
-
100
-
101
- def step2_fsc_lookup(fsc_code) -> Tuple[List[Dict], str, str, str, str, str]:
102
- """Step 2: Lookup FSC license in API and get scope."""
103
- if not fsc_code or fsc_code == "Not Found":
104
- return [], "No FSC code found - skipping API lookup"
105
-
106
- try:
107
- logger.info(f"Step 2: Looking up FSC license: {fsc_code}")
108
-
109
- fsc_products, license_status, certificate_status = get_fsc_products_for_license(fsc_code)
110
-
111
- # Handle case where license is not found in FSC database
112
- if license_status == "Not Found":
113
- logger.warning(f"FSC license {fsc_code} not found in FSC database")
114
- return [], "FSC Code Not Found in Database", ""
115
-
116
- return fsc_products, license_status, certificate_status
117
-
118
- except Exception as e:
119
- logger.error(f"Error in FSC lookup: {e}")
120
- return [], "Error in FSC lookup"
121
-
122
- def step3_analyze_with_filtered_taxonomy(image_file, fsc_code, product_type, fsc_products, license_status) -> Tuple[str, str, str, str, str, str]:
123
- """Step 3: Analyze product with FSC-filtered taxonomy."""
124
-
125
- try:
126
- # Convert PDF to image if needed
127
- cleanup_png = False
128
- # Step 3: Filter taxonomy and analyze
129
- logger.info("Step 3: Analyzing product with filtered taxonomy...")
130
-
131
- # Determine which taxonomy to use
132
- if fsc_code != "Not Found" and license_status == 'Valid' and fsc_products:
133
- # filter full taxonomy based on licensed FSC products
134
- filtered_taxonomy = filter_taxonomy_by_fsc_products(taxonomy_df, fsc_products)
135
- # Construct the filtered taxonomy to be LLM friendly (for prompt construction)
136
- if not filtered_taxonomy.empty:
137
- taxonomy_block = build_taxonomy_block_from_dataframe(filtered_taxonomy, include_level3=True)
138
- taxonomy_source = "FSC-filtered"
139
- # If no scope matches found, use full taxonomy
140
- else:
141
- # Use full taxonomy
142
- taxonomy_block = build_taxonomy_block_from_dataframe(taxonomy_df, include_level3=True)
143
- taxonomy_source = "full"
144
-
145
- logger.info(f"Using {taxonomy_source} taxonomy for analysis")
146
-
147
- result = extract_fsc_logo_and_taxonomy(
148
- image_file,
149
- product_type,
150
- taxonomy_block,
151
- client=client,
152
- model_name=SCOPE_MODEL
153
- )
154
-
155
- image_data = result.get("image_data", {})
156
- product_summary = image_data.get("product_summary", "No product summary available")
157
- material = image_data.get("inferred_material", "No material information available")
158
-
159
- # Format taxonomy results
160
- taxonomy_matches = image_data.get("taxonomy_matches", [])
161
- has_match = image_data.get("has_taxonomy_match", False)
162
-
163
- taxonomy_results = ""
164
- if has_match and taxonomy_matches:
165
- for i, match in enumerate(taxonomy_matches[:3], 1): # Show top 3 matches
166
- taxonomy_results += f"Match {i}:\n"
167
- taxonomy_results += f"Main Category: {match['main_category']}\n"
168
- if match.get("level1_code") and match.get("level1_name"):
169
- taxonomy_results += f"Level 1: {match['level1_code']} - {match['level1_name']}\n"
170
- if match.get("level2_code") and match.get("level2_name"):
171
- taxonomy_results += f"Level 2: {match['level2_code']} - {match['level2_name']}\n"
172
- if match.get("confidence"):
173
- taxonomy_results += f"Confidence: {match['confidence']:.2f}\n"
174
- if match.get("rationale"):
175
- taxonomy_results += f"Rationale: {match['rationale']}\n"
176
- taxonomy_results += "\n"
177
- else:
178
- taxonomy_results += "Taxonomy Classification: No matching categories found"
179
-
180
- image_text = image_data.get("image_text", "No image text available")
181
-
182
- # Get usage information from the response
183
- usage = result.get("usage", {})
184
- total_cost = usage.get("total_cost", 0.0)
185
- input_tokens = usage.get("input_tokens", 0)
186
- output_tokens = usage.get("output_tokens", 0)
187
-
188
- cost_info = f"""
189
- Total Cost: ${total_cost:.4f}
190
- Model Used: {SCOPE_MODEL}
191
- Token Usage: {input_tokens} input + {output_tokens} output tokens"""
192
-
193
- return product_summary, material, taxonomy_results, image_text, cost_info
194
-
195
- except Exception as e:
196
- logger.error(f"Error in step 3: {e}")
197
-
198
- def step4_product_type_check(product_type: str, material: str, taxonomy_results: str) -> Tuple[str, str]:
199
- """Step 4: Check if the product type aligns with image analysis"""
200
-
201
- # Call the scope_check module function
202
- result = check_product_type_match(
203
- product_type=product_type,
204
- material=material,
205
- taxonomy_results=taxonomy_results,
206
- client=client,
207
- model_name=SCOPE_MODEL
208
- )
209
-
210
- product_type_match = result.get("product_type_match", {})
211
-
212
- usage = result.get("usage", {})
213
- total_cost = usage.get("total_cost", 0.0)
214
- input_tokens = usage.get("input_tokens", 0)
215
- output_tokens = usage.get("output_tokens", 0)
216
-
217
- # Format cost information
218
- cost_info = f"""
219
- Total Cost: ${total_cost:.4f}
220
- Model Used: {SCOPE_MODEL}
221
- Token Usage: {input_tokens} input + {output_tokens} output tokens"""
222
-
223
- return product_type_match, cost_info
224
-
225
- # ============================ Bring it all together ============================================
226
-
227
- def process_image_progressive(image_file):
228
- """Process image with progressive updates showing each step."""
229
-
230
- # Step 0: Search for FSC logo
231
- if image_file.lower().endswith(".pdf"):
232
- yield "⏳ Step 0: Searching for FSC Logo. Grab a coffee - this can take a while...", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""
233
- image_file_cropped = get_crop_from_pdf(image_file)
234
- else:
235
- return "Please upload an image file in pdf format", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""
236
-
237
- # If we get a failed logo extraction, return an error
238
- if not image_file_cropped:
239
- yield "No logo found in the image", "", "", "", "", "", "", "", "", "Manual Review", "", "", "", "", "", ""
240
- return
241
- else:
242
- yield "⏳ Step 1: Extracting Logo Data...", "⏳ Step 1: Extracting Logo Data...", "⏳ Step 1: Extracting Logo Data...", "", "", "", "", "", "", "", "", "", "", "", "", ""
243
-
244
- # ========== Step 1: Extract FSC code ==========
245
- fsc_code, label_title, product_type, cost_info_1 = step1_extract_fsc_code(image_file_cropped)
246
- yield fsc_code, label_title, product_type, "⏳ Step 2: Looking up FSC license...", "", "", "", "", "", "", "", "", "", "", "", ""
247
-
248
- if fsc_code == "Not Found":
249
- yield fsc_code, label_title, product_type, "No FSC Code. Cannot proceed with analysis...", "No FSC Code. Cannot proceed with analysis...", "", "", "", "", "Manual Review", "", "", "", "", "", ""
250
- return
251
-
252
- # ========== Step 2: FSC lookup ==========
253
- fsc_products, license_status, certificate_status = step2_fsc_lookup(fsc_code)
254
-
255
- #Check if API call was good / license and certificate status is valid before proceeding to step 3
256
- if license_status == "FSC Code Not Found in Database":
257
- yield fsc_code, label_title, product_type, license_status, "", "", "", "", "", "Manual Review", "", "", "", "", "", ""
258
- return
259
- elif license_status == "" or certificate_status != "Valid":
260
- license_status = "FSC Certificate Status: '"+certificate_status+"'"
261
- yield fsc_code, label_title, product_type, license_status, "Analysis skipped due to invalid license and/or certificate status", "", "", "", "", "Manual Review", "", "", "", "", "", ""
262
- return
263
- elif license_status != "Valid":
264
- yield fsc_code, label_title, product_type, license_status, "Analysis skipped due to invalid license status", "", "", "", "", "Manual Review", "", "", "", "", "", ""
265
- return
266
-
267
- yield fsc_code, label_title, product_type, license_status, "⏳ Step 3: Analyzing image...", "⏳ Step 3: Analyzing image...", "⏳ Step 3: Analyzing image...", "⏳ Step 3: Analyzing image...", "⏳ Step 3: Analyzing image...", "", "", "", "", "", "", ""
268
-
269
-
270
- # ========== Step 3: Analyze image with filtered taxonomy ==========
271
- product_summary, material, taxonomy_results, image_text, cost_info_3 = step3_analyze_with_filtered_taxonomy(image_file, fsc_code, product_type, fsc_products, license_status)
272
- fsc_text_strings = analyze_fsc_text(image_text)
273
- yield fsc_code, label_title, product_type, license_status, product_summary, material, taxonomy_results, image_text, fsc_text_strings, "⏳ Step 4: Checking Scope...", "⏳ Step 4: Checking Scope...", "⏳ Step 4: Checking Scope...", "⏳ Step 4: Checking Scope...", "⏳ Step 4: Checking Scope...", "⏳ Step 4: Checking Scope...",""
274
-
275
- # ========== Step 4: Check if the product type aligns with image analysis ==========
276
- product_type_match, cost_info_4 = step4_product_type_check(product_type, material, taxonomy_results)
277
-
278
- # Add up the token cost for each step
279
- cost_1 = extract_cost(cost_info_1)
280
- cost_3 = extract_cost(cost_info_3)
281
- cost_4 = extract_cost(cost_info_4)
282
- total_cost_all_steps = cost_1 + cost_3 + cost_4
283
-
284
- # Combine cost information from all three steps
285
- combined_cost_info = f"""=== TOTAL COST (ALL STEPS) ===
286
- ${total_cost_all_steps:.4f}
287
-
288
- === STEP 1: OCR EXTRACTION ==={cost_info_1}
289
-
290
- === STEP 3: IMAGE ANALYSIS ==={cost_info_3}
291
-
292
- === STEP 4: SCOPE CHECK ==={cost_info_4}"""
293
-
294
- # ========== Step 5: Check if the label title is consistent with the matched taxonomy items ==========
295
- label_title_check = step5_label_title_check(label_title, fsc_products, product_type_match.taxonomy_matches, translations_dict)
296
-
297
- # Get overall compliance status
298
- overall_scope_check = "Ok" if (product_type_match.has_material_check and product_type_match.has_taxonomy_check and label_title_check) else "Manual Review"
299
-
300
- product_type_check = "Ok" if product_type_match.has_material_check else "Not Ok"
301
- taxonomy_check = "Ok" if product_type_match.has_taxonomy_check else "Not Ok"
302
- label_title_check = "Ok" if label_title_check else "Not Ok"
303
-
304
- yield fsc_code, label_title, product_type, license_status, product_summary, material, taxonomy_results, image_text, fsc_text_strings, overall_scope_check, product_type_check, taxonomy_check, label_title_check, product_type_match.material_rationale, product_type_match.taxonomy_rationale, combined_cost_info
305
- return
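Note on the removed generator above: each yield emits the complete 16-field output tuple so the Gradio UI can repaint every panel after each pipeline step. Below is a minimal sketch of that streaming pattern; the three-output layout, field names and yielded values are invented for illustration and are not the app's real interface.

import gradio as gr

def pipeline(pdf_path):
    # Each yield must supply one value per declared output; later yields overwrite earlier ones,
    # so the hourglass placeholders are simply replaced once real values arrive.
    yield "", "", "Step 1: Extracting Logo Data..."
    yield "C000000", "RECYCLED", "Step 2: Looking up FSC license..."
    yield "C000000", "RECYCLED", "Valid"

with gr.Blocks() as demo:
    artwork = gr.File(label="Artwork (PDF)")
    fsc_code = gr.Textbox(label="FSC code")
    label_title = gr.Textbox(label="Label title")
    status = gr.Textbox(label="License status")
    gr.Button("Check").click(pipeline, inputs=artwork, outputs=[fsc_code, label_title, status])

demo.launch()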
 
modules/prompts.py DELETED
@@ -1,78 +0,0 @@
1
- # Repo for all three pipeline prompts
2
-
3
- def prompt_ocr():
4
- return """
5
- You are an expert at detecting and extracting data from FSC (Forest Stewardship Council) certification logos in images.
6
-
7
- Your Task:
8
- Analyze images of FSC logos and extract three specific data fields:
9
- - Label Title (short text label field)
10
- - Product Type (text description field)
11
- - FSC License Code (short text code field)
12
-
13
- If no FSC logos are found, return all null values.
14
-
15
- Please see the example below of what an FSC logo looks like and the data you should extract. From this example FSC logo, you would extract:
16
- - Label Title: "RECYCLED"
17
- - Product Type: "Papier aus Recyclingmaterial"
18
- - FSC License Code: "C075521"
19
-
20
- Here is the example:
21
- """.strip()
22
-
23
-
24
- def prompt_image_analysis(product_type: str, taxonomy_block: str) -> str:
25
- return f"""
26
- You are a compliance analyst assessing products for alignment with the Forest Stewardship Council (FSC) product taxonomy. You will be provided with an image of a product from an FSC licensee for approval.
27
- The product identification task is quite difficult, as it is not always clear from images whether the product pictured in the image is the actual licensed product, /
28
- or whether the image depicts a representation of another item *inside* the licensed product.
29
- For example, an FSC licensee that makes paper-based packaging might submit a schematic of the packaging design for a wooden children's toy. As both wood and paper products (inter alia) are covered by FSC categories, /
30
- it is difficult to tell whether the product is the packaging or the wooden toy. In such cases, it can be helpful to review the Product Type presented on the logo. However, the Product Type can be wrong in some cases, /
31
- so it is important to think critically about all aspects of the image, alongside the Product Type. If the Product Type mentions 'Paper' products and the filtered TAXONOMY (see below) does NOT mention 'Wood' products, /
32
- then we can infer that the primary product is the actual paper packaging of the toy.
33
-
34
- Often the image will feature pictures of other, non-relevant products (e.g. bicycles). The actual FSC product can in such cases be a printed material or other substrate (e.g. packaging, electronic media). Carefully assess what the product is, taking this into account.
35
- When in doubt, carefully read all text in the image to help infer what the product is vs. what the actual FSC product is.
36
- A major flag is when you see a schematic which indicates a packaging design document. In this case, the product indicated on the packaging may differ from the actual FSC product.
37
- Also be careful as some images are taken from a screenshot of a website. In this case, read all text as it may give an indication of the FSC product type.
38
-
39
- Product Type: "{product_type}"
40
-
41
- ## TAXONOMY (Filtered for this FSC Licensee. Respond with main category, codes and names; choose from these if there's a match)
42
- {taxonomy_block}
43
-
44
- TASKS:
45
- 1. Describe what the product pictured in the image appears to be
46
- 2. Infer the substrate/material of the primary product pictured in the image (where we infer the primary product as relates to the FSC TAXONOMY and Product Type)
47
- 3. Assess if the product matches any category in the provided TAXONOMY (where we infer the primary product as relates to the FSC TAXONOMY and Product Type)
48
- 4. If there's a match, select 1–3 closest TAXONOMY alignments from the TAXONOMY below
49
- 5. If there's NO match, set has_taxonomy_match to false and leave taxonomy_matches empty
50
- 6. If you can't interpret what the product is from the image, set has_taxonomy_match to false and leave taxonomy_matches empty
51
- 7. Extract all text from the image and return it in the image_text field.
52
-
53
- IMPORTANT: Only choose codes/names that appear in TAXONOMY. If the product doesn't clearly match any category in the taxonomy, indicate this by setting has_taxonomy_match to false.
54
-
55
- Heuristics / Few-shot hints:
56
- - Retail cereal boxes are folding cartons/cardboard → prefer P5.1 over P5.2 unless corrugation is explicit
57
- - Chests/commodes/cupboards/drawers → W12 Indoor furniture → W12.1 Cabinet
58
- - If the product appears to be something not covered by the taxonomy (e.g., electronics, clothing, food items), set has_taxonomy_match to false (but make sure to consider what the actual primary product is)
59
-
60
- Return your response as per the provided schema.
61
-
62
- Now analyze the image:
63
- """.strip()
64
-
65
-
66
- def prompt_product_type_check(product_type: str, material: str, taxonomy_results: str) -> str:
67
- return f"""
68
- You are a compliance analyst assessing product alignment in the context of communications.
69
-
70
- TASKS:
71
- 1. Does the substrate/material "{material}" approximately match the material mentioned in the product type: "{product_type}"? Note - we only care here about the material, not any claims about sourcing/sustainability etc. We should also be very careful that the match is quite clear. For instance, if the substrate/material is "Cardboard" with a product type of "Paper products sourced from sustainable forests", then it would be a match as cardboard is clearly a paper product. However, if the substrate/material is "Cardboard" with a product type of "Wood sourced from sustainable forests", it would NOT be a match. Even though cardboard comes from wood, we are looking for closer matches.
72
- 2. Include rationale for your answer.
73
- 3. Do one or more of the products mentioned in the taxonomy results ("{taxonomy_results}") approximately align with the material mentioned in "{product_type}"? E.g. if the product type is "Paper", then the taxonomy items could be something related to "Paper products". Here the 'Main Category' is the main category of the product type.
74
- 4. Return the taxonomy items that approximately align with the product type in a list.
75
- 5. Include rationale for your answer.
76
-
77
- Return your response as per the provided schema.
78
- """.strip()
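For context, these prompt builders were fed to the OpenAI Responses API together with the product image. A rough sketch of that wiring follows, using prompt_image_analysis as defined above; the ImageAnalysis schema and the model name are placeholders (the real response models live in modules/models.py and are not shown here).

import base64
from openai import OpenAI
from pydantic import BaseModel

class ImageAnalysis(BaseModel):
    # Placeholder schema, not the repo's actual Pydantic model
    product_summary: str
    material: str
    has_taxonomy_match: bool
    image_text: str

client = OpenAI()

def analyze_image(image_path: str, product_type: str, taxonomy_block: str) -> ImageAnalysis:
    # Encode the image and send it alongside the analysis prompt
    with open(image_path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    resp = client.responses.parse(
        model="gpt-4.1",
        input=[{
            "role": "user",
            "content": [
                {"type": "input_text", "text": prompt_image_analysis(product_type, taxonomy_block)},
                {"type": "input_image", "image_url": f"data:image/png;base64,{b64}"},
            ],
        }],
        text_format=ImageAnalysis,
    )
    return resp.output_parsed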
 
modules/scope_check.py DELETED
@@ -1,153 +0,0 @@
1
- import logging
2
- import re
3
- import os
4
- import pandas as pd
5
- from typing import List, Dict, Union, Any
6
- import openai
7
-
8
- # import from modules
9
- from modules.models import ProductTypeMatch
10
- from modules.prompts import prompt_product_type_check
11
- from modules.utils import get_token_costs
12
-
13
- # Configure logging
14
- logger = logging.getLogger(__name__)
15
-
16
- # ============================ Product Type Check ======================================
17
-
18
- def check_product_type_match(product_type: str, material: str, taxonomy_results: str,
19
- client: openai.OpenAI = None, model_name: str = "gpt-4.1") -> ProductTypeMatch:
20
- """
21
- Check if the product type aligns with the inferred material and taxonomy results.
22
-
23
- Args:
24
- product_type: The product type extracted from the FSC label
25
- material: The inferred material from image analysis
26
- taxonomy_results: The taxonomy classification results
27
- client: OpenAI client instance
28
- model_name: Model to use for the check
29
-
30
- Returns:
31
- ProductTypeMatch: Structured result with match assessment and rationale
32
- """
33
- if client is None:
34
- raise ValueError("OpenAI client must be provided")
35
-
36
- INPUT_TOKEN_COST, OUTPUT_TOKEN_COST = get_token_costs(model_name)
37
-
38
- logger.info(f"Checking product type match: {product_type}")
39
-
40
- prompt = prompt_product_type_check(product_type, material, taxonomy_results)
41
-
42
- try:
43
- response = client.responses.parse(
44
- model=model_name,
45
- input=[{"role": "user", "content": prompt}],
46
- text_format=ProductTypeMatch,
47
- )
48
-
49
- usage = response.usage
50
- input_tokens = usage.input_tokens
51
- output_tokens = usage.output_tokens
52
- input_cost = (input_tokens / 1000.0) * INPUT_TOKEN_COST
53
- output_cost = (output_tokens / 1000.0) * OUTPUT_TOKEN_COST
54
- call_cost = input_cost + output_cost
55
-
56
- logger.info(f"✓ Product type check: {input_tokens} in + {output_tokens} out = ${call_cost:.4f}")
57
-
58
- product_type_match = response.output_parsed
59
-
60
- return {
61
- "product_type_match": product_type_match,
62
- "usage": {
63
- "input_tokens": input_tokens,
64
- "output_tokens": output_tokens,
65
- "total_cost": call_cost
66
- }
67
- }
68
-
69
- except Exception as e:
70
- logger.error(f"✗ Error in product type check: {str(e)}")
71
- raise
72
-
73
-
74
- # ============================ Label Title Check ======================================
75
-
76
- def step5_label_title_check(
77
- label_title: str,
78
- fsc_products: Union[Dict[str, Any], List[Dict[str, Any]]],
79
- taxonomy_matches: Any,
80
- translations_dict: dict
81
- ) -> bool:
82
- """
83
- Step 5:
84
- 1. Check if extracted Label Title matches any of the Step 3 identified taxonomy items (which were already filtered per FSC code scope in Step 3 prior to feeding to LLM)
85
- 2. Compare Label Titles of matched taxonomy items with the extracted Label Title
86
- 3. Return True if the extracted Label Title matches the Label Title of any matched taxonomy item
87
- """
88
-
89
- if not label_title or not taxonomy_matches or not fsc_products:
90
- return False
91
-
92
- # --- Extract taxonomy codes robustly ---
93
- codes: set[str] = set()
94
-
95
- def _extract_from_str(s: str):
96
- codes.update(re.findall(r"level1_code='([^']+)'", s))
97
- codes.update(re.findall(r"level2_code='([^']+)'", s))
98
-
99
- if isinstance(taxonomy_matches, str):
100
- _extract_from_str(taxonomy_matches)
101
- elif isinstance(taxonomy_matches, dict):
102
- for k in ("level1_code", "level2_code"):
103
- v = taxonomy_matches.get(k)
104
- if v: codes.add(v)
105
- elif isinstance(taxonomy_matches, (list, tuple, set)):
106
- for m in taxonomy_matches:
107
- if isinstance(m, dict):
108
- for k in ("level1_code", "level2_code"):
109
- v = m.get(k)
110
- if v: codes.add(v)
111
- else:
112
- # dataclass / simple object
113
- for k in ("level1_code", "level2_code"):
114
- v = getattr(m, k, None)
115
- if v: codes.add(v)
116
- else:
117
- _extract_from_str(str(taxonomy_matches))
118
-
119
- if not codes:
120
- logger.warning("Could not extract taxonomy codes from filtered taxonomy")
121
- return False
122
-
123
-
124
- logo_lt = label_title.strip().upper()
125
-
126
- # --- Match products ---
127
- for product in fsc_products:
128
- level1_text = (product.get("ProductLevel1") or "").strip()
129
- level2_text = (product.get("ProductLevel2") or "").strip()
130
-
131
- level1_code = level1_text.split(" ", 1)[0] if level1_text else ""
132
- level2_code = level2_text.split(" ", 1)[0] if level2_text else ""
133
-
134
- if level1_code in codes or level2_code in codes:
135
- # Get allowed label titles for license scope
136
- scope_lt = (product.get("MainOutputCategory") or "")
137
- # Parse string response (format is like 'FSC Mix; FSC 100%; FSC Recycled')
138
- scope_lt_ls = [item.strip().replace('FSC ', '', 1) for item in scope_lt.split(";")]
139
- logger.info(f"Scoped LTs: {scope_lt_ls}, Target LT: {logo_lt}")
140
- # Lookup scoped label_titles in translation dictionary
141
- for lt in scope_lt_ls:
142
- # 100% is same in all languages so no need for dictionary lookup
143
- if lt == '100%':
144
- if logo_lt == lt:
145
- return True
146
- else:
147
- scope_lt_translated = translations_dict[lt.lower()]
148
- # Then check that logo LT is in scope (using all possible translations)
149
- if logo_lt in scope_lt_translated:
150
- return True
151
-
152
- return False
153
-
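To make the label-title check above concrete, here is a tiny worked example with invented inputs; the taxonomy codes, product rows and translation sets are illustrative only, not real FSC data.

translations_dict = {
    "mix": {"MIX"},
    "recycled": {"RECYCLED", "RECICLADO"},   # all accepted renderings of the Recycled label
}
fsc_products = [{
    "ProductLevel1": "P5 Example packaging group",
    "ProductLevel2": "P5.1 Example folding cartons",
    "MainOutputCategory": "FSC Mix; FSC Recycled",
}]
taxonomy_matches = [{"level1_code": "P5", "level2_code": "P5.1"}]

# "RECICLADO" resolves through translations_dict["recycled"], so this prints True.
print(step5_label_title_check("RECICLADO", fsc_products, taxonomy_matches, translations_dict))
# "100%" is not in this licence's MainOutputCategory scope, so this prints False.
print(step5_label_title_check("100%", fsc_products, taxonomy_matches, translations_dict))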
 
modules/utils.py CHANGED
@@ -1,5 +1,3 @@
1
- import re
2
-
3
  # Model pricing lookup table
4
  MODEL_PRICING = {
5
  "gpt-5": {"input": 1.25, "cached_input": 0.125, "output": 10.00},
@@ -34,9 +32,4 @@ def get_token_costs(model_name: str) -> tuple[float, float]:
34
  input_cost = pricing["input"] / 1000.0
35
  output_cost = pricing["output"] / 1000.0
36
 
37
- return input_cost, output_cost
38
-
39
- # Extract costs from each step for total calculation
40
- def extract_cost(cost_str):
41
- match = re.search(r'Total Cost: \$([0-9.]+)', cost_str)
42
- return float(match.group(1)) if match else 0.0
 
 
 
1
  # Model pricing lookup table
2
  MODEL_PRICING = {
3
  "gpt-5": {"input": 1.25, "cached_input": 0.125, "output": 10.00},
 
32
  input_cost = pricing["input"] / 1000.0
33
  output_cost = pricing["output"] / 1000.0
34
 
35
+ return input_cost, output_cost
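For clarity: get_token_costs returns dollars per 1,000 tokens (the table appears to hold per-million-token list prices, and callers divide token counts by 1,000 again), so a call's cost works out to tokens multiplied by the price per million. A quick worked example with invented token counts:

# Invented token counts, purely to illustrate the arithmetic.
input_rate, output_rate = get_token_costs("gpt-5")   # 1.25 / 1000 and 10.00 / 1000
input_tokens, output_tokens = 2_000, 500
call_cost = (input_tokens / 1000.0) * input_rate + (output_tokens / 1000.0) * output_rate
print(f"${call_cost:.4f}")   # 2 * 0.00125 + 0.5 * 0.01 = $0.0075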
 
 
 
 
 
assets/product_classification.csv → product_classification.csv RENAMED
File without changes
requirements.txt CHANGED
@@ -6,7 +6,4 @@ requests
6
  pypdfium2
7
  pdf2image
8
  Pillow
9
- gradio
10
- opencv-python-headless
11
- pymupdf
12
- rapidfuzz
 
6
  pypdfium2
7
  pdf2image
8
  Pillow
9
+ gradio