from transformers import (
    LlamaForCausalLM,
    LlamaTokenizer,
    AutoTokenizer,
    AutoModelForCausalLM,
    pipeline,
)
import torch
from llama_cpp import Llama
import json


class LLama:
    """
    Wrapper for loading and querying LLaMA-family models.
    Supports Hugging Face transformers models, ggml (llama.cpp) models, and GPTQ models.
    """

    def __init__(
        self,
        model_path_hf="",
        model_path_ggml="",
        model_path_gpqt_hf="",
        lora_base="",
        ggml=False,
        output_hidden_states=True,
        hf_model=False,
        gptq_hf=False,
        lora_path=None,
    ):
        if hf_model:
            self.tokenizer = AutoTokenizer.from_pretrained(model_path_hf)
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path_hf, output_hidden_states=output_hidden_states
            ).to("cpu")
            self.model.eval()
            print(self.model)
            # Uncomment for half-precision load on GPU:
            # self.model = self.model.half()
            # self.model = self.model.cuda()

        if gptq_hf:
            self.gpqt_tokenizer = AutoTokenizer.from_pretrained(
                model_path_gpqt_hf, use_fast=True
            )
            self.gpqt_model = AutoModelForCausalLM.from_pretrained(
                model_path_gpqt_hf,
                device_map="auto",
                trust_remote_code=False,
                revision="main",
            )

        if ggml:
            self.ggml = Llama(
                model_path=model_path_ggml,
                lora_path=lora_path,
                lora_base=lora_base,
                n_ctx=5120,
                n_parts=1,
                f16_kv=True,
                embedding=True,
            )

    def tokenize(self, text):
        """
        Tokenizes the given text using the initialized tokenizer.
        """
        tokens = self.tokenizer.tokenize(text)
        return tokens

    def get_embeddings_last_layer(self, text, operation="mean"):
        """
        Retrieves embeddings from the last hidden layer of the model for the given text.
        Supports different pooling operations: mean, max, min, and sum.
        """
        inputs = self.tokenizer(text, return_tensors="pt")
        input_ids = inputs["input_ids"].to("cpu")
        with torch.no_grad():
            outputs = self.model(input_ids)
        # hidden_states[-1] is the last transformer layer; hidden_states[0] would be
        # the input embedding layer.
        last_layer_hidden_states = outputs.hidden_states[-1]
        if operation == "mean":
            pooled_embeddings = torch.mean(last_layer_hidden_states, dim=1).cpu().numpy()
        elif operation == "max":
            pooled_embeddings = torch.max(last_layer_hidden_states, dim=1).values.cpu().numpy()
        elif operation == "min":
            pooled_embeddings = torch.min(last_layer_hidden_states, dim=1).values.cpu().numpy()
        elif operation == "sum":
            pooled_embeddings = torch.sum(last_layer_hidden_states, dim=1).cpu().numpy()
        else:
            pooled_embeddings = last_layer_hidden_states.cpu().numpy()
        return pooled_embeddings

        # POOLING STRATEGY: WEIGHTED MEAN POOLING
        # t_input = self.tokenizer([text], padding=True, truncation=True, return_tensors="pt")
        # with torch.no_grad():
        #     last_hidden_state = self.model(**t_input, output_hidden_states=True).hidden_states[-1]
        # weights_for_non_padding = t_input.attention_mask * torch.arange(start=1, end=last_hidden_state.shape[1] + 1).unsqueeze(0)
        # sum_embeddings = torch.sum(last_hidden_state * weights_for_non_padding.unsqueeze(-1), dim=1)
        # num_of_none_padding_tokens = torch.sum(weights_for_non_padding, dim=-1).unsqueeze(-1)
        # sentence_embeddings = sum_embeddings / num_of_none_padding_tokens
        # return sentence_embeddings[0]

    def get_embeddings(self, text, layer_num=-1):
        """
        Retrieves embeddings from a specified layer (layer_num) of the model for the given text.
        If layer_num is not specified, uses the last layer by default.
""" inputs = self.tokenizer(text, return_tensors="pt") with torch.no_grad(): outputs = self.model(**inputs) hidden_states = outputs.hidden_states layer_output = hidden_states[layer_num][0] with open("hidden_layer.json", "w") as fp: json.dump(list(hidden_states[layer_num].tolist()), fp) sentence_embedding = torch.mean(layer_output, dim=0) sentence_embedding = sentence_embedding.detach().cpu().numpy() return sentence_embedding def get_input_embeddings(self, text, operation): """ Retrieves input embeddings for the given text. Supports different operations like mean, max, and min for pooling. """ inputs = self.tokenizer(text, return_tensors="pt") input_ids = inputs["input_ids"] with torch.no_grad(): input_embeddings = self.model.get_input_embeddings() embeddings = input_embeddings(input_ids) if operation == "mean": return torch.mean(embeddings[0], 0).cpu().numpy() elif operation =="max": return torch.max(embeddings, dim=1)[0].cpu().numpy() else: return torch.min(embeddings, dim=1)[0].cpu().numpy() def get_embeddings_ggml(self, text): """ Retrieves embeddings for the given text using the ggml model. """ return self.ggml.embed_text_llama(doc=text) def generate_answer_ggml(self, text): """ Generates an answer to the given text using the ggml model. """ return self.ggml.create_completion(prompt=text) def generate_answer(self, prompt): """ Generates a response to the given prompt using the initialized Hugging Face model. """ input_ids = self.tokenizer(prompt, return_tensors="pt").input_ids.cuda() output = self.model.generate( inputs=input_ids, temperature=0.7, do_sample=True, top_p=0.95, top_k=40, max_new_tokens=200, ) return self.tokenizer.decode(output[0]) def generate_answer_gptq(self, prompt): """ Generates a response to the given prompt using the GPTQ model. """ input_ids = self.gpqt_tokenizer(prompt, return_tensors="pt").input_ids.cuda() output = self.gpqt_model.generate( inputs=input_ids, temperature=0.7, do_sample=True, top_p=0.95, top_k=40, max_new_tokens=512, ) return self.gpqt_tokenizer.decode(output[0])