feat: add docstring to EndpointHandler.__call__ ; when multiple inputs are sent, the output now also contains a token_list k/v pair for easier human inspection
Browse files- embed_two_chunks.sh +1 -1
- handler.py +31 -10
- test_endpoint.py +11 -4
embed_two_chunks.sh
CHANGED
|
@@ -5,5 +5,5 @@ curl \
|
|
| 5 |
--request POST \
|
| 6 |
--url http://localhost:4999 \
|
| 7 |
--header 'Content-Type: application/json' \
|
| 8 |
-
--data '{"inputs": ["Please embed me", "
|
| 9 |
-w "\n"
|
|
|
|
| 5 |
--request POST \
|
| 6 |
--url http://localhost:4999 \
|
| 7 |
--header 'Content-Type: application/json' \
|
| 8 |
+
--data '{"inputs": ["Please embed me", "En en en mij ook, alsjeblieft !!!"]}' \
|
| 9 |
-w "\n"
|
handler.py
CHANGED
|
@@ -7,6 +7,7 @@ import logging
|
|
| 7 |
|
| 8 |
logger = logging.getLogger(__name__)
|
| 9 |
|
|
|
|
| 10 |
MODEL = "fdurant/colbert-xm-for-inference-api"
|
| 11 |
|
| 12 |
class EndpointHandler():
|
|
@@ -18,11 +19,25 @@ class EndpointHandler():
|
|
| 18 |
nbits=2, # The number bits that each dimension encodes to.
|
| 19 |
kmeans_niters=4, # Number of iterations for k-means clustering during quantization.
|
| 20 |
nranks=-1, # Number of ranks (processors) to use for distributed computing; -1 uses all available CPUs/GPUs.
|
| 21 |
-
checkpoint=MODEL,
|
| 22 |
)
|
| 23 |
self._checkpoint = Checkpoint(self._config.checkpoint, colbert_config=self._config, verbose=3)
|
| 24 |
|
| 25 |
def __call__(self, data: Any) -> List[Dict[str, Any]]:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 26 |
inputs = data["inputs"]
|
| 27 |
texts = []
|
| 28 |
if isinstance(inputs, str):
|
|
@@ -35,31 +50,37 @@ class EndpointHandler():
|
|
| 35 |
|
| 36 |
if len(texts) == 1:
|
| 37 |
# It's a query
|
| 38 |
-
logger.
|
| 39 |
embedding = self._checkpoint.queryFromText(
|
| 40 |
queries=texts,
|
| 41 |
full_length_search=False, # Indicates whether to encode the query for a full-length search.
|
| 42 |
)
|
| 43 |
-
logger.
|
| 44 |
return [
|
| 45 |
{"input": inputs, "query_embedding": embedding.tolist()[0]}
|
| 46 |
]
|
| 47 |
elif len(texts) > 1:
|
| 48 |
# It's a batch of chunks
|
| 49 |
logger.info(f"Batch of chunks: {texts}")
|
| 50 |
-
embeddings,
|
| 51 |
docs=texts,
|
| 52 |
bsize=self._config.bsize, # Batch size
|
| 53 |
keep_dims=True, # Do NOT flatten the embeddings
|
| 54 |
return_tokens=True, # Return the tokens as well
|
| 55 |
)
|
| 56 |
-
|
| 57 |
-
|
| 58 |
-
logger.
|
| 59 |
-
logger.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 60 |
return [
|
| 61 |
-
{"input": _input, "chunk_embedding": embedding.tolist(), "
|
| 62 |
-
for _input, embedding,
|
| 63 |
]
|
| 64 |
else:
|
| 65 |
raise ValueError("No data to process")
|
|
|
|
| 7 |
|
| 8 |
logger = logging.getLogger(__name__)
|
| 9 |
|
| 10 |
+
# Hardcoded, I know
|
| 11 |
MODEL = "fdurant/colbert-xm-for-inference-api"
|
| 12 |
|
| 13 |
class EndpointHandler():
|
|
|
|
| 19 |
nbits=2, # The number bits that each dimension encodes to.
|
| 20 |
kmeans_niters=4, # Number of iterations for k-means clustering during quantization.
|
| 21 |
nranks=-1, # Number of ranks (processors) to use for distributed computing; -1 uses all available CPUs/GPUs.
|
| 22 |
+
checkpoint=MODEL, # Path to the model checkpoint.
|
| 23 |
)
|
| 24 |
self._checkpoint = Checkpoint(self._config.checkpoint, colbert_config=self._config, verbose=3)
|
| 25 |
|
| 26 |
def __call__(self, data: Any) -> List[Dict[str, Any]]:
|
| 27 |
+
"""
|
| 28 |
+
data args:
|
| 29 |
+
inputs (:obj: `str`)
|
| 30 |
+
Return:
|
| 31 |
+
A :obj:`list` : will be serialized and returned.
|
| 32 |
+
When the input is a single query string, the returned list will contain a single dictionary with:
|
| 33 |
+
- input (:obj: `str`) : The input query.
|
| 34 |
+
- query_embedding (:obj: `list`) : The query embedding of shape (1, 32, 128).
|
| 35 |
+
When the input is a batch (= list) of chunk strings, the returned list will contain a dictionary for each chunk:
|
| 36 |
+
- input (:obj: `str`) : The input chunk.
|
| 37 |
+
- chunk_embedding (:obj: `list`) : The chunk embedding of shape (1, num_tokens, 128)
|
| 38 |
+
- token_ids (:obj: `list`) : The token ids.
|
| 39 |
+
- token_list (:obj: `list`) : The token list.
|
| 40 |
+
"""
|
| 41 |
inputs = data["inputs"]
|
| 42 |
texts = []
|
| 43 |
if isinstance(inputs, str):
|
|
|
|
| 50 |
|
| 51 |
if len(texts) == 1:
|
| 52 |
# It's a query
|
| 53 |
+
logger.debug(f"Query: {texts}")
|
| 54 |
embedding = self._checkpoint.queryFromText(
|
| 55 |
queries=texts,
|
| 56 |
full_length_search=False, # Indicates whether to encode the query for a full-length search.
|
| 57 |
)
|
| 58 |
+
logger.debug(f"Query embedding shape: {embedding.shape}")
|
| 59 |
return [
|
| 60 |
{"input": inputs, "query_embedding": embedding.tolist()[0]}
|
| 61 |
]
|
| 62 |
elif len(texts) > 1:
|
| 63 |
# It's a batch of chunks
|
| 64 |
logger.info(f"Batch of chunks: {texts}")
|
| 65 |
+
embeddings, token_id_lists = self._checkpoint.docFromText(
|
| 66 |
docs=texts,
|
| 67 |
bsize=self._config.bsize, # Batch size
|
| 68 |
keep_dims=True, # Do NOT flatten the embeddings
|
| 69 |
return_tokens=True, # Return the tokens as well
|
| 70 |
)
|
| 71 |
+
token_lists = []
|
| 72 |
+
for text, embedding, token_ids in zip(texts, embeddings, token_id_lists):
|
| 73 |
+
logger.debug(f"Chunk: {text}")
|
| 74 |
+
logger.debug(f"Chunk embedding shape: {embedding.shape}")
|
| 75 |
+
logger.debug(f"Chunk token ids: {token_ids}")
|
| 76 |
+
token_list = self._checkpoint.doc_tokenizer.tok.convert_ids_to_tokens(token_ids)
|
| 77 |
+
token_lists.append(token_list)
|
| 78 |
+
logger.debug(f"Chunk tokens: {token_list}")
|
| 79 |
+
# reconstructed_text = self._checkpoint.doc_tokenizer.tok.decode(token_count)
|
| 80 |
+
# logger.debug(f"Reconstructed text with special tokens: {reconstructed_text}")
|
| 81 |
return [
|
| 82 |
+
{"input": _input, "chunk_embedding": embedding.tolist(), "token_ids": token_ids.tolist(), "token_list": token_list}
|
| 83 |
+
for _input, embedding, token_ids, token_list in zip(texts, embeddings, token_id_lists, token_lists)
|
| 84 |
]
|
| 85 |
else:
|
| 86 |
raise ValueError("No data to process")
|
test_endpoint.py
CHANGED
|
@@ -40,7 +40,8 @@ def test_query_returns_expected_result():
|
|
| 40 |
|
| 41 |
def test_batch_returns_expected_result():
|
| 42 |
chunks = ["try me", "try me again and again and again"]
|
| 43 |
-
|
|
|
|
| 44 |
payload = {"inputs": chunks}
|
| 45 |
|
| 46 |
response = requests.request("POST", URL, json=payload, headers=HEADERS)
|
|
@@ -56,12 +57,18 @@ def test_batch_returns_expected_result():
|
|
| 56 |
|
| 57 |
# Check chunk embedding (actually a list of embeddings, one per token in the chunk)
|
| 58 |
chunk_embedding = response_chunk.get("chunk_embedding")
|
| 59 |
-
|
| 60 |
assert isinstance(chunk_embedding, list)
|
| 61 |
-
assert len(chunk_embedding) == len(
|
| 62 |
-
assert len(
|
|
|
|
| 63 |
|
| 64 |
# Check first of the token embeddings
|
| 65 |
first_token_embedding = chunk_embedding[0]
|
| 66 |
assert len(first_token_embedding) == 128
|
| 67 |
assert all(isinstance(value, float) for value in first_token_embedding)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 40 |
|
| 41 |
def test_batch_returns_expected_result():
|
| 42 |
chunks = ["try me", "try me again and again and again"]
|
| 43 |
+
length_of_longest_chunk = 11 # Including special tokens and padding
|
| 44 |
+
doc_maxlen=512
|
| 45 |
payload = {"inputs": chunks}
|
| 46 |
|
| 47 |
response = requests.request("POST", URL, json=payload, headers=HEADERS)
|
|
|
|
| 57 |
|
| 58 |
# Check chunk embedding (actually a list of embeddings, one per token in the chunk)
|
| 59 |
chunk_embedding = response_chunk.get("chunk_embedding")
|
| 60 |
+
token_ids = response_chunk.get("token_ids")
|
| 61 |
assert isinstance(chunk_embedding, list)
|
| 62 |
+
assert len(chunk_embedding) == len(token_ids)
|
| 63 |
+
assert len(token_ids) == length_of_longest_chunk
|
| 64 |
+
assert len(token_ids) <= doc_maxlen
|
| 65 |
|
| 66 |
# Check first of the token embeddings
|
| 67 |
first_token_embedding = chunk_embedding[0]
|
| 68 |
assert len(first_token_embedding) == 128
|
| 69 |
assert all(isinstance(value, float) for value in first_token_embedding)
|
| 70 |
+
|
| 71 |
+
# Check token list
|
| 72 |
+
token_list = response_chunk.get("token_list")
|
| 73 |
+
assert len(token_ids) == len(token_list)
|
| 74 |
+
assert all(isinstance(token, str) for token in token_list)
|