# pylint: disable=ungrouped-imports from typing import List, Dict, Union, Optional, Any, Literal, Callable import logging from pathlib import Path from copy import deepcopy from requests.exceptions import HTTPError import numpy as np from tqdm import tqdm import pandas as pd from huggingface_hub import hf_hub_download from haystack.errors import HaystackError from haystack.schema import Document, FilterType from haystack.document_stores import BaseDocumentStore from haystack.telemetry import send_event from haystack.lazy_imports import LazyImport from haystack.nodes.retriever import DenseRetriever logger = logging.getLogger(__name__) with LazyImport(message="Run 'pip install farm-haystack[inference]'") as torch_and_transformers_import: import torch from haystack.modeling.utils import initialize_device_settings # pylint: disable=ungrouped-imports from transformers import AutoConfig import sys sys.path.append("../..") from api.embeddingsServiceCaller import EmbeddingServiceCaller _EMBEDDING_ENCODERS: Dict[str, Callable] = { "llama": {} } class LlamaRetriever(DenseRetriever): def __init__( self, model_format = "llama", document_store: Optional[BaseDocumentStore] = None, model_version: Optional[str] = None, use_gpu: bool = True, batch_size: int = 32, max_seq_len: int = 512, pooling_strategy: str = "reduce_mean", emb_extraction_layer: int = -1, top_k: int = 10, progress_bar: bool = True, devices: Optional[List[Union[str, "torch.device"]]] = None, use_auth_token: Optional[Union[str, bool]] = None, scale_score: bool = True, embed_meta_fields: Optional[List[str]] = None, api_key: Optional[str] = None, azure_api_version: str = "2022-12-01", azure_base_url: Optional[str] = None, azure_deployment_name: Optional[str] = None, api_base: str = "https://api.openai.com/v1", openai_organization: Optional[str] = None, ): """ :param document_store: An instance of DocumentStore from which to retrieve documents. :param embedding_model: Local path or name of model in Hugging Face's model hub such as ``'sentence-transformers/all-MiniLM-L6-v2'``. The embedding model could also potentially be an OpenAI model ["ada", "babbage", "davinci", "curie"] or a Cohere model ["small", "medium", "large"]. :param model_version: The version of model to use from the HuggingFace model hub. Can be tag name, branch name, or commit hash. :param use_gpu: Whether to use all available GPUs or the CPU. Falls back on CPU if no GPU is available. :param batch_size: Number of documents to encode at once. :param max_seq_len: Longest length of each document sequence. Maximum number of tokens for the document text. Longer ones will be cut down. :param model_format: Name of framework that was used for saving the model or model type. If no model_format is provided, it will be inferred automatically from the model configuration files. Options: - ``'farm'`` (will use `_DefaultEmbeddingEncoder` as embedding encoder) - ``'transformers'`` (will use `_DefaultEmbeddingEncoder` as embedding encoder) - ``'sentence_transformers'`` (will use `_SentenceTransformersEmbeddingEncoder` as embedding encoder) - ``'retribert'`` (will use `_RetribertEmbeddingEncoder` as embedding encoder) - ``'openai'``: (will use `_OpenAIEmbeddingEncoder` as embedding encoder) - ``'cohere'``: (will use `_CohereEmbeddingEncoder` as embedding encoder) :param pooling_strategy: Strategy for combining the embeddings from the model (for farm / transformers models only). Options: - ``'cls_token'`` (sentence vector) - ``'reduce_mean'`` (sentence vector) - ``'reduce_max'`` (sentence vector) - ``'per_token'`` (individual token vectors) :param emb_extraction_layer: Number of layer from which the embeddings shall be extracted (for farm / transformers models only). Default: -1 (very last layer). :param top_k: How many documents to return per query. :param progress_bar: If true displays progress bar during embedding. :param devices: List of torch devices (e.g. cuda, cpu, mps) to limit inference to specific devices. A list containing torch device objects and/or strings is supported (For example [torch.device('cuda:0'), "mps", "cuda:1"]). When specifying `use_gpu=False` the devices parameter is not used and a single cpu device is used for inference. Note: As multi-GPU training is currently not implemented for EmbeddingRetriever, training will only use the first device provided in this list. :param use_auth_token: The API token used to download private models from Huggingface. If this parameter is set to `True`, then the token generated when running `transformers-cli login` (stored in ~/.huggingface) will be used. Additional information can be found here https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]). If true (default) similarity scores (e.g. cosine or dot_product) which naturally have a different value range will be scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (e.g. cosine or dot_product) will be used. :param embed_meta_fields: Concatenate the provided meta fields and text passage / table to a text pair that is then used to create the embedding. This approach is also used in the TableTextRetriever paper and is likely to improve performance if your titles contain meaningful information for retrieval (topic, entities etc.). If no value is provided, a default empty list will be created. :param api_key: The OpenAI API key or the Cohere API key. Required if one wants to use OpenAI/Cohere embeddings. For more details see https://beta.openai.com/account/api-keys and https://dashboard.cohere.ai/api-keys :param azure_api_version: The version of the Azure OpenAI API to use. The default is `2022-12-01` version. :param azure_base_url: The base URL for the Azure OpenAI API. If not supplied, Azure OpenAI API will not be used. This parameter is an OpenAI Azure endpoint, usually in the form `https://.openai.azure.com' :param azure_deployment_name: The name of the Azure OpenAI API deployment. If not supplied, Azure OpenAI API will not be used. :param api_base: The OpenAI API base URL, defaults to `"https://api.openai.com/v1"`. :param openai_organization: The OpenAI-Organization ID, defaults to `None`. For more details, see OpenAI [documentation](https://platform.openai.com/docs/api-reference/requesting-organization). """ torch_and_transformers_import.check() if embed_meta_fields is None: embed_meta_fields = [] super().__init__() self.devices, _ = initialize_device_settings(devices=devices, use_cuda=use_gpu, multi_gpu=True) if batch_size < len(self.devices): logger.warning("Batch size is less than the number of devices.All gpus will not be utilized.") self.document_store = document_store self.model_version = model_version self.use_gpu = use_gpu self.batch_size = batch_size self.max_seq_len = max_seq_len self.pooling_strategy = pooling_strategy self.emb_extraction_layer = emb_extraction_layer self.top_k = top_k self.progress_bar = progress_bar self.use_auth_token = use_auth_token self.scale_score = scale_score self.api_key = api_key self.api_base = api_base self.api_version = azure_api_version self.azure_base_url = azure_base_url self.azure_deployment_name = azure_deployment_name self.openai_organization = openai_organization self.model_format= model_format self.emb_caller= EmbeddingServiceCaller() self.embed_meta_fields = embed_meta_fields def retrieve( self, query: str, filters: Optional[FilterType] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, scale_score: Optional[bool] = None, document_store: Optional[BaseDocumentStore] = None, ) -> List[Document]: """ Scan through the documents in a DocumentStore and return a small number of documents that are most relevant to the query. :param query: The query :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or (in case of `"$in"`) a list of values as value. If no logical operator is provided, `"$and"` is used as default operation. If no comparison operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default operation. __Example__: ```python filters = { "$and": { "type": {"$eq": "article"}, "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"} } } } # or simpler using default operators filters = { "type": "article", "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": ["economy", "politics"], "publisher": "nytimes" } } ``` To use the same logical operator multiple times on the same level, logical operators take optionally a list of dictionaries as value. __Example__: ```python filters = { "$or": [ { "$and": { "Type": "News Paper", "Date": { "$lt": "2019-01-01" } } }, { "$and": { "Type": "Blog Post", "Date": { "$gte": "2019-01-01" } } } ] } ``` :param top_k: How many documents to return per query. :param index: The name of the index in the DocumentStore from which to retrieve documents :param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication) :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]). If true similarity scores (e.g. cosine or dot_product) which naturally have a different value range will be scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (e.g. cosine or dot_product) will be used. :param document_store: the docstore to use for retrieval. If `None`, the one given in the `__init__` is used instead. """ document_store = document_store or self.document_store if document_store is None: raise ValueError( "This Retriever was not initialized with a Document Store. Provide one to the retrieve() method." ) if top_k is None: top_k = self.top_k if index is None: index = document_store.index if scale_score is None: scale_score = self.scale_score query_emb = self.embed_queries(queries=[query]) documents = document_store.query_by_embedding( query_emb=query_emb, filters=filters, top_k=top_k, index=index, headers=headers, scale_score=scale_score ) return documents def retrieve_batch( self, queries: List[str], filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None, top_k: Optional[int] = None, index: Optional[str] = None, headers: Optional[Dict[str, str]] = None, batch_size: Optional[int] = None, scale_score: Optional[bool] = None, document_store: Optional[BaseDocumentStore] = None, ) -> List[List[Document]]: """ Scan through the documents in a DocumentStore and return a small number of documents that are most relevant to the supplied queries. Returns a list of lists of Documents (one per query). :param queries: List of query strings. :param filters: Optional filters to narrow down the search space to documents whose metadata fulfill certain conditions. Can be a single filter that will be applied to each query or a list of filters (one filter per query). Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator (`"$and"`, `"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$in"`, `"$gt"`, `"$gte"`, `"$lt"`, `"$lte"`) or a metadata field name. Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or (in case of `"$in"`) a list of values as value. If no logical operator is provided, `"$and"` is used as default operation. If no comparison operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used as default operation. __Example__: ```python filters = { "$and": { "type": {"$eq": "article"}, "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": {"$in": ["economy", "politics"]}, "publisher": {"$eq": "nytimes"} } } } # or simpler using default operators filters = { "type": "article", "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"}, "rating": {"$gte": 3}, "$or": { "genre": ["economy", "politics"], "publisher": "nytimes" } } ``` To use the same logical operator multiple times on the same level, logical operators take optionally a list of dictionaries as value. __Example__: ```python filters = { "$or": [ { "$and": { "Type": "News Paper", "Date": { "$lt": "2019-01-01" } } }, { "$and": { "Type": "Blog Post", "Date": { "$gte": "2019-01-01" } } } ] } ``` :param top_k: How many documents to return per query. :param index: The name of the index in the DocumentStore from which to retrieve documents :param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic API_KEY'} for basic authentication) :param batch_size: Number of queries to embed at a time. :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]). If true similarity scores (e.g. cosine or dot_product) which naturally have a different value range will be scaled to a range of [0,1], where 1 means extremely relevant. Otherwise raw similarity scores (e.g. cosine or dot_product) will be used. :param document_store: the docstore to use for retrieval. If `None`, the one given in the `__init__` is used instead. """ document_store = document_store or self.document_store if document_store is None: raise ValueError( "This Retriever was not initialized with a Document Store. Provide one to the retrieve_batch() method." ) if top_k is None: top_k = self.top_k if batch_size is None: batch_size = self.batch_size if index is None: index = document_store.index if scale_score is None: scale_score = self.scale_score # embed_queries is already batched within by batch_size, so no need to batch the input here query_embs: np.ndarray = self.embed_queries(queries=queries) batched_query_embs: List[np.ndarray] = [] for i in range(0, len(query_embs), batch_size): batched_query_embs.extend(query_embs[i : i + batch_size]) documents = document_store.query_by_embedding_batch( query_embs=batched_query_embs, top_k=top_k, filters=filters, index=index, headers=headers, scale_score=scale_score, ) return documents def embed_queries(self, queries: List[str]) -> np.ndarray: if isinstance(queries, str): queries = [queries] assert isinstance(queries, list), "Expecting a list of texts, i.e. create_embeddings(texts=['text1',...])" return np.array(self.emb_caller.get_embeddings(queries[0], embedding_type="last_layer" )) def embed_documents(self, documents: List[Document]) -> np.ndarray: documents = self._preprocess_documents(documents) embeddings=[] for doc in documents: embeddings.append(self.emb_caller.get_embeddings(doc.content, embedding_type="last_layer")) return np.array(embeddings) def _preprocess_documents(self, docs: List[Document]) -> List[Document]: """ Turns table documents into text documents by representing the table in csv format. This allows us to use text embedding models for table retrieval. It also concatenates specified meta data fields with the text representations. :param docs: List of documents to linearize. If the document is not a table, it is returned as is. :return: List of documents with meta data + linearized tables or original documents if they are not tables. """ linearized_docs = [] for doc in docs: doc = deepcopy(doc) if doc.content_type == "table": if isinstance(doc.content, pd.DataFrame): doc.content = doc.content.to_csv(index=False) else: raise HaystackError("Documents of type 'table' need to have a pd.DataFrame as content field") # Gather all relevant metadata fields meta_data_fields = [] for key in self.embed_meta_fields: if key in doc.meta and doc.meta[key]: if isinstance(doc.meta[key], list): meta_data_fields.extend([item for item in doc.meta[key]]) else: meta_data_fields.append(doc.meta[key]) # Convert to type string (e.g. for ints or floats) meta_data_fields = [str(field) for field in meta_data_fields] doc.content = "\n".join(meta_data_fields + [doc.content]) linearized_docs.append(doc) return linearized_docs @staticmethod def _infer_model_format(model_name_or_path: str, use_auth_token: Optional[Union[str, bool]]) -> str: valid_openai_model_name = model_name_or_path in ["ada", "babbage", "davinci", "curie"] or any( m in model_name_or_path for m in ["-ada-", "-babbage-", "-davinci-", "-curie-"] ) if valid_openai_model_name: return "openai" if model_name_or_path in ["small", "medium", "large", "multilingual-22-12", "finance-sentiment"]: return "cohere" # Check if model name is a local directory with sentence transformers config file in it if Path(model_name_or_path).exists(): if Path(f"{model_name_or_path}/config_sentence_transformers.json").exists(): return "sentence_transformers" # Check if sentence transformers config file in model hub else: try: hf_hub_download( # type: ignore [call-arg] repo_id=model_name_or_path, filename="config_sentence_transformers.json", use_auth_token=use_auth_token, ) return "sentence_transformers" except HTTPError: pass # Check if retribert model config = AutoConfig.from_pretrained(model_name_or_path, use_auth_token=use_auth_token) if config.model_type == "retribert": return "retribert" # Model is neither sentence-transformers nor retribert model -> use _DefaultEmbeddingEncoder return "farm" def train( self, training_data: List[Dict[str, Any]], learning_rate: float = 2e-5, n_epochs: int = 1, num_warmup_steps: Optional[int] = None, batch_size: int = 16, train_loss: Literal["mnrl", "margin_mse"] = "mnrl", num_workers: int = 0, use_amp: bool = False, **kwargs, ) -> None: """ Trains/adapts the underlying embedding model. We only support the training of sentence-transformer embedding models. Each training data example is a dictionary with the following keys: * question: the question string * pos_doc: the positive document string * neg_doc: the negative document string * score: the score margin :param training_data: The training data in a dictionary format. :param learning_rate: The learning rate. :param n_epochs: The number of epochs that you want the train for. :param num_warmup_steps: Behavior depends on the scheduler. For WarmupLinear (default), the learning rate is increased from 0 up to the maximal learning rate. After these many training steps, the learning rate is decreased linearly back to zero. :param batch_size: The batch size to use for the training. The default values is 16. :param train_loss: The loss to use for training. If you're using a sentence-transformer embedding_model (which is the only model that training is supported for), possible values are 'mnrl' (Multiple Negatives Ranking Loss) or 'margin_mse' (MarginMSE). :param num_workers: The number of subprocesses to use for the Pytorch DataLoader. :param use_amp: Use Automatic Mixed Precision (AMP). :param kwargs: Additional training key word arguments to pass to the `SentenceTransformer.fit` function. Please reference the Sentence-Transformers [documentation](https://www.sbert.net/docs/training/overview.html#sentence_transformers.SentenceTransformer.fit) for a full list of keyword arguments. """ send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})