AI Tools from Intel
Find answers to your toolkit installation, configuration, and get-started questions.
130 Discussions

Attempting to Use Intel-extension-for-pytorch to distribute the run of my Local Llama LLM

CMCAlephInnv
Novice
457 Views

Hi, I am attempting to switch over to intel-extension-for-pytorch to run my Local LLama LLM since I ran into problems when trying to distribute on intel-extension-for-tensorflow. I work on 16 Intel Datacenter GPU Max 1550s, which have a total FP32 performance of ~1000 TFLOPS. I am running into some rather early errors here and struggled to find similar issues on the intel-extension-for-pytorch github forum. The code for my LocalLlama is set up correctly to run my Llama model correctly (I believe), and I have specific methods in the class for my specific application.

Here is my error (I am not sure how to install drivers for my intel cloud xpus):

2024-10-12 14:13:42,060 - __main__ - ERROR - XPU is not available through IPEX.
2024-10-12 14:13:42,060 - __main__ - ERROR - To fix this, please ensure you have:
2024-10-12 14:13:42,060 - __main__ - ERROR - 1. Installed Intel Extension for PyTorch: pip install intel_extension_for_pytorch
2024-10-12 14:13:42,060 - __main__ - ERROR - 2. Proper Intel GPU drivers installed
2024-10-12 14:13:42,060 - __main__ - ERROR - 3. Verified installation with: python -c 'import intel_extension_for_pytorch as ipex; print(ipex.xpu.is_available())'
Traceback (most recent call last):
File "/home/sdp/AA_KEEP_THIS/notebooks/DataImprovements/Llama LLM/LocalLlama.py", line 2122, in <module>
llama = DistributedLocalLLaMA(MODEL_PATH)
File "/home/sdp/AA_KEEP_THIS/notebooks/DataImprovements/Llama LLM/LocalLlama.py", line 275, in __init__
raise RuntimeError("IPEX XPU support is not properly configured. Please see the error messages above for instructions.")
RuntimeError: IPEX XPU support is not properly configured. Please see the error messages above for instructions.

Here is my code, let me know what I can do to solve this!:

 

## Dependencies

import os
import logging
from functools import (
    partial, 
    lru_cache
)
import tensorflow as tf
import torch
import intel_extension_for_tensorflow as itex
import intel_extension_for_pytorch as ipex
import torch.utils.checkpoint
from tensorflow.keras import mixed_precision

logger = logging.getLogger(__name__)

# Create a separate logger for IPEX warnings
ipex_logger = logging.getLogger("IPEX")
ipex_logger.setLevel(logging.WARNING)
# ... NOT ALL DEPENDENCIES SHOWN

def check_xpu_support():
    try:
        if not ipex.xpu.is_available():
            logger.error("XPU is not available through IPEX.")
            logger.error("To fix this, please ensure you have:")
            logger.error("1. Installed Intel Extension for PyTorch: pip install intel_extension_for_pytorch")
            logger.error("2. Proper Intel GPU drivers installed")
            logger.error("3. Verified installation with: python -c 'import intel_extension_for_pytorch as ipex; print(ipex.xpu.is_available())'")
            return False
        return True
    except ImportError:
        logger.error("Intel Extension for PyTorch (IPEX) is not installed.")
        logger.error("Please install it with: pip install intel_extension_for_pytorch")
        return False

class DistributedLocalLLaMA:
    def __init__(self, model_path: str):
        if not check_xpu_support():
            raise RuntimeError("IPEX XPU support is not properly configured. Please see the error messages above for instructions.")
        
        self.xpu_devices = list(range(ipex.xpu.device_count()))
        self.tf_devices = [f"/XPU:{i}" for i in self.xpu_devices]
        
        logger.info(f"PyTorch version: {torch.__version__}")
        logger.info(f"IPEX version: {ipex.__version__}")
        logger.info(f"XPU available through IPEX: {ipex.xpu.is_available()}")
        logger.info(f"XPU device count: {ipex.xpu.device_count()}")
        
        try:
            for device_id in self.xpu_devices:
                ipex.xpu.set_device(device_id)
                torch.zeros(1, device=f"xpu:{device_id}")
            logger.info(f"Successfully verified XPU devices: {self.xpu_devices}")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize XPU devices {self.xpu_devices}. Error: {e}")

        try:
            self.strategy = tf.distribute.MirroredStrategy(devices=self.tf_devices)
            logger.info(f"Successfully created MirroredStrategy with devices: {self.tf_devices}")
        except Exception as e:
            raise RuntimeError(f"Failed to create MirroredStrategy with devices {self.tf_devices}. Error: {e}")
        
        with self.strategy.scope():
            self.device = "xpu"
            logger.info(f"Using device: {self.device}")
            
            logger.info("Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
            
            logger.info("Loading model...")
            config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                config=config,
                trust_remote_code=True,
                torch_dtype=torch.bfloat16,
            )
            
            logger.info(f"Optimizing model with IPEX for device xpu:{self.xpu_devices[0]}...")
            try:
                self.model = ipex.optimize(model, dtype=torch.bfloat16)
                self.model = self.model.to(f"xpu:{self.xpu_devices[0]}")
                logger.info("Model optimization and device placement completed successfully!")
            except Exception as e:
                logger.warning(f"IPEX optimization encountered issues: {e}")
                logger.warning("Continuing with unoptimized model...")
                self.model = model.to(f"xpu:{self.xpu_devices[0]}")

    def bias_logits(self, input_ids: torch.Tensor, choices: List[str]) -> torch.Tensor:
        vocab_size = self.model.config.vocab_size
        choice_mask = torch.zeros(vocab_size, device=f"xpu:{self.xpu_devices[0]}")
        
        for choice in choices:
            choice_tokens = self.tokenizer.encode(choice, add_special_tokens=False)
            for token in choice_tokens:
                if token < vocab_size:
                    choice_mask[token] = 5.0
        
        return choice_mask

    @TF.function
    def generate_response(self, prompt: str, choices: List[str]) -> str:
        def generate_fn():
            inputs = self.tokenizer(prompt, return_tensors="pt")
            inputs = {k: v.to(f"xpu:{self.xpu_devices[0]}") for k, v in inputs.items()}
            
            choice_bias = self.bias_logits(inputs['input_ids'], choices)
            
            def apply_choice_bias(input_ids, logits):
                return logits + choice_bias.expand_as(logits)
            
            with torch.no_grad():
                with ipex.xpu.amp.autocast():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=50,
                        do_sample=True,
                        temperature=0.3,
                        top_p=0.85,
                        num_return_sequences=1,
                        pad_token_id=self.tokenizer.eos_token_id,
                        logits_processor=[apply_choice_bias],
                    )
            
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            response = response[len(prompt):].strip()
            return response
        
        return self.strategy.run(generate_fn)

 

 

Labels (3)
0 Kudos
0 Replies
Reply