mbudisic committed
Commit 317ff97 · 1 Parent(s): 542e082

refactor: Moved JSON loading functions and video transcript loaders from loader.py to datastore.py for better organization
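For downstream code the change is import-path only: the helpers keep their names and signatures and now live in pstuts_rag.datastore. A minimal before/after sketch, mirroring the app.py and app_simple_rag.py diffs below:

# Before this commit:
#   from pstuts_rag.loader import load_json_files
# After this commit:
from pstuts_rag.datastore import load_json_files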
app.py CHANGED
@@ -29,7 +29,7 @@ import pstuts_rag.rag
 
 from pstuts_rag.graph import create_rag_node
 
-from pstuts_rag.loader import load_json_files
+from pstuts_rag.datastore import load_json_files
 from pstuts_rag.prompts import SUPERVISOR_SYSTEM
 
 import nest_asyncio
app_simple_rag.py CHANGED
@@ -16,7 +16,7 @@ from qdrant_client import QdrantClient
 
 import pstuts_rag.datastore
 import pstuts_rag.rag
-from pstuts_rag.loader import load_json_files
+from pstuts_rag.datastore import load_json_files
 
 
 @dataclass
pstuts_rag/pstuts_rag/datastore.py CHANGED
@@ -1,13 +1,14 @@
 import asyncio
+from pathlib import Path
 from typing import List, Dict, Iterator, Any
 import uuid
 
 
+from langchain_core.document_loaders import BaseLoader
 from langchain_experimental.text_splitter import SemanticChunker
 from langchain_openai.embeddings import OpenAIEmbeddings
 from langchain_core.documents import Document
 from langchain_core.embeddings import Embeddings
-from .loader import VideoTranscriptBulkLoader, VideoTranscriptChunkLoader
 
 from langchain_core.vectorstores import VectorStoreRetriever
 
@@ -22,6 +23,119 @@ def batch(iterable: List[Any], size: int = 16) -> Iterator[List[Any]]:
         yield iterable[i : i + size]
 
 
+class VideoTranscriptBulkLoader(BaseLoader):
+    """
+    Loads video transcripts as bulk documents for document processing pipelines.
+
+    Each video becomes a single document with all transcript sentences concatenated.
+    Useful for semantic search across entire video content.
+
+    Inherits from LangChain's BaseLoader for compatibility with document processing chains.
+
+    Attributes:
+        json_payload (List[Dict]): List of video dictionaries containing transcript data
+    """
+
+    def __init__(self, json_payload: List[Dict]):
+        """
+        Initialize the bulk loader with video transcript data.
+
+        Args:
+            json_payload (List[Dict]): List of video dictionaries, each containing:
+                - transcripts: List of transcript segments
+                - qa: Q&A data (optional)
+                - url: Video URL
+                - other metadata fields
+        """
+
+        self.json_payload = json_payload
+
+    def lazy_load(self) -> Iterator[Document]:
+        """
+        Lazy loader that yields Document objects with concatenated transcripts.
+
+        Creates one Document per video with all transcript sentences joined by newlines.
+        Metadata includes all video fields except 'transcripts' and 'qa'.
+        The 'url' field is renamed to 'source' for LangChain compatibility.
+
+        Yields:
+            Document: LangChain Document with page_content as concatenated transcript
+                and metadata containing video information
+        """
+
+        for video in self.json_payload:
+            metadata = dict(video)
+            metadata.pop("transcripts", None)
+            metadata.pop("qa", None)
+            # Rename 'url' key to 'source' in metadata if it exists
+            if "url" in metadata:
+                metadata["source"] = metadata.pop("url")
+            yield Document(
+                page_content="\n".join(
+                    t["sent"] for t in video["transcripts"]
+                ),
+                metadata=metadata,
+            )
+
+
+class VideoTranscriptChunkLoader(BaseLoader):
+    """
+    Loads video transcripts as individual chunk documents for fine-grained processing.
+
+    Each transcript segment becomes a separate document with timing information.
+    Useful for precise timestamp-based retrieval and time-sensitive queries.
+
+    Inherits from LangChain's BaseLoader for compatibility with document processing chains.
+
+    Attributes:
+        json_payload (List[Dict]): List of video dictionaries containing transcript data
+    """
+
+    def __init__(self, json_payload: List[Dict]):
+        """
+        Initialize the chunk loader with video transcript data.
+
+        Args:
+            json_payload (List[Dict]): List of video dictionaries, each containing:
+                - transcripts: List of transcript segments with timing
+                - qa: Q&A data (optional)
+                - url: Video URL
+                - other metadata fields
+        """
+
+        self.json_payload = json_payload
+
+    def lazy_load(self) -> Iterator[Document]:
+        """
+        Lazy loader that yields individual Document objects for each transcript segment.
+
+        Creates one Document per transcript segment with timing metadata.
+        Each document contains a single transcript sentence with precise start/end times.
+        The 'url' field is renamed to 'source' for LangChain compatibility.
+
+        Yields:
+            Document: LangChain Document with page_content as single transcript sentence
+                and metadata containing video info plus time_start and time_end
+        """
+
+        for video in self.json_payload:
+            metadata = dict(video)
+            transcripts = metadata.pop("transcripts", None)
+            metadata.pop("qa", None)
+            # Rename 'url' key to 'source' in metadata if it exists
+            if "url" in metadata:
+                metadata["source"] = metadata.pop("url")
+            for transcript in transcripts:
+                yield Document(
+                    page_content=transcript["sent"],
+                    metadata=metadata
+                    | {
+                        "time_start": transcript["begin"],
+                        "time_end": transcript["end"],
+                    },
+                )
+
+
 async def chunk_transcripts(
     json_transcripts: List[Dict[str, Any]],
     semantic_chunker_embedding_model: Embeddings = OpenAIEmbeddings(
@@ -236,3 +350,74 @@ class DatastoreManager:
         return self.vector_store.as_retriever(
             search_kwargs={"k": n_context_docs}
         )
+
+
+def load_json_string(content: str, group: str):
+    """
+    Parse JSON string content and add group metadata to each video entry.
+
+    Args:
+        content (str): JSON string containing a list of video objects
+        group (str): Group identifier to be added to each video entry
+
+    Returns:
+        List[Dict]: List of video dictionaries with added 'group' field
+
+    Raises:
+        json.JSONDecodeError: If content is not valid JSON
+    """
+    payload: List[Dict] = json.loads(content)
+    [video.update({"group": group}) for video in payload]
+    return payload
+
+
+async def load_single_json(filepath):
+    """
+    Asynchronously load and parse a single JSON file containing video data.
+
+    Args:
+        filepath (str | Path): Path to the JSON file to load
+
+    Returns:
+        List[Dict]: List of video dictionaries with group field set to filename
+
+    Raises:
+        FileNotFoundError: If the specified file doesn't exist
+        json.JSONDecodeError: If file content is not valid JSON
+        PermissionError: If file cannot be read due to permissions
+    """
+    my_path = Path(filepath)
+
+    async with aiofiles.open(my_path, mode="r", encoding="utf-8") as f:
+        content = await f.read()
+        payload = load_json_string(content, my_path.name)
+
+    return payload
+
+
+async def load_json_files(path_pattern: List[str]):
+    """
+    Asynchronously load and parse multiple JSON files matching given patterns.
+
+    Uses glob patterns to find files and loads them concurrently for better performance.
+    All results are flattened into a single list.
+
+    Args:
+        path_pattern (List[str]): List of glob patterns to match JSON files
+            (supports recursive patterns with **)
+
+    Returns:
+        List[Dict]: Flattened list of all video dictionaries from matched files
+
+    Raises:
+        FileNotFoundError: If any matched file doesn't exist during loading
+        json.JSONDecodeError: If any file content is not valid JSON
+        PermissionError: If any file cannot be read due to permissions
+    """
+    files = []
+    for f in path_pattern:
+        (files.extend(glob.glob(f, recursive=True)))
+
+    tasks = [load_single_json(f) for f in files]
+    results = await asyncio.gather(*tasks)
+    return [item for sublist in results for item in sublist]  # flatten
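A minimal usage sketch of the relocated helpers, based on the signatures and docstrings above. The data path is hypothetical, and this assumes datastore.py also retains the json, glob, and aiofiles imports that loader.py used (they are not visible in this hunk):

import asyncio

from pstuts_rag.datastore import (
    VideoTranscriptBulkLoader,
    VideoTranscriptChunkLoader,
    load_json_files,
)


async def main():
    # Expand the glob patterns and load every matching JSON file concurrently.
    videos = await load_json_files(["data/**/*.json"])  # hypothetical path pattern

    # One Document per video, with all transcript sentences joined by newlines.
    bulk_docs = list(VideoTranscriptBulkLoader(videos).lazy_load())

    # One Document per transcript segment, carrying time_start/time_end metadata.
    chunk_docs = list(VideoTranscriptChunkLoader(videos).lazy_load())

    print(f"{len(bulk_docs)} videos, {len(chunk_docs)} transcript segments")


asyncio.run(main())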
pstuts_rag/pstuts_rag/loader.py DELETED
@@ -1,193 +0,0 @@
-import glob
-import json
-from langchain_core.document_loaders import BaseLoader
-from typing import List, Dict, Iterator
-from langchain_core.documents import Document
-
-import aiofiles
-import asyncio
-from pathlib import Path
-
-
-def load_json_string(content: str, group: str):
-    """
-    Parse JSON string content and add group metadata to each video entry.
-
-    Args:
-        content (str): JSON string containing a list of video objects
-        group (str): Group identifier to be added to each video entry
-
-    Returns:
-        List[Dict]: List of video dictionaries with added 'group' field
-
-    Raises:
-        json.JSONDecodeError: If content is not valid JSON
-    """
-    payload: List[Dict] = json.loads(content)
-    [video.update({"group": group}) for video in payload]
-    return payload
-
-
-async def load_single_json(filepath):
-    """
-    Asynchronously load and parse a single JSON file containing video data.
-
-    Args:
-        filepath (str | Path): Path to the JSON file to load
-
-    Returns:
-        List[Dict]: List of video dictionaries with group field set to filename
-
-    Raises:
-        FileNotFoundError: If the specified file doesn't exist
-        json.JSONDecodeError: If file content is not valid JSON
-        PermissionError: If file cannot be read due to permissions
-    """
-    my_path = Path(filepath)
-
-    async with aiofiles.open(my_path, mode="r", encoding="utf-8") as f:
-        content = await f.read()
-        payload = load_json_string(content, my_path.name)
-
-    return payload
-
-
-async def load_json_files(path_pattern: List[str]):
-    """
-    Asynchronously load and parse multiple JSON files matching given patterns.
-
-    Uses glob patterns to find files and loads them concurrently for better performance.
-    All results are flattened into a single list.
-
-    Args:
-        path_pattern (List[str]): List of glob patterns to match JSON files
-            (supports recursive patterns with **)
-
-    Returns:
-        List[Dict]: Flattened list of all video dictionaries from matched files
-
-    Raises:
-        FileNotFoundError: If any matched file doesn't exist during loading
-        json.JSONDecodeError: If any file content is not valid JSON
-        PermissionError: If any file cannot be read due to permissions
-    """
-    files = []
-    for f in path_pattern:
-        (files.extend(glob.glob(f, recursive=True)))
-
-    tasks = [load_single_json(f) for f in files]
-    results = await asyncio.gather(*tasks)
-    return [item for sublist in results for item in sublist]  # flatten
-
-
-class VideoTranscriptBulkLoader(BaseLoader):
-    """
-    Loads video transcripts as bulk documents for document processing pipelines.
-
-    Each video becomes a single document with all transcript sentences concatenated.
-    Useful for semantic search across entire video content.
-
-    Inherits from LangChain's BaseLoader for compatibility with document processing chains.
-
-    Attributes:
-        json_payload (List[Dict]): List of video dictionaries containing transcript data
-    """
-
-    def __init__(self, json_payload: List[Dict]):
-        """
-        Initialize the bulk loader with video transcript data.
-
-        Args:
-            json_payload (List[Dict]): List of video dictionaries, each containing:
-                - transcripts: List of transcript segments
-                - qa: Q&A data (optional)
-                - url: Video URL
-                - other metadata fields
-        """
-
-        self.json_payload = json_payload
-
-    def lazy_load(self) -> Iterator[Document]:
-        """
-        Lazy loader that yields Document objects with concatenated transcripts.
-
-        Creates one Document per video with all transcript sentences joined by newlines.
-        Metadata includes all video fields except 'transcripts' and 'qa'.
-        The 'url' field is renamed to 'source' for LangChain compatibility.
-
-        Yields:
-            Document: LangChain Document with page_content as concatenated transcript
-                and metadata containing video information
-        """
-
-        for video in self.json_payload:
-            metadata = dict(video)
-            metadata.pop("transcripts", None)
-            metadata.pop("qa", None)
-            # Rename 'url' key to 'source' in metadata if it exists
-            if "url" in metadata:
-                metadata["source"] = metadata.pop("url")
-            yield Document(
-                page_content="\n".join(
-                    t["sent"] for t in video["transcripts"]
-                ),
-                metadata=metadata,
-            )
-
-
-class VideoTranscriptChunkLoader(BaseLoader):
-    """
-    Loads video transcripts as individual chunk documents for fine-grained processing.
-
-    Each transcript segment becomes a separate document with timing information.
-    Useful for precise timestamp-based retrieval and time-sensitive queries.
-
-    Inherits from LangChain's BaseLoader for compatibility with document processing chains.
-
-    Attributes:
-        json_payload (List[Dict]): List of video dictionaries containing transcript data
-    """
-
-    def __init__(self, json_payload: List[Dict]):
-        """
-        Initialize the chunk loader with video transcript data.
-
-        Args:
-            json_payload (List[Dict]): List of video dictionaries, each containing:
-                - transcripts: List of transcript segments with timing
-                - qa: Q&A data (optional)
-                - url: Video URL
-                - other metadata fields
-        """
-
-        self.json_payload = json_payload
-
-    def lazy_load(self) -> Iterator[Document]:
-        """
-        Lazy loader that yields individual Document objects for each transcript segment.
-
-        Creates one Document per transcript segment with timing metadata.
-        Each document contains a single transcript sentence with precise start/end times.
-        The 'url' field is renamed to 'source' for LangChain compatibility.
-
-        Yields:
-            Document: LangChain Document with page_content as single transcript sentence
-                and metadata containing video info plus time_start and time_end
-        """
-
-        for video in self.json_payload:
-            metadata = dict(video)
-            transcripts = metadata.pop("transcripts", None)
-            metadata.pop("qa", None)
-            # Rename 'url' key to 'source' in metadata if it exists
-            if "url" in metadata:
-                metadata["source"] = metadata.pop("url")
-            for transcript in transcripts:
-                yield Document(
-                    page_content=transcript["sent"],
-                    metadata=metadata
-                    | {
-                        "time_start": transcript["begin"],
-                        "time_end": transcript["end"],
-                    },
-                )
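For reference, the payload shape that both loaders and load_json_string consume, inferred from the field accesses above; the values and the extra "title" field are illustrative only:

example_payload = [
    {
        "url": "https://example.com/video-1",  # renamed to 'source' in Document metadata
        "title": "Illustrative extra field",  # other fields pass through as metadata
        "qa": [],  # dropped from metadata by both loaders
        "transcripts": [
            {"sent": "First sentence.", "begin": 0.0, "end": 2.5},
            {"sent": "Second sentence.", "begin": 2.5, "end": 5.1},
        ],
    }
]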