Unverified commit 4e6e7e4f authored by Hamid Shojanazeri, committed by GitHub
Initial Crusoe examples to 3p_integrations recipes (#716)

parents 3128aee0 268e473b
@@ -1466,3 +1466,20 @@ OCRVQA
OCRVQADataCollator
ocrvqa
langchain
GiB
Terraform
gb
TPOT
ctrl
finetunes
llmcompressor
prefill
qps
terraform
tf
tmux
tpot
ttft
uv
8xL40S
xL
Below are recipes for deploying common Llama workflows on [Crusoe's](https://crusoe.ai) high-performance, sustainable cloud. Each workflow corresponds to a subfolder with its own README and supplemental materials. Please reference the table below for hardware requirements.
| Workflow | Model(s) | VM type | Storage |
|:----: | :----: | :----:| :----: |
| [Serving Llama3.1 in FP8 with vLLM](vllm-fp8/) | [meta-llama/Meta-Llama-3.1-70B-Instruct](https://huggingface.co/meta-llama/Meta-Llama-3.1-70B-Instruct), [meta-llama/Meta-Llama-3.1-8B-Instruct](https://huggingface.co/meta-llama/Meta-Llama-3.1-8B-Instruct) | l40s-48gb.8x | 256 GiB Persistent Disk |
# Requirements
First, ensure that you have a Crusoe account (you can sign up [here](https://console.crusoecloud.com/)). We will provision resources using Terraform; please ensure that your environment is configured, and refer to the Crusoe [docs](https://github.com/crusoecloud/terraform-provider-crusoe?tab=readme-ov-file#getting-started) for guidance.
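Before provisioning, it's worth a quick sanity check of the local tooling; a minimal sketch, assuming the provider reads API credentials from `~/.crusoe/config` as described in its getting-started guide:
```bash
# confirm Terraform is installed and on your PATH
terraform version
# the Crusoe provider expects API keys generated in the Crusoe console;
# by default they live in ~/.crusoe/config -- see the linked docs if your setup differs
ls ~/.crusoe/config
```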
# Serving Models
Some recipes in this repo require firewall rules to expose ports in order to reach the inference server. To manage firewall rules, please refer to our [networking documentation](https://docs.crusoecloud.com/networking/firewall-rules/managing-firewall-rules).
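If you prefer not to expose the inference port publicly, an SSH tunnel from your workstation is an alternative (a sketch; substitute your VM's login user and the public IP reported by Terraform):
```bash
# forward vLLM's port (8000) from the VM to your local machine instead of opening a firewall rule
ssh -L 8000:localhost:8000 <user>@<vm-public-ip>
```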
In this article, we will show how to benchmark FP8 models on L40S GPUs using the vLLM inference engine. By the end, you should understand how to use `llm-compressor` to quantize existing higher-precision Llama3 finetunes to FP8, how to benchmark throughput and latency to compare performance, and how to serve models using `vllm`.
# Provisioning Resources
First, navigate to this repository from your local machine. Update the corresponding variables in `locals` inside `main.tf` to match your environment (e.g. the path to your SSH key), then initialize the Terraform project with `terraform init` and provision resources with `terraform apply`. Note that this will create a VM equipped with 8xL40S GPUs and a 256 GiB persistent disk. After the VM has been created, Terraform will output its public IP address.
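For reference, the full provisioning flow from the tutorial directory looks like this (the local path assumes you cloned `llama-recipes` on your machine):
```bash
cd llama-recipes/recipes/3p_integrations/crusoe/vllm-fp8/
terraform init      # downloads the Crusoe provider
terraform apply     # review the plan, then confirm to create the VM and disk
terraform output instance_public_ip   # reprint the VM's public IP later if needed
```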
## Mount Storage
`ssh` into your VM. Then, run the below commands to mount the attached disk to `/scratch`.
```bash
mkfs.ext4 /dev/vdb                 # format the attached data disk
mkdir /scratch                     # create the mount point
mount -t ext4 /dev/vdb /scratch    # mount the disk
cd /scratch
```
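Note that this mount does not persist across reboots. If you want it to, you can optionally add an fstab entry (assuming the data disk keeps the `/dev/vdb` device name):
```bash
# optional: remount /scratch automatically on boot
echo '/dev/vdb /scratch ext4 defaults 0 0' >> /etc/fstab
```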
# Install Dependencies
We'll use [uv](https://github.com/astral-sh/uv) to install dependencies. First, install the tool with
```bash
apt-get update && apt-get install -y curl tmux
curl -LsSf https://astral.sh/uv/install.sh | sh
source $HOME/.cargo/env
```
Now, clone the recipes and navigate to this tutorial. Initialize the virtual environment and install dependencies:
```bash
git clone https://github.com/meta-llama/llama-recipes.git
cd llama-recipes/recipes/3p_integrations/crusoe/vllm-fp8/
uv add vllm setuptools
```
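Optionally, confirm that the environment resolved correctly:
```bash
# should print the vLLM version installed in the project environment
uv run python -c "import vllm; print(vllm.__version__)"
```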
# Run Benchmarks
Before starting the vLLM server, we'll point the Hugging Face cache at our mounted disk, specify the model tag, and set the tensor-parallel size to 1.
```bash
export HF_HOME=/scratch/
export MODEL=neuralmagic/Meta-Llama-3.1-8B-Instruct-FP8-dynamic
export TP_SIZE=1
```
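If you later point `$MODEL` at a gated checkpoint (e.g. the `meta-llama` repos), authenticate with Hugging Face first; the public `neuralmagic` FP8 model above does not require this:
```bash
# only needed for gated model repos
uv run huggingface-cli login
```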
Now, we'll use tmux to run our server inside a detachable session.
```bash
tmux new -s server
uv run vllm serve $MODEL --enable-chunked-prefill --disable-log-requests --tensor-parallel-size $TP_SIZE
```
vLLM will download the model from HF and serve it on port 8000. Now, detach from the tmux session (`ctrl+b` then `d`) and we'll simulate a client.
```bash
tmux new -s client
chmod +x run_benchmark.sh
./run_benchmark.sh
```
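If you want to verify that the server is up before the benchmark starts issuing requests, vLLM's OpenAI-compatible API on port 8000 can be queried directly:
```bash
# lists the served model once the weights have finished loading
curl http://localhost:8000/v1/models
```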
Let's inspect the benchmark script to see what's going on.
```bash
TOTAL_SECONDS=120
QPS_RATES=("1" "3" "5" "7" "9")
for QPS in ${QPS_RATES[@]}; do
NUM_PROMPTS=$((TOTAL_SECONDS * QPS))
echo "===== RUNNING NUM_PROMPTS = $NUM_PROMPTS QPS = $QPS ====="
uv run benchmarks/benchmark_serving.py \
--model $MODEL \
--dataset-name sonnet --sonnet-input-len 550 --sonnet-output-len 150 --dataset-path benchmarks/sonnet.txt \
--num-prompts $NUM_PROMPTS --request-rate $QPS --save-result
done
```
This is a convenience wrapper that repeatedly runs vLLM's `benchmarks/benchmark_serving.py`, increasing the query rate (QPS) from 1 to 9 and saving the results. After each run completes, a JSON file with inference statistics appears in the same directory.
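To peek at a single result without plotting, the saved JSON can be read directly. The keys below (`median_ttft_ms`, `median_tpot_ms`) are the same fields the included plotting script consumes; the filename glob assumes the default naming used by `benchmark_serving.py`:
```bash
# print median TTFT and TPOT (ms) from the last result file (sorted by name)
uv run python -c "import glob, json; d = json.load(open(sorted(glob.glob('*qps*.json'))[-1])); print(d['median_ttft_ms'], d['median_tpot_ms'])"
```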
# Results
We repeated the above benchmark across the fp8 and fp16 versions of both Llama3.1 8B and 70B.
![TPOT vs QPS](assets/tpot_vs_qps_chart.png "TPOT vs QPS")
In the above chart, we compare time-per-output-token (TPOT) across different QPS rates. The fp16 70B model runs across 8 GPUs while the fp8 version uses only 4, yet it maintains the same TPOT range. The 8B models each run on a single GPU, and fp8 is noticeably faster there as well.
![TTFT vs QPS](assets/ttft_vs_qps_chart.png "TTFT vs QPS")
Looking at our time-to-first-token (TTFT), we observe the same trend. Even though the fp8 70B model runs on half as many GPUs, its TTFT is roughly the same as the fp16 version on 8.
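To reproduce the 70B comparison, only the model tag and tensor-parallel size change; a sketch (the FP8 model tag is assumed to mirror the `neuralmagic` naming of the 8B checkpoint used earlier):
```bash
# fp8 70B across 4 GPUs (model tag assumed)
export MODEL=neuralmagic/Meta-Llama-3.1-70B-Instruct-FP8-dynamic TP_SIZE=4
# fp16 70B across 8 GPUs
export MODEL=meta-llama/Meta-Llama-3.1-70B-Instruct TP_SIZE=8
```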
# Converting Llama3 models to FP8
If you wish to convert your existing finetunes to FP8, you can do so with [llmcompressor](https://github.com/vllm-project/llm-compressor).
```bash
uv add llmcompressor
uv run convert_hf_to_fp8.py NousResearch/Hermes-3-Llama-3.1-70B
```
To use the converted model, update `$MODEL` to the absolute path of the converted checkpoint, then rerun `uv run vllm serve $MODEL --enable-chunked-prefill --disable-log-requests --tensor-parallel-size $TP_SIZE`. We now have a vLLM server running our converted finetune and can rerun the previous benchmarks to verify performance.
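For example, assuming you ran the conversion from this tutorial directory, the converted checkpoint lands in `./<model-name>-FP8-dynamic` and can be served with:
```bash
# serve the locally converted FP8 checkpoint
export MODEL=$PWD/Hermes-3-Llama-3.1-70B-FP8-dynamic
export TP_SIZE=4   # fp8 70B fits on 4 L40S, per the benchmarks above
uv run vllm serve $MODEL --enable-chunked-prefill --disable-log-requests --tensor-parallel-size $TP_SIZE
```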
# Cleaning up
To clean up the resources we've provisioned, we can simply run `terraform destroy` from within this repository on your local machine.
recipes/3p_integrations/crusoe/vllm-fp8/assets/tpot_vs_qps_chart.png (332 KiB)
recipes/3p_integrations/crusoe/vllm-fp8/assets/ttft_vs_qps_chart.png (288 KiB)
import json
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from typing import List, Optional, Union
import aiohttp
import huggingface_hub.constants
from tqdm.asyncio import tqdm
from transformers import (AutoTokenizer, PreTrainedTokenizer,
PreTrainedTokenizerFast)
AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)
@dataclass
class RequestFuncInput:
prompt: str
api_url: str
prompt_len: int
output_len: int
model: str
best_of: int = 1
use_beam_search: bool = False
@dataclass
class RequestFuncOutput:
generated_text: str = ""
success: bool = False
latency: float = 0.0
ttft: float = 0.0 # Time to first token
itl: List[float] = field(
default_factory=list) # List of inter-token latencies
prompt_len: int = 0
error: str = ""
async def async_request_tgi(
request_func_input: RequestFuncInput,
pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
api_url = request_func_input.api_url
assert api_url.endswith("generate_stream")
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert not request_func_input.use_beam_search
params = {
"best_of": request_func_input.best_of,
"max_new_tokens": request_func_input.output_len,
"do_sample": True,
"temperature": 0.01, # TGI does not accept 0.0 temperature.
"top_p": 0.99, # TGI does not accept 1.0 top_p.
}
payload = {
"inputs": request_func_input.prompt,
"parameters": params,
}
output = RequestFuncOutput()
output.prompt_len = request_func_input.prompt_len
ttft = 0.0
st = time.perf_counter()
most_recent_timestamp = st
try:
async with session.post(url=api_url, json=payload) as response:
if response.status == 200:
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue
chunk_bytes = chunk_bytes.decode("utf-8")
#NOTE: Sometimes TGI returns a ping response without
# any data, we should skip it.
if chunk_bytes.startswith(":"):
continue
chunk = remove_prefix(chunk_bytes, "data:")
data = json.loads(chunk)
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = time.perf_counter() - st
output.ttft = ttft
# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)
most_recent_timestamp = timestamp
output.latency = most_recent_timestamp - st
output.success = True
output.generated_text = data["generated_text"]
else:
output.error = response.reason or ""
output.success = False
except Exception:
output.success = False
exc_info = sys.exc_info()
output.error = "".join(traceback.format_exception(*exc_info))
if pbar:
pbar.update(1)
return output
async def async_request_trt_llm(
request_func_input: RequestFuncInput,
pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
api_url = request_func_input.api_url
assert api_url.endswith("generate_stream")
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert not request_func_input.use_beam_search
assert request_func_input.best_of == 1
payload = {
"accumulate_tokens": True,
"text_input": request_func_input.prompt,
"temperature": 0.0,
"top_p": 1.0,
"max_tokens": request_func_input.output_len,
"stream": True,
}
output = RequestFuncOutput()
output.prompt_len = request_func_input.prompt_len
ttft = 0.0
st = time.perf_counter()
most_recent_timestamp = st
try:
async with session.post(url=api_url, json=payload) as response:
if response.status == 200:
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue
chunk = remove_prefix(chunk_bytes.decode("utf-8"),
"data:")
data = json.loads(chunk)
output.generated_text += data["text_output"]
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = time.perf_counter() - st
output.ttft = ttft
# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)
most_recent_timestamp = timestamp
output.latency = most_recent_timestamp - st
output.success = True
else:
output.error = response.reason or ""
output.success = False
except Exception:
output.success = False
exc_info = sys.exc_info()
output.error = "".join(traceback.format_exception(*exc_info))
if pbar:
pbar.update(1)
return output
async def async_request_deepspeed_mii(
request_func_input: RequestFuncInput,
pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert request_func_input.best_of == 1
assert not request_func_input.use_beam_search
payload = {
"prompt": request_func_input.prompt,
"max_tokens": request_func_input.output_len,
"temperature": 0.01, # deepspeed-mii does not accept 0.0 temp.
"top_p": 1.0,
}
output = RequestFuncOutput()
output.prompt_len = request_func_input.prompt_len
# NOTE: DeepSpeed-MII doesn't support streaming as of Jan 28 2024,
# will use 0 as placeholder.
# See https://github.com/microsoft/DeepSpeed-MII/pull/311
output.ttft = 0
st = time.perf_counter()
try:
async with session.post(url=request_func_input.api_url,
json=payload) as response:
if response.status == 200:
parsed_resp = await response.json()
output.latency = time.perf_counter() - st
output.generated_text = parsed_resp["text"][0]
output.success = True
else:
output.error = response.reason or ""
output.success = False
except Exception:
output.success = False
exc_info = sys.exc_info()
output.error = "".join(traceback.format_exception(*exc_info))
if pbar:
pbar.update(1)
return output
async def async_request_openai_completions(
request_func_input: RequestFuncInput,
pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
api_url = request_func_input.api_url
assert api_url.endswith(
("completions", "profile")
), "OpenAI Completions API URL must end with 'completions' or 'profile'."
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert not request_func_input.use_beam_search
payload = {
"model": request_func_input.model,
"prompt": request_func_input.prompt,
"temperature": 0.0,
"best_of": request_func_input.best_of,
"max_tokens": request_func_input.output_len,
"stream": True,
}
headers = {
"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
}
output = RequestFuncOutput()
output.prompt_len = request_func_input.prompt_len
generated_text = ""
ttft = 0.0
st = time.perf_counter()
most_recent_timestamp = st
try:
async with session.post(url=api_url, json=payload,
headers=headers) as response:
if response.status == 200:
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue
chunk = remove_prefix(chunk_bytes.decode("utf-8"),
"data: ")
if chunk == "[DONE]":
latency = time.perf_counter() - st
else:
data = json.loads(chunk)
# NOTE: Some completion API might have a last
# usage summary response without a token so we
# want to check a token was generated
if data["choices"][0]["text"]:
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = time.perf_counter() - st
output.ttft = ttft
# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)
most_recent_timestamp = timestamp
generated_text += data["choices"][0]["text"]
output.generated_text = generated_text
output.success = True
output.latency = latency
else:
output.error = response.reason or ""
output.success = False
except Exception:
output.success = False
exc_info = sys.exc_info()
output.error = "".join(traceback.format_exception(*exc_info))
if pbar:
pbar.update(1)
return output
async def async_request_openai_chat_completions(
request_func_input: RequestFuncInput,
pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
api_url = request_func_input.api_url
assert api_url.endswith(
"chat/completions"
), "OpenAI Chat Completions API URL must end with 'chat/completions'."
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert not request_func_input.use_beam_search
payload = {
"model": request_func_input.model,
"messages": [
{
"role": "user",
"content": request_func_input.prompt,
},
],
"temperature": 0.0,
"max_tokens": request_func_input.output_len,
"stream": True,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
}
output = RequestFuncOutput()
output.prompt_len = request_func_input.prompt_len
generated_text = ""
ttft = 0.0
st = time.perf_counter()
most_recent_timestamp = st
try:
async with session.post(url=api_url, json=payload,
headers=headers) as response:
if response.status == 200:
async for chunk_bytes in response.content:
chunk_bytes = chunk_bytes.strip()
if not chunk_bytes:
continue
chunk = remove_prefix(chunk_bytes.decode("utf-8"),
"data: ")
if chunk == "[DONE]":
latency = time.perf_counter() - st
else:
timestamp = time.perf_counter()
data = json.loads(chunk)
delta = data["choices"][0]["delta"]
if delta.get("content", None):
# First token
if ttft == 0.0:
ttft = time.perf_counter() - st
output.ttft = ttft
# Decoding phase
else:
output.itl.append(timestamp -
most_recent_timestamp)
generated_text += delta["content"]
most_recent_timestamp = timestamp
output.generated_text = generated_text
output.success = True
output.latency = latency
else:
output.error = response.reason or ""
output.success = False
except Exception:
output.success = False
exc_info = sys.exc_info()
output.error = "".join(traceback.format_exception(*exc_info))
if pbar:
pbar.update(1)
return output
# Since vllm must support Python 3.8, we can't use str.removeprefix(prefix)
# introduced in Python 3.9
def remove_prefix(text: str, prefix: str) -> str:
if text.startswith(prefix):
return text[len(prefix):]
return text
def get_model(pretrained_model_name_or_path: str) -> str:
if os.getenv('VLLM_USE_MODELSCOPE', 'False').lower() == 'true':
from modelscope import snapshot_download
model_path = snapshot_download(
model_id=pretrained_model_name_or_path,
local_files_only=huggingface_hub.constants.HF_HUB_OFFLINE,
ignore_file_pattern=[".*.pt", ".*.safetensors", ".*.bin"])
return model_path
return pretrained_model_name_or_path
def get_tokenizer(
pretrained_model_name_or_path: str, trust_remote_code: bool
) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]:
if pretrained_model_name_or_path is not None and not os.path.exists(
pretrained_model_name_or_path):
pretrained_model_name_or_path = get_model(
pretrained_model_name_or_path)
return AutoTokenizer.from_pretrained(pretrained_model_name_or_path,
trust_remote_code=trust_remote_code)
ASYNC_REQUEST_FUNCS = {
"tgi": async_request_tgi,
"vllm": async_request_openai_completions,
"lmdeploy": async_request_openai_completions,
"deepspeed-mii": async_request_deepspeed_mii,
"openai": async_request_openai_completions,
"openai-chat": async_request_openai_chat_completions,
"tensorrt-llm": async_request_trt_llm,
"scalellm": async_request_openai_completions,
}
import torch
import argparse
from transformers import AutoTokenizer
from llmcompressor.transformers import SparseAutoModelForCausalLM, oneshot
from llmcompressor.transformers.compression.helpers import ( # noqa
calculate_offload_device_map,
custom_offload_device_map,
)
def main():
parser = argparse.ArgumentParser(description="Compress a language model.")
parser.add_argument("model_stub", type=str, help="The model stub (e.g., 'bosonai/Higgs-Llama-3-70B')")
args = parser.parse_args()
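    # FP8 quantization recipe: static per-channel weight quantization and dynamic
    # per-token activation quantization, leaving lm_head unquantized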
recipe = """
quant_stage:
quant_modifiers:
QuantizationModifier:
ignore: ["lm_head"]
config_groups:
group_0:
weights:
num_bits: 8
type: float
strategy: channel
dynamic: false
symmetric: true
input_activations:
num_bits: 8
type: float
strategy: token
dynamic: true
symmetric: true
targets: ["Linear"]
"""
model_stub = args.model_stub
model_name = model_stub.split("/")[-1]
device_map = calculate_offload_device_map(
model_stub, reserve_for_hessians=False, num_gpus=1, torch_dtype=torch.float16
)
model = SparseAutoModelForCausalLM.from_pretrained(
model_stub, torch_dtype=torch.float16, device_map=device_map
)
output_dir = f"./{model_name}-FP8-dynamic"
oneshot(
model=model,
recipe=recipe,
output_dir=output_dir,
save_compressed=True,
tokenizer=AutoTokenizer.from_pretrained(model_stub),
)
if __name__ == "__main__":
main()
terraform {
required_providers {
crusoe = {
source = "registry.terraform.io/crusoecloud/crusoe"
}
}
}
locals {
my_ssh_key = file("~/.ssh/id_ed25519.pub")
}
// new VM
resource "crusoe_compute_instance" "vllm_vm" {
name = "vllm-example"
type = "l40s-48gb.8x"
location = "us-southcentral1-a"
# specify the base image
image = "ubuntu22.04-nvidia-slurm:12.4"
disks = [
{
id = crusoe_storage_disk.vllm_data_disk.id
mode = "read-write"
attachment_type = "data"
}
]
ssh_key = local.my_ssh_key
}
resource "crusoe_storage_disk" "vllm_data_disk" {
name = "vllm-example-disk"
size = "256GiB"
location = "us-southcentral1-a"
}
output "instance_public_ip" {
value = crusoe_compute_instance.vllm_vm.network_interfaces[0].public_ipv4.address
}
import json
import os
import re
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
def extract_info_from_filename(filename):
pattern = r'(?P<backend>[^-]+)-(?P<qps>\d+\.\d+)qps-(?P<model>.+)-(?P<date>\d{8}-\d{6})\.json'
match = re.match(pattern, filename)
if match:
return {
'qps': float(match.group('qps')),
'model': match.group('model')
}
return None
def read_json_files(directory):
data_tpot = defaultdict(list)
data_ttft = defaultdict(list)
for filename in os.listdir(directory):
if filename.endswith('.json'):
filepath = os.path.join(directory, filename)
file_info = extract_info_from_filename(filename)
if file_info:
with open(filepath, 'r') as file:
json_data = json.load(file)
median_tpot = json_data.get('median_tpot_ms')
std_tpot = json_data.get('std_tpot_ms')
median_ttft = json_data.get('median_ttft_ms')
std_ttft = json_data.get('std_ttft_ms')
if all(v is not None for v in [median_tpot, std_tpot, median_ttft, std_ttft]):
data_tpot[file_info['model']].append((file_info['qps'], median_tpot, std_tpot))
data_ttft[file_info['model']].append((file_info['qps'], median_ttft, std_ttft))
return {
'tpot': {model: sorted(points) for model, points in data_tpot.items()},
'ttft': {model: sorted(points) for model, points in data_ttft.items()}
}
def create_chart(data, metric, filename):
plt.figure(figsize=(12, 6))
colors = plt.cm.rainbow(np.linspace(0, 1, len(data)))
for (model, points), color in zip(data.items(), colors):
qps_values, median_values, std_values = zip(*points)
plt.errorbar(qps_values, median_values, yerr=std_values, fmt='o-', capsize=5, capthick=2, label=model, color=color)
plt.fill_between(qps_values,
np.array(median_values) - np.array(std_values),
np.array(median_values) + np.array(std_values),
alpha=0.2, color=color)
plt.xlabel('QPS (Queries Per Second)')
plt.ylabel(f'Median {metric.upper()} (ms)')
plt.title(f'Median {metric.upper()} vs QPS with Standard Deviation')
plt.grid(True)
plt.legend(title='Model', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.savefig(filename, dpi=300, bbox_inches='tight')
plt.close()
def main():
directory = './'
data = read_json_files(directory)
if data['tpot'] and data['ttft']:
create_chart(data['tpot'], 'tpot', 'tpot_vs_qps_chart.png')
create_chart(data['ttft'], 'ttft', 'ttft_vs_qps_chart.png')
print("Charts have been saved as 'tpot_vs_qps_chart.png' and 'ttft_vs_qps_chart.png'")
else:
print("No valid data found in the specified directory.")
if __name__ == "__main__":
main()
[project]
name = "vllm-l40s"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"setuptools>=74.0.0",
"vllm>=0.5.5",
"matplotlib>=3.9.2",
"llmcompressor>=0.1.0",
]
TOTAL_SECONDS=120
QPS_RATES=("1" "3" "5" "7" "9")
for QPS in ${QPS_RATES[@]}; do
NUM_PROMPTS=$((TOTAL_SECONDS * QPS))
echo "===== RUNNING NUM_PROMPTS = $NUM_PROMPTS QPS = $QPS ====="
uv run benchmarks/benchmark_serving.py \
--model $MODEL \
--dataset-name sonnet --sonnet-input-len 550 --sonnet-output-len 150 --dataset-path benchmarks/sonnet.txt \
--num-prompts $NUM_PROMPTS --request-rate $QPS --save-result
done