BA-Chatbot/model_service/embeddings/llama.py

182 lines
6.9 KiB
Python
Raw Permalink Normal View History

2023-11-15 14:28:48 +01:00
from transformers import (
LlamaForCausalLM,
LlamaTokenizer,
AutoTokenizer,
AutoModelForCausalLM,
pipeline,
)
import torch
from llama_cpp import Llama
import json
class LLama:
    """
    Wrapper around several LLaMA back ends used for embeddings and generation.

    Depending on the constructor flags, up to three independent back ends are
    loaded:

    * ``hf_model`` -> ``self.tokenizer`` / ``self.model`` (Hugging Face, on CPU)
    * ``gptq_hf``  -> ``self.gpqt_tokenizer`` / ``self.gpqt_model`` (GPTQ checkpoint)
    * ``ggml``     -> ``self.ggml`` (llama.cpp through ``llama_cpp.Llama``)

    The attributes of a back end exist only when its flag was set; calling a
    method that needs a back end which was not loaded fails on the missing
    attribute.
    """

    # Sampling settings shared by both Hugging Face style generate() calls.
    _SAMPLING_KWARGS = {
        "temperature": 0.7,
        "do_sample": True,
        "top_p": 0.95,
        "top_k": 40,
    }

    def __init__(
        self,
        model_path_hf="",
        model_path_ggml="",
        model_path_gpqt_hf="",
        lora_base="",
        ggml=False,
        output_hidden_states=True,
        hf_model=False,
        gptq_hf=False,
        lora_path=None,
    ):
        """
        Load the requested back ends.

        Args:
            model_path_hf: Path or hub id of the Hugging Face model
                (used when ``hf_model`` is true).
            model_path_ggml: Path of the model file handed to ``llama_cpp.Llama``
                (used when ``ggml`` is true).
            model_path_gpqt_hf: Path or hub id of the GPTQ checkpoint
                (used when ``gptq_hf`` is true).
            lora_base: Forwarded to ``Llama`` as ``lora_base``.
            ggml: Load the llama.cpp back end.
            output_hidden_states: Forwarded to ``from_pretrained`` so that
                forward passes expose ``hidden_states``; the embedding
                methods of this class rely on it.
            hf_model: Load the Hugging Face back end.
            gptq_hf: Load the GPTQ back end.
            lora_path: Forwarded to ``Llama`` as ``lora_path``.
        """
        if hf_model:
            self.tokenizer = AutoTokenizer.from_pretrained(model_path_hf)
            # Fall back to EOS as padding token when the tokenizer has none.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path_hf, output_hidden_states=output_hidden_states
            ).to('cpu')
            self.model.eval()
            print(self.model)
            # uncomment for half precision load
            # self.model = self.model.half()
            # self.model = self.model.cuda()
        if gptq_hf:
            self.gpqt_tokenizer = AutoTokenizer.from_pretrained(
                model_path_gpqt_hf, use_fast=True
            )
            self.gpqt_model = AutoModelForCausalLM.from_pretrained(
                model_path_gpqt_hf,
                device_map="auto",
                trust_remote_code=False,
                revision="main",
            )
        if ggml:
            self.ggml = Llama(
                model_path=model_path_ggml,
                lora_path=lora_path,
                lora_base=lora_base,
                n_ctx=5120,
                n_parts=1,
                # Boolean flag; the previous value 3 was merely truthy.
                f16_kv=True,
                embedding=True,
            )

    def tokenize(self, text):
        """
        Split ``text`` into tokens with the Hugging Face tokenizer.

        Returns whatever ``self.tokenizer.tokenize`` returns for ``text``.
        """
        return self.tokenizer.tokenize(text)

    def get_embeddings_last_layer(self, text, operation="mean"):
        """
        Embed ``text`` with the hidden states of the model's last layer.

        Args:
            text: Input passed to the Hugging Face tokenizer.
            operation: Pooling applied along dimension 1 of the hidden
                states: ``"mean"``, ``"max"``, ``"min"`` or ``"sum"``. Any
                other value returns the unpooled hidden states.

        Returns:
            A NumPy array with the pooled (or raw) last-layer hidden states.
        """
        inputs = self.tokenizer(text, return_tensors="pt")
        # Follow the model's device instead of hard-coding one.
        input_ids = inputs["input_ids"].to(self.model.device)
        with torch.no_grad():
            outputs = self.model(input_ids)
        # hidden_states[0] is the embedding output; the last layer is [-1].
        last_layer_hidden_states = outputs.hidden_states[-1]

        if operation == "mean":
            pooled = torch.mean(last_layer_hidden_states, dim=1)
        elif operation == "max":
            pooled = torch.max(last_layer_hidden_states, dim=1).values
        elif operation == "min":
            pooled = torch.min(last_layer_hidden_states, dim=1).values
        elif operation == "sum":
            pooled = torch.sum(last_layer_hidden_states, dim=1)
        else:
            pooled = last_layer_hidden_states
        # NOTE: a position-weighted mean pooling over the non-padding tokens
        # was sketched here previously (commented out, never executed).
        return pooled.cpu().numpy()

    def get_embeddings(self, text, layer_num=-1, dump_path="hidden_layer.json"):
        """
        Embed ``text`` as the token-wise mean of one hidden layer.

        Args:
            text: Input passed to the Hugging Face tokenizer.
            layer_num: Index into the model's ``hidden_states`` tuple;
                defaults to the last entry.
            dump_path: File that receives the selected layer's hidden states
                as JSON on every call. Pass ``None`` to skip the dump. The
                default keeps the historical behaviour of writing
                ``hidden_layer.json`` into the working directory.

        Returns:
            A NumPy array holding the mean over dimension 0 of the first
            batch element of the selected layer.
        """
        inputs = self.tokenizer(text, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(**inputs)
        selected_layer = outputs.hidden_states[layer_num]
        if dump_path is not None:
            with open(dump_path, "w") as fp:
                json.dump(selected_layer.tolist(), fp)
        sentence_embedding = torch.mean(selected_layer[0], dim=0)
        return sentence_embedding.detach().cpu().numpy()

    def get_input_embeddings(self, text, operation):
        """
        Embed ``text`` using only the model's input embedding layer.

        Args:
            text: Input passed to the Hugging Face tokenizer.
            operation: ``"mean"`` averages the first batch element over
                dimension 0; ``"max"`` takes the maximum over dimension 1 of
                the batched tensor; every other value takes the minimum over
                dimension 1.

        Returns:
            A NumPy array. Note that ``"mean"`` indexes the first batch
            element before reducing, whereas the other branches reduce the
            batched tensor, so the results differ by one leading dimension.
        """
        inputs = self.tokenizer(text, return_tensors="pt")
        input_ids = inputs["input_ids"]
        with torch.no_grad():
            embedding_layer = self.model.get_input_embeddings()
            embeddings = embedding_layer(input_ids)
        if operation == "mean":
            return torch.mean(embeddings[0], 0).cpu().numpy()
        if operation == "max":
            return torch.max(embeddings, dim=1)[0].cpu().numpy()
        return torch.min(embeddings, dim=1)[0].cpu().numpy()

    def get_embeddings_ggml(self, text):
        """
        Embed ``text`` with the llama.cpp back end.

        Returns whatever ``self.ggml.embed_text_llama`` returns.
        """
        # NOTE(review): `embed_text_llama` is not a method I can confirm on
        # upstream llama_cpp.Llama (which offers `embed`/`create_embedding`);
        # presumably a patched build is used. Verify before upgrading.
        return self.ggml.embed_text_llama(doc=text)

    def generate_answer_ggml(self, text):
        """
        Complete ``text`` with the llama.cpp back end.

        Returns the result of ``self.ggml.create_completion``.
        """
        return self.ggml.create_completion(prompt=text)

    @staticmethod
    def _sample_completion(tokenizer, model, prompt, max_new_tokens):
        """Tokenize ``prompt``, sample a continuation and decode it."""
        encoded = tokenizer(prompt, return_tensors="pt")
        # Put the ids where the model lives; an unconditional .cuda() breaks
        # CPU-loaded models and hosts without CUDA.
        input_ids = encoded.input_ids.to(model.device)
        output = model.generate(
            inputs=input_ids,
            max_new_tokens=max_new_tokens,
            **LLama._SAMPLING_KWARGS,
        )
        return tokenizer.decode(output[0])

    def generate_answer(self, prompt):
        """
        Sample up to 200 new tokens for ``prompt`` with the Hugging Face model.

        Returns the decoded first sequence of the ``generate`` output.
        """
        return self._sample_completion(
            self.tokenizer, self.model, prompt, max_new_tokens=200
        )

    def generate_answer_gptq(self, prompt):
        """
        Sample up to 512 new tokens for ``prompt`` with the GPTQ model.

        Returns the decoded first sequence of the ``generate`` output.
        """
        return self._sample_completion(
            self.gpqt_tokenizer, self.gpqt_model, prompt, max_new_tokens=512
        )