Getting Started with FloTorch-core: Building Modular RAG Pipelines
FloTorch-core is a modular and extensible Python framework designed for building LLM-powered Retrieval-Augmented Generation (RAG) pipelines. It offers plug-and-play components for embeddings, chunking, retrieval, gateway-based LLM calls, and RAG evaluation.
In this blog, we'll explore how to get started with FloTorch-core, covering installation, core components, and practical code examples.
🚀 Installation
To install the latest version of FloTorch-core, use pip:
pip install FloTorch-core
For development dependencies:
pip install "FloTorch-core[dev]"
📁 Project Structure Overview
FloTorch-core is structured into modular components, each handling a specific part of the RAG pipeline:
- reader/: Handles input parsing from JSON or PDF files.
- chunking/: Responsible for splitting raw text into manageable chunks for downstream processing.
- embedding/: Integrates embedding models from Bedrock or SageMaker for vector representation of text.
- storage/: Interfaces with vector databases and storage backends such as OpenSearch, S3, and DynamoDB.
- rerank/: Provides mechanisms for reordering retrieved documents based on relevance.
- inferencer/: Connects to Bedrock or SageMaker-hosted LLMs to generate responses based on input queries and retrieved context.
- guardrails/: Supports policy enforcement and safety mechanisms during inference.
- evaluator/: Enables RAG pipeline evaluation using RAGAS metrics like faithfulness and context relevance.
🛠️ Prerequisites
Before proceeding, ensure you have the following:
- AWS account with access to Amazon Bedrock
Make sure your AWS account has been granted access to the Amazon Bedrock service. If you do not have access, request it through the Amazon Bedrock Access Request Form.
- Model access in Bedrock
Within Bedrock, you must have enabled access to the specific foundation model (e.g., Anthropic Claude, Amazon Titan, or AI21). Navigate to the Model Access page in the AWS Console and ensure the model you want to use is listed under "Granted Access".
🖥️ Where Can You Run FloTorch Pipelines?
You can run FloTorch pipelines in the following environments:
1. Locally on your machine
To run locally, ensure:
- Python 3.9+ is installed.
- AWS CLI is installed and configured with appropriate credentials:
aws configure
You’ll need to provide:
- AWS Access Key ID
- AWS Secret Access Key
- Default region
- Output format (e.g., json)
Make sure the configured user has access to Amazon Bedrock, SageMaker (if used), and any other services you invoke, such as S3, DynamoDB, and OpenSearch.
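Before going further, it can help to confirm the local prerequisites programmatically. The snippet below is a minimal sketch using only the standard library: `check_local_environment` is a hypothetical helper (not part of FloTorch-core) that checks the Python version and whether an `aws` executable is on the PATH. It does not verify credentials or service permissions.

```python
import shutil
import sys

MIN_PYTHON = (3, 9)  # minimum version listed in the prerequisites above


def check_local_environment(min_python=MIN_PYTHON):
    """Report whether the basic local prerequisites appear to be met."""
    return {
        # sys.version_info compares element-wise against a (major, minor) tuple
        "python_ok": sys.version_info >= min_python,
        # shutil.which returns None when the executable is not on the PATH
        "aws_cli_found": shutil.which("aws") is not None,
    }


for name, ok in check_local_environment().items():
    print(f"{name}: {'OK' if ok else 'MISSING'}")
```

If `aws_cli_found` reports MISSING, install the AWS CLI and run `aws configure` as described above.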
2. AWS SageMaker Notebooks
If you're using SageMaker Studio or Notebook instances:
- Choose a kernel with Python 3.x (preferably Conda-based for better package isolation).
- Ensure the attached IAM role has the necessary permissions to access Bedrock, S3, and other services.
🧾 Provide Experiment Configuration
The `exp_config_data` dictionary below is an example configuration containing the key parameters for running a RAG pipeline with either Bedrock or SageMaker.
exp_config_data = {
    "temp_retrieval_llm": '<Temperature>',  # Temperature value for the retrieval LLM
    "ground_truth_data_path": '<S3 bucket ground truth data path>',
    "rerank_model_id": "<Rerank Model>",  # Rerank model name
    "retrieval_service": "<Retrieval Service>",  # Either Bedrock or SageMaker
    "bedrock_knowledgebase_id": '<bedrock KB ID>',  # Unique ID generated when the Bedrock knowledge base is created
    "knn_num": '<K-Nearest Neighbours value>',  # Number of most similar vectors (neighbors) to retrieve
    "retrieval_model": "<Retrieval Model>",  # Retrieval model name
    "aws_region": "<aws_region>",  # AWS Region
    "n_shot_prompt_guide_obj": '<prompt>',  # Prompt guide object (JSON)
    "n_shot_prompts": '<n shot Prompts value>'  # Number of shots given to the LLM for inference
}
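Because the template above ships with `<...>` placeholders and string values, a small sanity check before running the pipeline can save a failed run. The sketch below is an illustration only: `find_unfilled` and `coerce_numeric` are hypothetical helpers (not part of FloTorch-core), and the example values are made up.

```python
import re

# Matches template values such as '<aws_region>' that were never filled in
PLACEHOLDER = re.compile(r"^<.*>$")


def find_unfilled(config):
    """Return the keys whose values are still '<...>' placeholders."""
    return sorted(
        key for key, value in config.items()
        if isinstance(value, str) and PLACEHOLDER.match(value.strip())
    )


def coerce_numeric(config, int_keys=("knn_num", "n_shot_prompts"),
                   float_keys=("temp_retrieval_llm",)):
    """Return a copy of config with the listed keys converted to int or float."""
    converted = dict(config)
    for key in int_keys:
        if key in converted:
            converted[key] = int(converted[key])
    for key in float_keys:
        if key in converted:
            converted[key] = float(converted[key])
    return converted


# Hypothetical values, for illustration only
example_config = {
    "temp_retrieval_llm": "0.1",
    "knn_num": "3",
    "n_shot_prompts": "0",
    "aws_region": "<aws_region>",  # deliberately left unfilled
}

print(find_unfilled(example_config))
print(coerce_numeric(example_config))
```

Run `find_unfilled` first: `coerce_numeric` raises a `ValueError` if a numeric key still holds a placeholder.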
🧾 Reading Data
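FloTorch-core provides readers to ingest data from various sources. The sample snippet below loads ground truth data from a JSON file. It reads a local copy named `ground_truth.json`, so download the file first if it lives in an S3 bucket. See the attached sample ground truth JSON file.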
Question Chunking: Each question from the input JSON is transformed into a `Chunk` object using the `get_chunk()` method defined in the Question class. This conversion ensures compatibility with FloTorch's data structures.
import json

from pydantic import BaseModel
from flotorch_core.chunking.chunking import Chunk

class Question(BaseModel):
    question: str
    answer: str

    def get_chunk(self) -> Chunk:
        # Wrap the question text in FloTorch's Chunk type
        return Chunk(data=self.question)

# Load the ground truth question-answer pairs
with open('ground_truth.json', 'r', encoding='utf-8') as file:
    ground_truth_data = json.load(file)

qna_list = [Question(**item) for item in ground_truth_data]
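The loader above implies that the ground truth file is a JSON list of objects, each with a `question` and an `answer` field. Here is a minimal sketch of that shape, assuming no extra fields are required. The two records are made up for illustration, and `QAPair` is a standard-library stand-in for the pydantic `Question` model so the example runs without FloTorch-core installed.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class QAPair:
    """Stand-in for the Question model: one ground truth record."""
    question: str
    answer: str


# Illustrative records only; replace with your own ground truth data
sample_records = [
    {"question": "What does RAG stand for?",
     "answer": "Retrieval-Augmented Generation."},
    {"question": "Which package provides the Chunk class?",
     "answer": "flotorch_core.chunking.chunking"},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "ground_truth.json"
    # Write the file in the expected shape, then read it back
    path.write_text(json.dumps(sample_records, indent=2), encoding="utf-8")
    loaded = json.loads(path.read_text(encoding="utf-8"))

pairs = [QAPair(**item) for item in loaded]
print(pairs[0])
```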
🗃️ Vector Storage Options
FloTorch-core is compatible with several vector storage options, such as Bedrock Knowledge Bases and OpenSearch.
Bedrock Knowledge Base
To set up Bedrock Knowledge Base for vector storage with VectorStorageFactory, you can supply configuration values dynamically, as illustrated below.
Please follow this link to create Bedrock Knowledge base and this notebook to upload data with different chunking mechanisms.
from flotorch_core.storage.db.vector.vector_storage_factory import VectorStorageFactory

knowledge_base_bool = '<knowledge Base>'  # Set to True if OpenSearch is used, else False
Bedrock_knowledge_base_bool = '<Bedrock knowledge base>'  # Set to True if a Bedrock knowledge base is used, else False
embedding = '<embedding>'  # If OpenSearch is used, provide the AWS embedding model ID, else None

vector_storage = VectorStorageFactory.create_vector_storage(
    knowledge_base_bool,
    Bedrock_knowledge_base_bool,
    embedding,
    exp_config_data["bedrock_knowledgebase_id"],
    exp_config_data["aws_region"]
)
🧠 Bedrock Reranker Integration
To enhance response relevance after document retrieval from vector storage, FloTorch-core offers reranking capabilities. For instance, the Bedrock Reranker can be employed for a secondary ranking of the initial results.
from flotorch_core.rerank.rerank import BedrockReranker

aws_region = exp_config_data["aws_region"]
rerank_model_id = exp_config_data["rerank_model_id"]
# Skip reranking when the configured model ID is the string "none"
reranker = (
    BedrockReranker(aws_region, rerank_model_id)
    if rerank_model_id.lower() != "none"
    else None
)
🧬 Inferencer Options
FloTorch-core enables response generation from LLMs using various inferencer backends. For instance, the Bedrock Inferencer can be used and configured through environment variables or a configuration file.
from flotorch_core.inferencer.inferencer_provider_factory import InferencerProviderFactory
execution_role_arn = '<Execution Role Arn>'  # Provide this role only if the SageMaker service is used to create endpoints, else None
temperature = '<temperature_value>'  # Provide the temperature value

def initialize_inferencer(inference_model):
    inferencer = InferencerProviderFactory.create_inferencer_provider(
        False, "", "",
        exp_config_data.get("retrieval_service"),
        inference_model,
        exp_config_data.get("aws_region"),
        execution_role_arn,
        exp_config_data.get("n_shot_prompts"),
        temperature,
        exp_config_data.get("n_shot_prompt_guide_obj")
    )
    return inferencer
RAG with FloTorch Utility
The `rag_with_flotorch` utility combines the vector storage, optional reranker, and inferencer configured above into a single retrieval-and-generation loop over the ground truth questions.
Steps involved:
- Initialization: The utility takes as input a configuration for the experiment, a vector storage instance, an optional reranker, an inferencer (LLM), and a set of question-answer pairs for evaluation.
- Vector Retrieval: The provided vector storage is queried using each question's embedding to perform a k-Nearest Neighbors (KNN) search. This retrieves relevant documents from the underlying vector database.
- Context Reranking (Optional): If a reranking model (such as Bedrock Reranker) is supplied, the retrieved documents are passed through it. This step aims to refine the relevance of the retrieved context for more accurate answer generation.
- Answer Generation: The inferencer, which is a Large Language Model (LLM), processes the original question along with the retrieved (and potentially reranked) context documents to generate a final answer.
- Metadata Logging: For each question, the utility collects and stores valuable information. This includes metadata about the inference process, the generated answer, the expected (ground truth) answer, the original question, and the documents retrieved as context. This data is crucial for evaluating the performance of the RAG pipeline.
- Iterative Processing: The retrieval, reranking, generation, and logging steps above are repeated for every question in the input. Finally, all generated responses and their metadata are compiled into a list of results.
def rag_with_flotorch(exp_config_data, vector_storage, reranker, inferencer, qna_list: list[Question]):
    responses_list = []
    for question in qna_list:
        inference_response = {}
        question_chunk = question.get_chunk()
        # Retrieve the k nearest neighbors for this question
        response = vector_storage.search(question_chunk,
                                         int(exp_config_data.get("knn_num")))
        vector_response = response.to_json()['result']
        # Optionally rerank the retrieved documents
        if reranker:
            vector_response = reranker.rerank_documents(question_chunk.data, vector_response)
        # Generate an answer from the question and the retrieved context
        metadata, answer = inferencer.generate_text(question.question, vector_response)
        inference_response["metadata"] = metadata
        inference_response["generated_answer"] = answer
        inference_response["expected_answer"] = question.answer
        inference_response["question"] = question.question
        inference_response["retrieved_contexts"] = vector_response
        responses_list.append(inference_response)
    return responses_list
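To try the control flow of this loop without AWS access, the same steps can be exercised against stand-in objects. The sketch below is an offline dry run under stated assumptions: every `Stub*` class and `run_rag_loop` are hypothetical stand-ins that only mimic the method names used above (`search`, `to_json`, `rerank_documents`, `generate_text`). The stub storage ranks documents by word overlap rather than by embedding similarity, and the `{"text": ...}` hit format is invented for the example, not FloTorch's actual result schema.

```python
import re
from dataclasses import dataclass


def tokens(text):
    """Lowercase word tokens, keeping hyphenated names such as 'flotorch-core'."""
    return set(re.findall(r"[\w-]+", text.lower()))


@dataclass
class StubChunk:
    data: str


@dataclass
class StubQuestion:
    question: str
    answer: str

    def get_chunk(self):
        return StubChunk(data=self.question)


class StubSearchResult:
    def __init__(self, hits):
        self._hits = hits

    def to_json(self):
        return {"result": self._hits}


class StubVectorStorage:
    """Ranks documents by word overlap instead of embedding similarity."""

    def __init__(self, documents):
        self.documents = documents

    def search(self, chunk, knn):
        query = tokens(chunk.data)
        ranked = sorted(self.documents,
                        key=lambda doc: len(query & tokens(doc)), reverse=True)
        return StubSearchResult([{"text": doc} for doc in ranked[:knn]])


class StubInferencer:
    """Echoes the top context instead of calling an LLM."""

    def generate_text(self, question, contexts):
        metadata = {"num_contexts": len(contexts)}
        answer = contexts[0]["text"] if contexts else "No context retrieved."
        return metadata, answer


def run_rag_loop(config, vector_storage, reranker, inferencer, questions):
    """Same steps as above: retrieve, optionally rerank, generate, log."""
    results = []
    for question in questions:
        chunk = question.get_chunk()
        hits = vector_storage.search(chunk, int(config["knn_num"])).to_json()["result"]
        if reranker:
            hits = reranker.rerank_documents(chunk.data, hits)
        metadata, answer = inferencer.generate_text(question.question, hits)
        results.append({
            "metadata": metadata,
            "generated_answer": answer,
            "expected_answer": question.answer,
            "question": question.question,
            "retrieved_contexts": hits,
        })
    return results


documents = [
    "Amazon Bedrock hosts foundation models.",
    "FloTorch-core is a Python framework for RAG pipelines.",
]
questions = [StubQuestion("What is FloTorch-core?",
                          "A Python framework for RAG pipelines.")]
results = run_rag_loop({"knn_num": "1"}, StubVectorStorage(documents),
                       None, StubInferencer(), questions)
print(results[0])
```

Each result record carries the same five keys as the real utility, so downstream code that consumes the responses can be developed against the stubs first.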
🔁 Executing the RAG Workflow for Multiple Inference Models
Let’s now run the RAG pipeline for the following models:
- us.amazon.nova-lite-v1:0
- us.amazon.nova-micro-v1:0
- us.anthropic.claude-3-5-haiku-20241022-v1:0
- us.anthropic.claude-3-5-sonnet-20241022-v2:0
inference_models = [
"us.amazon.nova-lite-v1:0",
"us.amazon.nova-micro-v1:0",
"us.anthropic.claude-3-5-haiku-20241022-v1:0",
"us.anthropic.claude-3-5-sonnet-20241022-v2:0"
]
Each model will go through the same setup steps involving Vector Storage, Reranker, and Inferencer, and their outputs will be collected for comparison.
📊 The responses for each model will be stored in a dictionary named `rag_inference_response_dict` for further analysis.
rag_inference_response_dict = {}
for inference_model in inference_models:
    inferencer = initialize_inferencer(inference_model)
    responses = rag_with_flotorch(exp_config_data,
                                  vector_storage, reranker, inferencer, qna_list)
    rag_inference_response_dict[inference_model] = responses
Here is the results JSON file produced by running the pipeline on the ground truth JSON file attached above.
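If you want to produce a similar file from your own run, one straightforward option is to dump the per-model dictionary with the standard `json` module. This is a minimal sketch, not the method used to create the file above: `save_responses` is a hypothetical helper, the sample dictionary is a made-up placeholder, and `default=str` is an assumption to cope with metadata values that may not be JSON-serializable.

```python
import json
import tempfile
from pathlib import Path


def save_responses(responses_by_model, output_path):
    """Write the per-model responses to a JSON file and return its path."""
    path = Path(output_path)
    # default=str converts values json cannot encode (e.g. datetimes) to strings
    path.write_text(json.dumps(responses_by_model, indent=2, default=str),
                    encoding="utf-8")
    return path


# Made-up placeholder shaped like rag_inference_response_dict
sample_responses = {
    "example-model-id": [
        {
            "metadata": {},
            "generated_answer": "placeholder answer",
            "expected_answer": "placeholder ground truth",
            "question": "placeholder question",
            "retrieved_contexts": [],
        }
    ]
}

with tempfile.TemporaryDirectory() as tmp_dir:
    saved = save_responses(sample_responses, Path(tmp_dir) / "rag_results.json")
    reloaded = json.loads(saved.read_text(encoding="utf-8"))

print(list(reloaded))
```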
Evaluating Multiple Models with FloTorch using Ragas
🧠 Ragas Introduction
Ragas provides an evaluation framework for RAG pipelines. Internally, it uses LLMs to assess quality metrics such as faithfulness, answer relevance, and context precision.
🧾 Sample Evaluation Configuration JSON
The `evaluation_config_data` dictionary holds the settings needed to configure the embedding model and inferencer used for evaluation. Changing these values lets you test different embedding and retrieval models.
evaluation_config_data = {
    "eval_embedding_model": '<Evaluation Embedding Model>',  # Embedding model name
    "eval_retrieval_model": '<Evaluation Retrieval Model>',  # Retrieval model name
    "eval_retrieval_service": "<Retrieval Service>",  # Either Bedrock or SageMaker
    "aws_region": '<AWS Region>',  # AWS Region
    "eval_embed_vector_dimension": '<Evaluation Embedding Vector Dimension>'  # Embedding vector dimension used for evaluation
}
🧠 Embedding Model Initialization
FloTorch-core utilizes an `embedding_registry` for the dynamic selection and initialization of embedding models as per the specified configuration. This design facilitates the effortless interchangeability of different embedding models, streamlining the evaluation process without necessitating alterations to the fundamental pipeline structure.
from flotorch_core.embedding.embedding_registry import embedding_registry
# If you are using Titan Embedding you need to import this
from flotorch_core.embedding.titanv2_embedding import TitanV2Embedding
# If you are using Cohere Embedding you need to import this
from flotorch_core.embedding.cohere_embedding import CohereEmbedding
embedding_class = embedding_registry.get_model(evaluation_config_data.get("eval_embedding_model"))
embedding = embedding_class(
    evaluation_config_data.get("eval_embedding_model"),
    evaluation_config_data.get("aws_region"),
    int(evaluation_config_data.get("eval_embed_vector_dimension"))
)
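One plausible reason the embedding modules must be imported explicitly is that each class registers itself with the registry when its module loads. The sketch below shows that general pattern with a toy registry. It is an assumption about the design, not FloTorch-core's actual implementation: `ToyRegistry`, `ToyEmbedding`, and the model ID `toy-embedding-v1` are all hypothetical.

```python
class ToyRegistry:
    """Maps model IDs to embedding classes."""

    def __init__(self):
        self._models = {}

    def register(self, model_id):
        """Class decorator that records the class under model_id."""
        def decorator(cls):
            self._models[model_id] = cls
            return cls
        return decorator

    def get_model(self, model_id):
        try:
            return self._models[model_id]
        except KeyError:
            raise KeyError(
                f"No embedding registered for {model_id!r}; "
                "was the module that defines it imported?"
            ) from None


toy_registry = ToyRegistry()


# Registration happens as a side effect of defining (importing) the class
@toy_registry.register("toy-embedding-v1")
class ToyEmbedding:
    def __init__(self, model_id, region, dimensions):
        self.model_id = model_id
        self.region = region
        self.dimensions = dimensions

    def embed(self, text):
        """Deterministic toy vector: character codes folded into buckets."""
        vector = [0.0] * self.dimensions
        for position, char in enumerate(text):
            vector[position % self.dimensions] += ord(char)
        return vector


embedding_cls = toy_registry.get_model("toy-embedding-v1")
toy_embedding = embedding_cls("toy-embedding-v1", "us-east-1", 4)
print(len(toy_embedding.embed("hello")))
```

With this design, swapping embedding models only means changing the model ID in the configuration, provided the module that defines the class has been imported.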
🤖 Inferencers
FloTorch offers a consistent way to set up an LLM-based inferencer, leveraging either Amazon Bedrock or SageMaker. This allows for adaptable deployment of diverse foundation models to conduct inference on retrieved documents.
from flotorch_core.inferencer.inferencer_provider_factory import InferencerProviderFactory
execution_role_arn = '<Execution Role Arn>'  # Provide this role only if the SageMaker service is used to create endpoints, else None

inferencer = InferencerProviderFactory.create_inferencer_provider(
    False, "", "",
    evaluation_config_data.get("eval_retrieval_service"),
    evaluation_config_data.get("eval_retrieval_model"),
    evaluation_config_data.get("aws_region"),
    execution_role_arn,
    float(0.1)  # Temperature setting for generation
)
📊 Initialize RAG Evaluator
FloTorch integrates with Ragas, allowing the use of the RagasEvaluator to assess RAG pipeline performance. This utility applies standard metrics like Faithfulness, Answer Relevance, and Context Precision to evaluate retrieved documents and generated responses.
from flotorch_core.evaluator.ragas_evaluator import RagasEvaluator
evaluator = RagasEvaluator(inferencer, embedding)
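To build intuition for what the evaluator reports, consider faithfulness: Ragas defines it as the fraction of claims in the generated answer that are supported by the retrieved context. In Ragas an LLM extracts and judges the claims. The toy sketch below takes the verdicts as given and shows only the final ratio: `faithfulness_score` is a hypothetical helper and the verdicts are made up.

```python
def faithfulness_score(claim_verdicts):
    """Fraction of answer claims judged as supported by the context."""
    if not claim_verdicts:
        return 0.0
    # True counts as 1 and False as 0 when summed
    return sum(claim_verdicts) / len(claim_verdicts)


# Made-up verdicts: two of three claims are supported by the context
verdicts = [True, True, False]
score = faithfulness_score(verdicts)
print(round(score, 2))  # prints 0.67
```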
✅ Evaluate RAG Performance
After setting up the evaluator, RAG evaluation is performed on each model in the dataset using the RagasEvaluator. This process calculates crucial performance metrics and structures them for subsequent analysis.
final_evaluation = {}  # Collects the rounded metrics for each model
for model in rag_inference_response_dict:
    ragas_report = evaluator.evaluate(rag_inference_response_dict[model])
    if ragas_report:
        eval_metrics = ragas_report._repr_dict
        # Round float metrics to two decimals for readability
        eval_metrics = {key: round(value, 2) if isinstance(value, float) else value for key, value in eval_metrics.items()}
        final_evaluation[model] = {
            'llm_context_precision_with_reference': eval_metrics['llm_context_precision_with_reference'],
            'faithfulness': eval_metrics['faithfulness'],
            'answer_relevancy': eval_metrics['answer_relevancy']
        }
Here is the final evaluation results JSON file, evaluated with the ‘us.amazon.nova-pro-v1:0’ inference model and the ‘amazon.titan-embed-text-v2:0’ embedding model.
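Once the metrics are collected per model, a quick way to compare them is to find the top-scoring model for each metric. The snippet below is a minimal sketch: `best_model_per_metric` is a hypothetical helper, and the model names and scores are invented placeholders, not results from the run above.

```python
def best_model_per_metric(evaluation):
    """Return {metric: model with the highest score for that metric}."""
    metrics = sorted({metric for scores in evaluation.values() for metric in scores})
    return {
        metric: max(
            evaluation,
            # Models missing a metric rank last for that metric
            key=lambda model: evaluation[model].get(metric, float("-inf")),
        )
        for metric in metrics
    }


# Invented placeholder scores, shaped like final_evaluation
placeholder_evaluation = {
    "model-a": {"faithfulness": 0.80, "answer_relevancy": 0.70},
    "model-b": {"faithfulness": 0.75, "answer_relevancy": 0.90},
}

winners = best_model_per_metric(placeholder_evaluation)
print(winners)
```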
✅ Plotting RAG Evaluation Metrics
To visualize the metrics from the final evaluation using the plot_grouped_bar function, you can first convert the JSON into a DataFrame, then select the desired metrics to plot. Here's the complete code that does that and produces a grouped bar chart:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Convert the nested dictionary to a DataFrame (one row per model)
evaluation_df = pd.DataFrame.from_dict(final_evaluation, orient='index')
# Move the model ID from the index into a 'model' column
evaluation_df = evaluation_df.reset_index().rename(columns={'index': 'model'})

def plot_grouped_bar(df, group_by_index, columns_to_plot, show_values=False,
                     title='Grouped Bar Chart', xlabel='Group', ylabel='Value'):
    unique_groups = df[group_by_index].unique()
    num_groups = len(unique_groups)
    num_cols = len(columns_to_plot)
    bar_width = 0.8 / num_cols  # Adjust width based on the number of columns
    fig, ax = plt.subplots(figsize=(10, 6))
    x = np.arange(num_groups)  # Positions for the groups
    for i, col in enumerate(columns_to_plot):
        # sort=False keeps groups in first-appearance order so bars match the tick labels
        values = df.groupby(group_by_index, sort=False)[col].mean().values
        positions = x + (i * bar_width) - (0.4 - (bar_width / 2))  # Center the groups
        rects = ax.bar(positions, values, bar_width, label=col)
        if show_values:
            for rect in rects:
                height = rect.get_height()
                ax.annotate(f'{height:.2f}',
                            xy=(rect.get_x() + rect.get_width() / 2, height),
                            xytext=(0, 3),
                            textcoords="offset points",
                            ha='center', va='bottom')
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    ax.set_title(title)
    ax.set_xticks(x)
    ax.set_xticklabels(unique_groups)
    plt.xticks(rotation=45)
    ax.legend(title='Columns')
    fig.tight_layout()
    plt.show()

# plot_grouped_bar is defined above, so no separate import is needed
plot_grouped_bar(evaluation_df, 'model', ['llm_context_precision_with_reference', 'faithfulness', 'answer_relevancy'], show_values=True, title='Evaluation Metrics', xlabel='Model', ylabel='Metrics')
📊 Plot on multiple models with Ragas metrics
Here is the plot, showing the models on the X-axis and the evaluation metrics on the Y-axis.

🌐 Additional Resources
- FloTorch: flotorch.ai
- PyPI Package: FloTorch-core
- GitHub Repository: FissionAI/FloTorch
- Flotorch Notebooks: Jupyter Notebooks
📝 Conclusion
FloTorch-core offers a modular approach to building and evaluating RAG pipelines with LLMs. By leveraging its components for data ingestion, embedding, vector storage, inferencing, and evaluation, developers can construct robust and scalable AI solutions.