-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdoc_utils.py
76 lines (64 loc) · 2.27 KB
/
doc_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import json
import logging
import os
from functools import lru_cache
# UTILS
# Set up logging.
# logging.basicConfig() only configures the root logger while it has no
# handlers; a second call with a different level is silently ignored. Configure
# exactly once, at the level that was actually taking effect (INFO).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def search_documents_by_file_name(index,
                                  query_embedding_tuple,
                                  file_name,
                                  top_k=5,
                                  include_metadata=True):
    """Run a vector query, optionally restricted to a single file name.

    Args:
        index: Index object, forwarded unchanged to ``query_pinecone``.
        query_embedding_tuple: Query embedding, forwarded unchanged to
            ``query_pinecone``.
        file_name: When truthy, a ``{"file_name": {"$eq": file_name}}``
            filter is passed along with the query; when falsy (``None``,
            ``""``), no filter is applied.
        top_k: Maximum number of matches requested.
        include_metadata: Forwarded to the underlying query.

    Returns:
        Whatever ``query_pinecone`` returns for the request.
    """
    # A falsy file_name means "do not filter at all".
    metadata_filter = {"file_name": {"$eq": file_name}} if file_name else None

    result = query_pinecone(
        index,
        query_embedding_tuple,
        top_k=top_k,
        filter_dict=metadata_filter,
        include_metadata=include_metadata,
    )

    logger.info(f"search_documents_by_file_name response ready: {result}")
    return result
def query_pinecone(index,
                   query_embedding_tuple,
                   top_k=5,
                   filter_dict=None,
                   include_metadata=True):
    """Query ``index`` with an embedding and return the raw response.

    Args:
        index: Object exposing a ``query(...)`` method (presumably a Pinecone
            index handle -- confirm against the caller).
        query_embedding_tuple: Iterable of numbers holding the query
            embedding. It is converted to a list before being sent.
            NOTE(review): it is presumably a tuple so that callers can use it
            as a cache key -- confirm.
        top_k: Maximum number of matches requested.
        filter_dict: Optional metadata filter passed as ``filter``; ``None``
            means no filtering.
        include_metadata: Passed through as ``include_metadata``.

    Returns:
        The response object returned by ``index.query``.
    """
    logger.info(f"Query pinecone filter_dict: {filter_dict}")

    # Convert the tuple back to a list
    query_embedding = list(query_embedding_tuple)

    # Pass the vector by keyword: this matches the other query call in this
    # module and keeps working with client versions that no longer accept
    # positional arguments to query().
    return index.query(
        vector=query_embedding,
        top_k=top_k,
        filter=filter_dict,
        include_metadata=include_metadata,
    )
def update_filenames_json(file_name: str, file_unique_id: str) -> None:
    """Record ``file_name -> file_unique_id`` in ``filenames.json``.

    The mapping is stored as a single JSON object in ``filenames.json`` in the
    current working directory. Existing entries are preserved; an entry with
    the same ``file_name`` is overwritten.

    If the file is missing, empty, not valid JSON, or holds valid JSON that is
    not an object, the mapping starts over from an empty dictionary
    (best-effort recovery rather than an error).

    Args:
        file_name: Key to store.
        file_unique_id: Value to associate with ``file_name``.
    """
    file_path = "filenames.json"

    # Try to read directly instead of checking existence first: this avoids a
    # race between the check and the open, and an empty file simply surfaces
    # as a JSONDecodeError.
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        loaded = None

    # Valid JSON that is not an object (e.g. a list or null) cannot hold the
    # mapping, so treat it like a corrupt file and start fresh.
    filenames = loaded if isinstance(loaded, dict) else {}

    # Update the dictionary with the new file name and unique ID
    filenames[file_name] = file_unique_id

    # Save the updated dictionary back to the file
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(filenames, f)
# Fetch 10k
def fetchTopK(index, dimension=1536, top_k=10000):
    """Query ``index`` with an all-zero vector and return the raw response.

    Args:
        index: Object exposing a ``query(...)`` method (presumably a Pinecone
            index handle -- confirm against the caller).
        dimension: Length of the all-zero query vector. Defaults to 1536;
            NOTE(review): this must presumably match the index's embedding
            dimension -- confirm.
        top_k: Maximum number of matches requested. Defaults to 10000.

    Returns:
        The response object returned by ``index.query``, requested with
        ``include_values=True``.
    """
    zero_vector = [0] * dimension
    return index.query(vector=zero_vector, top_k=top_k, include_values=True)