182 lines
6.9 KiB
Python
182 lines
6.9 KiB
Python
|
from transformers import (
|
||
|
LlamaForCausalLM,
|
||
|
LlamaTokenizer,
|
||
|
AutoTokenizer,
|
||
|
AutoModelForCausalLM,
|
||
|
pipeline,
|
||
|
)
|
||
|
import torch
|
||
|
from llama_cpp import Llama
|
||
|
import json
|
||
|
|
||
|
class LLama:
    """
    Thin wrapper around several LLaMA-style model backends.

    Depending on the flags passed to the constructor an instance holds:

    * a Hugging Face causal LM (``self.tokenizer`` / ``self.model``),
    * a GPTQ checkpoint loaded through Hugging Face
      (``self.gpqt_tokenizer`` / ``self.gpqt_model``),
    * a llama.cpp (ggml) model (``self.ggml``).

    Only the attributes of the enabled backends are created, so calling a
    method that belongs to a backend that was not enabled raises
    ``AttributeError``.
    """

    def __init__(
        self,
        model_path_hf="",
        model_path_ggml="",
        model_path_gpqt_hf="",
        lora_base="",
        ggml=False,
        output_hidden_states=True,
        hf_model=False,
        gptq_hf=False,
        lora_path=None,
    ):
        """
        Load the requested backends.

        Args:
            model_path_hf: Path or hub id of the Hugging Face model
                (used when ``hf_model`` is true).
            model_path_ggml: Path of the ggml model file
                (used when ``ggml`` is true).
            model_path_gpqt_hf: Path or hub id of the GPTQ model
                (used when ``gptq_hf`` is true).
            lora_base: Forwarded to ``Llama(lora_base=...)``.
            ggml: Load the llama.cpp backend.
            output_hidden_states: Forwarded to the Hugging Face model so that
                forward passes return per-layer hidden states; the embedding
                helpers of this class read ``outputs.hidden_states``.
            hf_model: Load the Hugging Face backend (on CPU, in eval mode).
            gptq_hf: Load the GPTQ backend with ``device_map="auto"``.
            lora_path: Forwarded to ``Llama(lora_path=...)``.
        """
        if hf_model:
            self.tokenizer = AutoTokenizer.from_pretrained(model_path_hf)
            # Fall back to EOS as the padding token when none is defined.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path_hf, output_hidden_states=output_hidden_states
            ).to('cpu')
            self.model.eval()
            print(self.model)
            # uncomment for half precision load
            # self.model = self.model.half()
            # self.model = self.model.cuda()
        if gptq_hf:
            self.gpqt_tokenizer = AutoTokenizer.from_pretrained(
                model_path_gpqt_hf, use_fast=True
            )
            self.gpqt_model = AutoModelForCausalLM.from_pretrained(
                model_path_gpqt_hf,
                device_map="auto",
                trust_remote_code=False,
                revision="main",
            )
        if ggml:
            self.ggml = Llama(
                model_path=model_path_ggml,
                lora_path=lora_path,
                lora_base=lora_base,
                n_ctx=5120,
                n_parts=1,
                # Boolean flag; the previous value ``3`` was merely truthy.
                f16_kv=True,
                embedding=True,
            )

    def tokenize(self, text):
        """
        Tokenizes the given text using the Hugging Face tokenizer.

        Returns whatever ``self.tokenizer.tokenize(text)`` returns.
        """
        return self.tokenizer.tokenize(text)

    def get_embeddings_last_layer(self, text, operation="mean"):
        """
        Retrieves embeddings from the last layer of the model for the given text.

        Args:
            text: Input passed to the tokenizer.
            operation: Pooling applied over dimension 1 of the last hidden
                state: ``"mean"``, ``"max"``, ``"min"`` or ``"sum"``. Any
                other value disables pooling and returns the hidden state
                unchanged.

        Returns:
            A numpy array with the pooled (or raw) last-layer hidden state.
        """
        inputs = self.tokenizer(text, return_tensors="pt")
        # Keep the ids on the same device as the model.
        input_ids = inputs["input_ids"].to(self.model.device)
        with torch.no_grad():
            outputs = self.model(input_ids)
        # hidden_states[0] is the embedding output; the last layer is [-1].
        last_layer_hidden_states = outputs.hidden_states[-1]

        if operation == "mean":
            pooled = torch.mean(last_layer_hidden_states, dim=1)
        elif operation == "max":
            pooled = torch.max(last_layer_hidden_states, dim=1).values
        elif operation == "min":
            pooled = torch.min(last_layer_hidden_states, dim=1).values
        elif operation == "sum":
            pooled = torch.sum(last_layer_hidden_states, dim=1)
        else:
            pooled = last_layer_hidden_states
        return pooled.cpu().numpy()

    # POOLING STRATEGY WEIGHTED MEAN POOLING
    # t_input = self.tokenizer([text], padding=True, truncation=True, return_tensors="pt")
    # with torch.no_grad():
    #     last_hidden_state = self.model(**t_input, output_hidden_states=True).hidden_states[-1]
    # weights_for_non_padding = t_input.attention_mask * torch.arange(start=1, end=last_hidden_state.shape[1] + 1).unsqueeze(0)
    # sum_embeddings = torch.sum(last_hidden_state * weights_for_non_padding.unsqueeze(-1), dim=1)
    # print(sum_embeddings,'sum_embeddings')
    # num_of_none_padding_tokens = torch.sum(weights_for_non_padding, dim=-1).unsqueeze(-1)
    # print(num_of_none_padding_tokens,'num_of_none_padding_tokens')
    # sentence_embeddings = sum_embeddings / num_of_none_padding_tokens
    # # print(sentence_embeddings,'sentence_embeddings')
    # return sentence_embeddings[0]

    def get_embeddings(self, text, layer_num=-1):
        """
        Retrieves embeddings from a specified layer (layer_num) of the model for the given text.
        If layer_num is not specified, uses the last layer by default.

        Side effect: the selected layer's hidden state is written to
        ``hidden_layer.json`` in the current working directory, overwriting
        any previous dump.

        Returns:
            A numpy array holding the mean over dimension 0 of the first
            batch element of the selected layer.
        """
        inputs = self.tokenizer(text, return_tensors="pt")

        with torch.no_grad():
            outputs = self.model(**inputs)

        selected_layer = outputs.hidden_states[layer_num]
        with open("hidden_layer.json", "w") as fp:
            json.dump(selected_layer.tolist(), fp)

        sentence_embedding = torch.mean(selected_layer[0], dim=0)
        return sentence_embedding.detach().cpu().numpy()

    def get_input_embeddings(self, text, operation):
        """
        Retrieves input embeddings for the given text.

        Args:
            text: Input passed to the tokenizer.
            operation: ``"mean"`` averages the first batch element over
                dimension 0; ``"max"`` takes the maximum over dimension 1 of
                the whole batch; any other value takes the minimum over
                dimension 1.

        Returns:
            A numpy array. Note that ``"mean"`` drops the batch dimension
            while the other operations keep it; this asymmetry is preserved
            for backward compatibility.
        """
        inputs = self.tokenizer(text, return_tensors="pt")
        input_ids = inputs["input_ids"]
        with torch.no_grad():
            embedding_layer = self.model.get_input_embeddings()
            embeddings = embedding_layer(input_ids)

        if operation == "mean":
            return torch.mean(embeddings[0], 0).cpu().numpy()
        if operation == "max":
            return torch.max(embeddings, dim=1)[0].cpu().numpy()
        return torch.min(embeddings, dim=1)[0].cpu().numpy()

    def get_embeddings_ggml(self, text):
        """
        Retrieves embeddings for the given text using the ggml model.
        """
        # NOTE(review): upstream llama-cpp-python documents `embed` /
        # `create_embedding`; confirm `embed_text_llama` exists in the
        # installed build before relying on this method.
        return self.ggml.embed_text_llama(doc=text)

    def generate_answer_ggml(self, text):
        """
        Generates an answer to the given text using the ggml model.

        Returns the result of ``self.ggml.create_completion(prompt=text)``.
        """
        return self.ggml.create_completion(prompt=text)

    def generate_answer(self, prompt):
        """
        Generates a response to the given prompt using the initialized Hugging Face model.

        Sampling is enabled (temperature 0.7, top_p 0.95, top_k 40) and at
        most 200 new tokens are generated.

        Returns:
            The decoded first generated sequence (prompt tokens included, as
            returned by ``generate``).
        """
        encoded = self.tokenizer(prompt, return_tensors="pt")
        # Follow the model's device instead of forcing CUDA: the constructor
        # loads the model on CPU, so `.cuda()` caused a device mismatch.
        input_ids = encoded.input_ids.to(self.model.device)
        output = self.model.generate(
            inputs=input_ids,
            temperature=0.7,
            do_sample=True,
            top_p=0.95,
            top_k=40,
            max_new_tokens=200,
        )

        return self.tokenizer.decode(output[0])

    def generate_answer_gptq(self, prompt):
        """
        Generates a response to the given prompt using the GPTQ model.

        Sampling is enabled (temperature 0.7, top_p 0.95, top_k 40) and at
        most 512 new tokens are generated.

        Returns:
            The decoded first generated sequence.
        """
        encoded = self.gpqt_tokenizer(prompt, return_tensors="pt")
        # The model is placed by `device_map="auto"`; send the ids wherever
        # it reports its device rather than assuming CUDA is available.
        input_ids = encoded.input_ids.to(self.gpqt_model.device)
        output = self.gpqt_model.generate(
            inputs=input_ids,
            temperature=0.7,
            do_sample=True,
            top_p=0.95,
            top_k=40,
            max_new_tokens=512,
        )
        return self.gpqt_tokenizer.decode(output[0])
|
||
|
|