---
jupyter: python3
---
# 🚀 LLMOps for Production RAG
<a target="_blank" href="https://colab.research.google.com/github/unionai-oss/llmops-production-rag/blob/main/workshop.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>
Welcome to the LLMOps for Production RAG workshop! In this workshop, we will cover:
1. Creating a baseline RAG pipeline
2. Bootstrapping an evaluation dataset
3. RAG Hyperparameter Optimization
## 📦 Install Dependencies
```{python}
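%pip install gradio

# Detect whether this notebook is running in Google Colab.
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# On Colab, clone the workshop repo and install its pinned dependencies.
if IN_COLAB:
    !git clone https://github.com/unionai-oss/llmops-production-rag.git
    %cd llmops-production-rag
    %pip install -r requirements.lock.txt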
```
While dependencies are being installed, create an account on Union Serverless:
👉 https://signup.union.ai/
Then open the Union Serverless dashboard to confirm that you can access the UI:
👉 https://serverless.union.ai/
Then, log in to Union from this notebook session:
```{python}
%cd /content/llmops-production-rag
!union create login --auth device-flow --serverless
```
## 🔑 Create OpenAI API Key Secret on Union
First, go to https://platform.openai.com/account/api-keys and create an OpenAI API key.
Then, run the following command to make the secret accessible on Union:
```{python}
!union create secret openai_api_key
```
```{python}
!union get secret
```
If you have issues with the secret, you can delete it by uncommenting and running the command in the code cell below:
```{python}
#!union delete secret openai_api_key
```
## 🗂️ Creating a Baseline RAG Pipeline
Create the vector store:
```{python}
%%writefile vector_store.py
import flytekit as fl
from typing import Optional
from flytekit.types.directory import FlyteDirectory
from llmops_rag.vector_store import create_knowledge_base, chunk_and_embed_documents


@fl.workflow
def create_vector_store(
    root_url_tags_mapping: Optional[dict] = None,
    splitter: str = "character",
    chunk_size: int = 2048,
    limit: Optional[int | float] = None,
    embedding_model: Optional[str] = "text-embedding-ada-002",
    exclude_patterns: Optional[list[str]] = None,
) -> FlyteDirectory:
    """
    Workflow for creating the vector store knowledge base.
    """
    # Step 1: build the raw knowledge base documents.
    docs = create_knowledge_base(
        root_url_tags_mapping=root_url_tags_mapping,
        limit=limit,
        exclude_patterns=exclude_patterns,
    )
    # Step 2: chunk the documents and embed the chunks into a vector store.
    vector_store = chunk_and_embed_documents(
        documents=docs,
        splitter=splitter,
        chunk_size=chunk_size,
        embedding_model=embedding_model,
    )
    return vector_store
```
```{python}
!union run --remote vector_store.py create_vector_store --limit 10
```
⚠️ Note: The above command will take a few minutes to complete since we're building our container for the first time.
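
To make the `splitter` and `chunk_size` parameters more concrete, here is a minimal sketch of character-based chunking in plain Python. It is not the implementation used by `llmops_rag`; the function name `split_by_characters` and the `overlap` parameter are assumptions introduced only for illustration.

```python
def split_by_characters(text: str, chunk_size: int = 2048, overlap: int = 0) -> list[str]:
    """Split `text` into chunks of at most `chunk_size` characters.

    Hypothetical helper: consecutive chunks share `overlap` characters.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far the window advances each time
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]


# Toy document (370 characters) used only to exercise the helper.
document = "pandas can read and write CSV files. " * 10
chunks = split_by_characters(document, chunk_size=100, overlap=20)
print(len(chunks), [len(chunk) for chunk in chunks])
```

Smaller chunks tend to give more focused retrieval hits at the cost of more embeddings to compute and store, which is one reason `chunk_size` is treated as a tunable hyperparameter later in the workshop.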
Then implement a basic RAG pipeline:
```{python}
%%writefile rag_basic.py
import flytekit as fl
from typing import Optional
from flytekit.types.directory import FlyteDirectory
from llmops_rag.vector_store import VectorStore
from llmops_rag.rag_basic import retrieve, generate


@fl.workflow
def rag_basic(
    questions: list[str],
    vector_store: FlyteDirectory = VectorStore.query(),  # 👈 this uses the vector store artifact by default
    embedding_model: str = "text-embedding-ada-002",
    generation_model: str = "gpt-4o-mini",
    search_type: str = "similarity",
    rerank: bool = False,
    num_retrieved_docs: int = 20,
    num_docs_final: int = 5,
    prompt_template: Optional[str] = None,
) -> list[str]:
    # Step 1: retrieve context documents for each question.
    contexts = retrieve(
        questions=questions,
        vector_store=vector_store,
        embedding_model=embedding_model,
        search_type=search_type,
        rerank=rerank,
        num_retrieved_docs=num_retrieved_docs,
        num_docs_final=num_docs_final,
    )
    # Step 2: generate an answer for each question from its retrieved context.
    return generate(
        questions=questions,
        contexts=contexts,
        generation_model=generation_model,
        prompt_template=prompt_template,
    )
```
Then run it with `union run`:
```{python}
!union run --remote rag_basic.py rag_basic --questions '["How do I read and write a pandas dataframe to csv format?"]'
```
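One plausible reading of `num_retrieved_docs` and `num_docs_final` is a two-stage retrieval: fetch a broad candidate set by vector similarity, then rerank and keep a smaller final set. The sketch below illustrates that idea with toy vectors in plain Python. It is an assumption about the general technique, not the code in `llmops_rag.rag_basic`; the helper names, the hand-written vectors, and the word-overlap "reranker" are all hypothetical stand-ins.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve_top_k(query_vec: list[float], vectors: dict[str, list[float]], k: int) -> list[str]:
    """Stage 1: return the ids of the k vectors most similar to the query."""
    ranked = sorted(
        vectors,
        key=lambda doc_id: cosine_similarity(query_vec, vectors[doc_id]),
        reverse=True,
    )
    return ranked[:k]


def rerank(question: str, candidates: list[str], texts: dict[str, str], k: int) -> list[str]:
    """Stage 2 (toy reranker): keep the k candidates sharing the most words with the question."""
    question_words = set(question.lower().split())

    def overlap(doc_id: str) -> int:
        return len(question_words & set(texts[doc_id].lower().split()))

    return sorted(candidates, key=overlap, reverse=True)[:k]


# Hand-written toy "embeddings" and texts; a real pipeline would use an embedding model.
vectors = {
    "read_csv": [0.9, 0.1, 0.0],
    "to_csv": [0.8, 0.2, 0.0],
    "plotting": [0.0, 0.1, 0.9],
    "groupby": [0.3, 0.9, 0.1],
}
texts = {
    "read_csv": "read a csv file into a dataframe",
    "to_csv": "write a dataframe to a csv file",
    "plotting": "plot a dataframe with matplotlib",
    "groupby": "group rows and aggregate",
}

question = "how do I write a dataframe to csv"
candidates = retrieve_top_k([1.0, 0.0, 0.0], vectors, k=3)  # plays the role of num_retrieved_docs
final_docs = rerank(question, candidates, texts, k=2)       # plays the role of num_docs_final
print(candidates, final_docs)
```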
You can also run the pipeline with an Ollama server:
```{python}
!union run --remote llmops_rag/rag_basic.py rag_basic_ollama --questions '["How do I read and write a pandas dataframe to csv format?"]'
```
### ✨ Maintaining a Fresh Vector Store
Let's use launch plan schedules to maintain a fresh vector store.
```{python}
%%writefile vector_store_launchplan.py
import flytekit as fl
from datetime import timedelta
from vector_store import create_vector_store

# Re-run the vector store workflow on a fixed three-minute schedule.
schedule_vector_store_lp = fl.LaunchPlan.get_or_create(
    name="schedule_vector_store_lp",
    workflow=create_vector_store,
    default_inputs={
        "limit": 10,
    },
    schedule=fl.FixedRate(
        duration=timedelta(minutes=3)
    )
)
```
```{python}
!union register vector_store_launchplan.py
!union launchplan schedule_vector_store_lp --activate
```
Go to the Serverless dashboard to see the schedule in action. Then you can deactivate it with:
```{python}
!union launchplan schedule_vector_store_lp --deactivate
```
### 💻 Run RAG pipeline with Gradio App
```{python}
import gradio as gr
from app import bot, add_message

with gr.Blocks() as demo:
    chatbot = gr.Chatbot(elem_id="chatbot", bubble_full_width=False, type="messages")

    chat_input = gr.Textbox(
        interactive=True,
        placeholder="How do I write a dataframe to csv?",
        show_label=False,
    )

    # On submit, add the user's message to the chat, then let the bot respond.
    chat_msg = chat_input.submit(
        add_message, [chatbot, chat_input], [chatbot, chat_input]
    )
    bot_msg = chat_msg.then(bot, chatbot, chatbot, api_name="bot_response")
    # Make the textbox interactive again once the bot has responded.
    bot_msg.then(lambda: gr.Textbox(interactive=True), None, [chat_input])

demo.launch(debug=True)
```
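The cell above imports `add_message` and `bot` from `app`, which is not shown in this notebook. As a rough sketch of the shape such handlers could take when the chatbot uses `type="messages"` (chat history as a list of `{"role": ..., "content": ...}` dictionaries), consider the plain-Python version below. The function bodies are assumptions, and `answer_question` is a hypothetical placeholder for whatever call the real app makes to the RAG pipeline.

```python
def answer_question(question: str) -> str:
    """Hypothetical stand-in for invoking the RAG workflow."""
    return f"(answer to: {question})"


def add_message(history: list[dict], message: str) -> tuple[list[dict], str]:
    """Append the user's message; the empty string clears the textbox."""
    return history + [{"role": "user", "content": message}], ""


def bot(history: list[dict]) -> list[dict]:
    """Answer the most recent user message and append the reply to the history."""
    question = history[-1]["content"]
    return history + [{"role": "assistant", "content": answer_question(question)}]


# Simulate one chat turn without launching a UI.
history, textbox_value = add_message([], "How do I write a dataframe to csv?")
history = bot(history)
print(history)
```

The two return values of `add_message` line up with the two output components, `[chatbot, chat_input]`, wired to the submit event in the cell above.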
## 🥾 Bootstrapping an Evaluation Dataset
Next, generate a question-and-answer dataset from the raw knowledge base we created
in the previous step.
```{python}
%%writefile create_qa_dataset.py
import functools
from typing import Annotated

import flytekit as fl
from flytekit.types.file import FlyteFile

from llmops_rag.create_qa_dataset import generate_qa_datapoints, create_dataset, QuestionAndAnswerDataset
from llmops_rag.document import CustomDocument
from llmops_rag.vector_store import KnowledgeBase


@fl.workflow
def create_qa_dataset(
    documents: list[CustomDocument] = KnowledgeBase.query(),
    n_questions_per_doc: int = 1,
    n_answers_per_question: int = 5,
) -> Annotated[FlyteFile, QuestionAndAnswerDataset]:
    # Bind the arguments shared by every document so the map task only varies `flyte_doc`.
    partial_task = functools.partial(
        generate_qa_datapoints,
        n_questions_per_doc=n_questions_per_doc,
        n_answers_per_question=n_answers_per_question,
    )
    # Generate question-answer datapoints for each document.
    questions_and_answers = fl.map_task(partial_task)(flyte_doc=documents)
    return create_dataset(questions_and_answers, n_answers_per_question)
```
```{python}
!union run --remote create_qa_dataset.py create_qa_dataset --n_questions_per_doc 5 --n_answers_per_question 5
```
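The workflow above combines `functools.partial` with `fl.map_task`: the partial fixes the arguments that are identical for every document, so only `flyte_doc` varies per mapped call. The plain-Python analogue below shows the same pattern with an ordinary loop. `fake_generate_qa_datapoints` is a hypothetical stand-in that fabricates placeholder strings instead of calling an LLM.

```python
import functools


def fake_generate_qa_datapoints(
    flyte_doc: str,
    n_questions_per_doc: int,
    n_answers_per_question: int,
) -> list[dict]:
    """Hypothetical stand-in: build placeholder questions and answers for one document."""
    return [
        {
            "doc": flyte_doc,
            "question": f"{flyte_doc}-q{q}",
            "answers": [f"{flyte_doc}-q{q}-a{a}" for a in range(n_answers_per_question)],
        }
        for q in range(n_questions_per_doc)
    ]


# Fix the shared keyword arguments once...
partial_task = functools.partial(
    fake_generate_qa_datapoints,
    n_questions_per_doc=2,
    n_answers_per_question=3,
)

# ...then apply the resulting one-argument callable to every document.
documents = ["doc_a", "doc_b"]
questions_and_answers = [partial_task(flyte_doc=doc) for doc in documents]
print(questions_and_answers[0][0])
```

Unlike this sequential loop, a Flyte map task can run the per-document calls in parallel.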
Filter the dataset with an LLM critic:
```{python}
%%writefile create_llm_filtered_dataset.py
import flytekit as fl
from llmops_rag.create_llm_filtered_dataset import apply_llm_critic, filter_dataset, prepare_dataset
from llmops_rag.create_qa_dataset import QuestionAndAnswerDataset
from flytekit.types.file import FlyteFile
import pandas as pd


@fl.workflow
def create_llm_filtered_dataset(
    dataset: FlyteFile = QuestionAndAnswerDataset.query(),
) -> pd.DataFrame:
    # Score the dataset with the LLM critic, then filter it using those scores.
    scores = apply_llm_critic(dataset)
    reference_answers = filter_dataset(dataset, scores)
    return prepare_dataset(reference_answers)
```
```{python}
!union run --remote create_llm_filtered_dataset.py create_llm_filtered_dataset
```
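The workflow scores the generated dataset with an LLM critic and then filters it using those scores. A minimal sketch of the filtering step is shown below, assuming the critic yields one numeric score per row. The `min_score` threshold, the row layout, and the example scores are hypothetical; the actual rubric and cutoff used by `llmops_rag` are not shown in this notebook.

```python
def filter_by_critic_score(
    rows: list[dict],
    scores: list[float],
    min_score: float = 4.0,
) -> list[dict]:
    """Keep only the rows whose critic score is at least `min_score` (hypothetical cutoff)."""
    if len(rows) != len(scores):
        raise ValueError("rows and scores must have the same length")
    return [row for row, score in zip(rows, scores) if score >= min_score]


# Placeholder rows and scores, used only to exercise the helper.
rows = [
    {"question": "How do I write a dataframe to csv?"},
    {"question": "What is it?"},
    {"question": "How do I read a csv file?"},
]
scores = [5.0, 2.0, 4.0]

kept = filter_by_critic_score(rows, scores)
print([row["question"] for row in kept])
```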
## 📊 RAG Hyperparameter Optimization
Experiment with different embedding models:
```{python}
%%writefile optimize_rag.py
import flytekit as fl
from typing import Optional, Annotated
from llmops_rag.optimize_rag import (
    GridSearchConfig,
    prepare_hpo_configs,
    prepare_questions,
    gridsearch,
    combine_answers,
    evaluate,
    report,
)
from llmops_rag.config import RAGConfig
from llmops_rag.create_llm_filtered_dataset import EvalDatasetArtifact
import pandas as pd


@fl.workflow
def optimize_rag(
    gridsearch_config: GridSearchConfig,
    root_url_tags_mapping: Optional[dict] = None,
    exclude_patterns: Optional[list[str]] = None,
    limit: Optional[int] = 10,
    eval_dataset: Annotated[pd.DataFrame, EvalDatasetArtifact] = EvalDatasetArtifact.query(dataset_type="llm_filtered"),
    eval_prompt_template: Optional[str] = None,
    n_answers: int = 1,
) -> RAGConfig:
    # Prepare the candidate RAG configurations and the evaluation questions.
    hpo_configs = prepare_hpo_configs(gridsearch_config)
    questions = prepare_questions(eval_dataset, n_answers)
    # Answer the questions under each configuration, then combine the results.
    answers = gridsearch(questions, hpo_configs, root_url_tags_mapping, exclude_patterns, limit)
    answers_dataset = combine_answers(answers, hpo_configs, questions)
    # Evaluate the answers, report the results, and return the best configuration.
    best_config, evaluation, evaluation_summary = evaluate(
        answers_dataset, eval_prompt_template
    )
    report(evaluation, evaluation_summary)
    return best_config
```
```{python}
!union run --remote optimize_rag.py optimize_rag --gridsearch_config config/embedding_model_experiment.yaml
```
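Conceptually, a grid search lists candidate values for each hyperparameter, expands them into every combination, and keeps the combination with the best evaluation score. The sketch below shows that idea with `itertools.product`. It is not the `llmops_rag` implementation: the dictionary layout, the helper names `expand_grid` and `pick_best`, and the scores (made-up placeholders, not measured results) are all assumptions.

```python
import itertools
from statistics import mean


def expand_grid(grid: dict[str, list]) -> list[dict]:
    """Return one config dict per combination of the listed values."""
    keys = list(grid)
    return [
        dict(zip(keys, values))
        for values in itertools.product(*(grid[key] for key in keys))
    ]


def pick_best(configs: list[dict], scores_per_config: list[list[float]]) -> dict:
    """Return the config whose per-question scores have the highest mean."""
    means = [mean(scores) for scores in scores_per_config]
    best_index = max(range(len(configs)), key=means.__getitem__)
    return configs[best_index]


grid = {
    "embedding_model": ["text-embedding-ada-002", "text-embedding-3-small"],
    "chunk_size": [256, 1024],
    "splitter": ["recursive"],
}
configs = expand_grid(grid)  # 2 * 2 * 1 = 4 combinations

# Made-up placeholder scores (one list per config), only to exercise pick_best.
placeholder_scores = [[0.5, 0.6], [0.7, 0.9], [0.4, 0.4], [0.8, 0.7]]
best = pick_best(configs, placeholder_scores)
print(len(configs), best)
```

The number of combinations grows multiplicatively with each added hyperparameter, which is why the experiments in this workshop vary one or two settings at a time.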
### 🧪 More experiments to run
Experiment with different chunk sizes:
```{python}
!union run --remote optimize_rag.py optimize_rag --gridsearch_config config/chunksize_experiment.yaml
```
Experiment with different splitters:
```{python}
!union run --remote optimize_rag.py optimize_rag --gridsearch_config config/splitter_experiment.yaml
```
Experiment with reranking:
```{python}
!union run --remote optimize_rag.py optimize_rag --gridsearch_config config/reranking_experiment.yaml
```
Experiment with document retrieval:
```{python}
!union run --remote optimize_rag.py optimize_rag --gridsearch_config config/search_params_experiment.yaml
```
## 🔄 Putting it all together into a reactive pipeline
```{python}
%%writefile reactive_pipeline.py
from typing import Annotated, Optional
from datetime import timedelta

import flytekit as fl
import pandas as pd
from union.artifacts import OnArtifact

from llmops_rag.document import CustomDocument
from llmops_rag.create_qa_dataset import create_qa_dataset
from llmops_rag.create_llm_filtered_dataset import create_llm_filtered_dataset, EvalDatasetArtifact
from llmops_rag.optimize_rag import optimize_rag, GridSearchConfig
from llmops_rag.vector_store import create_knowledge_base, KnowledgeBase


@fl.workflow
def knowledge_base_workflow(
    root_url_tags_mapping: Optional[dict] = None,
    limit: Optional[int | float] = None,
    exclude_patterns: Optional[list[str]] = None,
) -> list[CustomDocument]:
    return create_knowledge_base(
        root_url_tags_mapping=root_url_tags_mapping,
        limit=limit,
        exclude_patterns=exclude_patterns,
    ).with_overrides(cache=False)


@fl.workflow
def create_eval_dataset(
    documents: list[CustomDocument],
    n_questions_per_doc: int = 1,
    n_answers_per_question: int = 5,
) -> pd.DataFrame:
    qa_dataset = create_qa_dataset(
        documents=documents,
        n_questions_per_doc=n_questions_per_doc,
        n_answers_per_question=n_answers_per_question,
    )
    return create_llm_filtered_dataset(dataset=qa_dataset)


# Rebuild the knowledge base on a fixed three-minute schedule.
knowledge_base_lp = fl.LaunchPlan.get_or_create(
    knowledge_base_workflow,
    name="knowledge_base_lp",
    default_inputs={"limit": 10},
    schedule=fl.FixedRate(duration=timedelta(minutes=3))
)

# Create the evaluation dataset whenever a KnowledgeBase artifact triggers this launch plan.
create_eval_dataset_lp = fl.LaunchPlan.get_or_create(
    create_eval_dataset,
    name="create_eval_dataset_lp",
    trigger=OnArtifact(
        trigger_on=KnowledgeBase,
        inputs={"documents": KnowledgeBase.query()},
    )
)

# Run RAG optimization whenever an EvalDatasetArtifact triggers this launch plan.
optimize_rag_lp = fl.LaunchPlan.get_or_create(
    optimize_rag,
    name="optimize_rag_lp",
    default_inputs={
        "gridsearch_config": GridSearchConfig(
            embedding_model=[
                "text-embedding-ada-002",
                "text-embedding-3-small",
                "text-embedding-3-large",
            ],
            chunk_size=[256],
            splitter=["recursive"],
        ),
    },
    trigger=OnArtifact(
        trigger_on=EvalDatasetArtifact,
        inputs={"eval_dataset": EvalDatasetArtifact.query()},
    )
)
```
Register and activate the reactive pipeline:
```{python}
!union register reactive_pipeline.py
!union launchplan knowledge_base_lp --activate
!union launchplan create_eval_dataset_lp --activate
!union launchplan optimize_rag_lp --activate
```
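The three launch plans appear to form a chain: the schedule produces a `KnowledgeBase` artifact, which triggers the evaluation dataset workflow, whose `EvalDatasetArtifact` in turn triggers the optimization workflow. The toy event bus below mimics that flow in plain Python. It is only a conceptual analogy and says nothing about how Union implements artifact triggers; `ArtifactBus`, the handler bodies, and the `"BestRAGConfig"` event name are hypothetical.

```python
from collections import defaultdict
from typing import Callable


class ArtifactBus:
    """Toy publish/subscribe bus: publishing an artifact runs its subscribers."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.log: list[str] = []  # names of published artifacts, in order

    def on_artifact(self, name: str, handler: Callable) -> None:
        self._subscribers[name].append(handler)

    def publish(self, name: str, value: object) -> None:
        self.log.append(name)
        for handler in self._subscribers[name]:
            handler(value)


bus = ArtifactBus()


def create_eval_dataset(documents: list[str]) -> None:
    # Stand-in for the evaluation dataset workflow: emits a downstream artifact.
    bus.publish("EvalDatasetArtifact", [f"question about {doc}" for doc in documents])


def optimize_rag(eval_dataset: list[str]) -> None:
    # Stand-in for the optimization workflow.
    bus.publish("BestRAGConfig", {"n_eval_questions": len(eval_dataset)})


bus.on_artifact("KnowledgeBase", create_eval_dataset)
bus.on_artifact("EvalDatasetArtifact", optimize_rag)

# A scheduled run would publish a fresh knowledge base; the rest follows reactively.
bus.publish("KnowledgeBase", ["doc_a", "doc_b"])
print(bus.log)
```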
Deactivate the reactive pipeline:
```{python}
!union launchplan knowledge_base_lp --deactivate
!union launchplan create_eval_dataset_lp --deactivate
!union launchplan optimize_rag_lp --deactivate
```