From 19724458b164b7fae728a0e4da4c43514125ed62 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 6 May 2024 17:57:45 -0700 Subject: [PATCH 01/28] Ported over prometheus implementation from previous repo --- llama_cpp/_utils.py | 72 ++++++++++++++++++++++++- llama_cpp/llama.py | 113 ++++++++++++++++++++++++++++++++++------ llama_cpp/server/app.py | 7 +++ pyproject.toml | 2 + 4 files changed, 176 insertions(+), 18 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index 781b265010..7ab94964b1 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -1,7 +1,9 @@ import os import sys +import psutil +import subprocess -from typing import Any, Dict +from typing import Any, Dict, List # Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor outnull_file = open(os.devnull, "w") @@ -75,3 +77,71 @@ class Singleton(object, metaclass=MetaSingleton): def __init__(self): super(Singleton, self).__init__() + + +# Get snapshot of RAM and GPU usage before and after function execution. +# Adapted from: https://github.com/abetlen/llama-cpp-python/issues/223#issuecomment-1556203616 +def get_cpu_usage(pid) -> float: + """ + CPU usage in percentage by the current process. + """ + process = psutil.Process(pid) + return process.cpu_percent() + +def get_ram_usage(pid) -> float: + """ + RAM usage in MiB by the current process. + """ + process = psutil.Process(pid) + ram_info = process.memory_info() + ram_usage = ram_info.rss / (1024 * 1024) # Convert to MiB + return ram_usage + +def get_gpu_info_by_pid(pid) -> float: + """ + GPU memory usage by the current process (if GPU is available) + """ + try: + gpu_info = subprocess.check_output(["nvidia-smi", "--query-compute-apps=pid,used_memory", "--format=csv,noheader"]).decode("utf-8") + gpu_info = gpu_info.strip().split("\n") + for info in gpu_info: + gpu_pid, gpu_ram_usage = info.split(", ") + if int(gpu_pid) == pid: + return float(gpu_ram_usage.split()[0]) + except (subprocess.CalledProcessError, FileNotFoundError): + pass + return 0.0 + +def get_gpu_general_info() -> tuple[float, float, float]: + """ + GPU general info (if GPU is available) + """ + try: + gpu_info = subprocess.check_output(["nvidia-smi", "--query-gpu=utilization.gpu,memory.used,memory.free", "--format=csv,noheader"]).decode("utf-8") + gpu_utilization, gpu_memory_used, gpu_memory_free = gpu_info.strip().split("\n")[0].split(", ") + return tuple(float(tup.split()[0]) for tup in [gpu_utilization, gpu_memory_used, gpu_memory_free]) + except (subprocess.CalledProcessError, FileNotFoundError): + pass + return 0.0, 0.0, 0.0 + +def infer_service_from_prompt(prompt: str | List[str]): + """ + Infer the service for which a completion request is sent based on the prompt. + """ + LABEL_SUGGESTIONS_TASK = "Your task is to select the most relevant labels for a GitHub issue title from a list of labels provided." + ACCEPTANCE_CRITERIA_TASK = "Your task is to write the acceptance criteria for a GitHub issue." + SPRINT_REVIEW_TASK = "You are helping me prepare a sprint review." + + if isinstance(prompt, list): + prompt = " ".join(prompt) + + if LABEL_SUGGESTIONS_TASK in prompt: + return "label-suggestions" + + elif ACCEPTANCE_CRITERIA_TASK in prompt: + return "acceptance-criteria" + + elif SPRINT_REVIEW_TASK in prompt: + return "sprint-review" + + return "not-specified" diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index f927f0ca26..a32ea1e8df 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -38,6 +38,16 @@ import llama_cpp.llama_cpp as llama_cpp import llama_cpp.llama_chat_format as llama_chat_format +from llama_cpp.llama_metrics import Metrics, MetricsExporter + +from llama_cpp._utils import ( + infer_service_from_prompt, + get_cpu_usage, + get_ram_usage, + get_gpu_info_by_pid, + get_gpu_general_info, +) + from llama_cpp.llama_speculative import LlamaDraftModel import numpy as np @@ -448,6 +458,9 @@ def __init__( if self.verbose: print(f"Using fallback chat format: {chat_format}", file=sys.stderr) + # Prometheus metrics + self.metrics = MetricsExporter() + @property def ctx(self) -> llama_cpp.llama_context_p: assert self._ctx.ctx is not None @@ -950,6 +963,19 @@ def _create_completion( completion_id: str = f"cmpl-{str(uuid.uuid4())}" created: int = int(time.time()) + + # Variables required for metric collection + _metrics_dict = {} + _ttft_start = time.time() + _pid = os.getpid() + _tpot_metrics = [] + _labels = { + "service": infer_service_from_prompt(prompt), # Infer the service for which the completion is being generated + "request_type": "chat/completions", + } + # Get CPU usage before generating completion so it can be used to calculate CPU when called after completing the process + _ = get_cpu_usage(_pid) + # If prompt is empty, initialize completion with BOS token to avoid # detokenization including a space at the beginning of the completion completion_tokens: List[int] = [] if len(prompt) > 0 else [self.token_bos()] @@ -1043,23 +1069,26 @@ def logit_bias_processor( finish_reason = "length" multibyte_fix = 0 - for token in self.generate( - prompt_tokens, - top_k=top_k, - top_p=top_p, - min_p=min_p, - typical_p=typical_p, - temp=temperature, - tfs_z=tfs_z, - mirostat_mode=mirostat_mode, - mirostat_tau=mirostat_tau, - mirostat_eta=mirostat_eta, - frequency_penalty=frequency_penalty, - presence_penalty=presence_penalty, - repeat_penalty=repeat_penalty, - stopping_criteria=stopping_criteria, - logits_processor=logits_processor, - grammar=grammar, + _tpot_start = time.time() + for idx, token in enumerate( + self.generate( + prompt_tokens, + top_k=top_k, + top_p=top_p, + min_p=min_p, + typical_p=typical_p, + temp=temperature, + tfs_z=tfs_z, + mirostat_mode=mirostat_mode, + mirostat_tau=mirostat_tau, + mirostat_eta=mirostat_eta, + frequency_penalty=frequency_penalty, + presence_penalty=presence_penalty, + repeat_penalty=repeat_penalty, + stopping_criteria=stopping_criteria, + logits_processor=logits_processor, + grammar=grammar, + ) ): assert self._model.model is not None if llama_cpp.llama_token_is_eog(self._model.model, token): @@ -1216,6 +1245,14 @@ def logit_bias_processor( finish_reason = "length" break + # Record TTFT metric (once) + if idx == 0: + _metrics_dict["time_to_first_token"] = time.time() - _ttft_start + # Record TPOT metric + else: + _tpot_metrics.append(time.time() - _tpot_start) + _tpot_start = time.time() # reset + if stopping_criteria is not None and stopping_criteria( self._input_ids, self._scores[-1, :] ): @@ -1403,6 +1440,48 @@ def logit_bias_processor( "token_logprobs": token_logprobs, "top_logprobs": top_logprobs, } + + # Record TPOT metrics (per generated token) + _metrics_dict["time_per_output_token"] = _tpot_metrics + + # Record metrics from the C++ backend (converted to seconds) + _timings = llama_cpp.llama_get_timings(self._ctx.ctx) + _metrics_dict["load_time"] = round(_timings.t_load_ms / 1e3, 2) + _metrics_dict["sample_time"] = round(_timings.t_sample_ms / 1e3, 2) + _metrics_dict["sample_throughput"] = round(1e3 / _timings.t_sample_ms * _timings.n_sample, 2) if _timings.t_sample_ms > 0 else 0.0 + _metrics_dict["prompt_eval_time"] = round(_timings.t_p_eval_ms / 1e3, 2) + _metrics_dict["prompt_eval_throughput"] = round(1e3 / _timings.t_p_eval_ms * _timings.n_p_eval, 2) if _timings.t_p_eval_ms > 0 else 0.0 + _metrics_dict["completion_eval_time"] = round(_timings.t_eval_ms / 1e3, 2) + _metrics_dict["completion_eval_throughput"] = round(1e3 / _timings.t_eval_ms * _timings.n_eval, 2) if _timings.t_eval_ms > 0 else 0.0 + _metrics_dict["end_to_end_latency"] = round((_timings.t_end_ms - _timings.t_start_ms) / 1e3, 2) + + # Record prefill and generation token metrics + _metrics_dict["prefill_tokens"] = len(prompt_tokens) + _metrics_dict["generation_tokens"] = len(completion_tokens) + + # Record system info + _gpu_utilization, _gpu_memory_used, _gpu_memory_free = get_gpu_general_info() + _metrics_dict["cpu_utilization"] = get_cpu_usage(_pid) # TODO: Returning always 0.0 -> check + _metrics_dict["cpu_ram_pid"] = get_ram_usage(_pid) + _metrics_dict["gpu_utilization"] = _gpu_utilization + _metrics_dict["gpu_ram_usage"] = _gpu_memory_used + _metrics_dict["gpu_ram_free"] = _gpu_memory_free + _metrics_dict["gpu_ram_pid"] = get_gpu_info_by_pid(_pid) + _metrics_dict["state_size"] = llama_cpp.llama_get_state_size(self._ctx.ctx) + _metrics_dict["kv_cache_usage_ratio"] = round(1. * llama_cpp.llama_get_kv_cache_used_cells(self._ctx.ctx) / self.n_ctx(), 2) + _metrics_dict["system_info"] = { + "model": model_name, + "n_params": str(llama_cpp.llama_model_n_params(self.model)), + "n_embd": str(self.n_embd()), + "n_ctx": str(self.n_ctx()), + "n_vocab": str(self.n_vocab()), + "n_threads": str(self.n_threads) + } + + # Log metrics to Prometheus + #print(_metrics_dict, file=sys.stderr) + _all_metrics = Metrics(**_metrics_dict) + self.metrics.log_metrics(_all_metrics, labels=_labels) yield { "id": completion_id, diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index b6ed9b1b6b..d9bed39225 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -7,6 +7,8 @@ from functools import partial from typing import Iterator, List, Optional, Union, Dict +from prometheus_client import make_asgi_app + import llama_cpp import anyio @@ -145,6 +147,11 @@ def create_app( assert model_settings is not None set_llama_proxy(model_settings=model_settings) + # Add prometheus asgi middleware to route /metrics requests + # see: https://prometheus.github.io/client_python/exporting/http/fastapi-gunicorn/ + metrics_app = make_asgi_app() + app.mount("/metrics", metrics_app) + if server_settings.disable_ping_events: set_ping_message_factory(lambda: bytes()) diff --git a/pyproject.toml b/pyproject.toml index 8345cb1f09..4b0246623f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,8 @@ server = [ "sse-starlette>=1.6.1", "starlette-context>=0.3.6,<0.4", "PyYAML>=5.1", + "prometheus_client>=0.20.0", + "psutil>=5.9.8" ] test = [ "pytest>=7.4.0", From edd0ec69a2c34d14756a5d79a7f93ae28dd3a958 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 6 May 2024 17:58:40 -0700 Subject: [PATCH 02/28] Added kn_cache_usage_ratio metric --- llama_cpp/llama_metrics.py | 218 +++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 llama_cpp/llama_metrics.py diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py new file mode 100644 index 0000000000..9e00e8b99d --- /dev/null +++ b/llama_cpp/llama_metrics.py @@ -0,0 +1,218 @@ +from dataclasses import dataclass +from typing import Any, Optional, Dict, List + +from prometheus_client import Gauge, Info, Histogram + + +LABELS = ["request_type", "service"] + +@dataclass +class Metrics: + """ + A dataclass to store metrics for a request. + """ + # System metrics + system_info: Dict[str, Any] + state_size: int + cpu_utilization: float + cpu_ram_pid: float + gpu_utilization: float + gpu_ram_usage: float + gpu_ram_free: float + gpu_ram_pid: float + + # Metrics from the C++ backend + load_time: float + sample_time: float + sample_throughput: float + time_to_first_token: float + time_per_output_token: List[float] + prompt_eval_time: float + prompt_eval_throughput: float + completion_eval_time: float + completion_eval_throughput: float + end_to_end_latency: float + prefill_tokens: int + generation_tokens: int + kv_cache_usage_ratio: int + + +class MetricsExporter: + """ + A custom Prometheus Metrics Explorer for the LLAMA C++ backend. + Collects metrics per request sent to the backend. + """ + def __init__(self): + self.labels = LABELS + # One-time metrics + self._histrogram_load_time = Histogram( + name="llama_cpp_python:load_t_seconds", + documentation="Histogram of load time in seconds", + labelnames=self.labels, + buckets=[ + 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, + 8.0, 9.0, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0 + ] + ) + # Request-level latencies + self._histogram_sample_time = Histogram( + name="llama_cpp_python:sample_t_seconds", + documentation="Histogram of token sampling time in seconds", + labelnames=self.labels, + buckets=[ + 0.00001, 0.00005, 0.0001, 0.00025, 0.0005, 0.001, 0.0025, + 0.005, 0.0075, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, + ] + ) + self._histogram_time_to_first_token = Histogram( + name="llama_cpp_python:ttft_seconds", + documentation="Histogram of time to first token in seconds", + labelnames=self.labels, + buckets=[ + 0.001, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.25, 0.5, + 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0 + ] + ) + self._histogram_time_per_output_token = Histogram( + name="llama_cpp_python:tpot_seconds", + documentation="Histogram of time per output token in seconds", + labelnames=self.labels, + buckets=[ + 0.001, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.25, 0.5, + 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0 + ] + ) + self._histogram_prompt_eval_time = Histogram( + name="llama_cpp_python:p_eval_t_seconds", + documentation="Histogram of prompt evaluation time in seconds", + labelnames=self.labels, + buckets=[ + 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, + 20.0, 25.0, 30.0, 40.0, 50.0, 60.0 + ] + ) + self._histogram_completion_eval_time = Histogram( + name="llama_cpp_python:c_eval_t_seconds", + documentation="Histogram of completion evaluation time in seconds", + labelnames=self.labels, + buckets=[ + 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, + 20.0, 25.0, 30.0, 40.0, 50.0, 60.0 + ] + ) + self._histogram_e2e_request_latency = Histogram( + name="llama_cpp_python:e2e_seconds", + documentation="Histogram of end-to-end request latency in seconds", + labelnames=self.labels, + buckets=[ + 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, + 20.0, 25.0, 30.0, 40.0, 50.0, 60.0 + ] + ) + # Prefill and generation tokens + self._histogram_prefill_tokens = Histogram( + name="llama_cpp_python:prefill_tokens_total", + documentation="Histogram of number of prefill tokens processed", + labelnames=self.labels, + buckets=[ + 1, 10, 25, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, + 3500, 4000, 4500, 5000 + ] + ) + self._histogram_generation_tokens = Histogram( + name="llama_cpp_python:completion_tokens_total", + documentation="Histogram of number of generation tokens processed", + labelnames=self.labels, + buckets=[ + 1, 10, 25, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, + 3500, 4000, 4500, 5000 + ] + ) + # Current throughput + self._gauge_prompt_eval_throughput = Gauge( + name="llama_cpp_python:prompt_eval_throughput", + documentation="Current throughput of the prompt evaluation process (in tokens/second)", + labelnames=self.labels + ) + self._gauge_completion_eval_throughput = Gauge( + name="llama_cpp_python:completion_eval_throughput", + documentation="Current throughput of the completion evaluation process (in tokens/second)", + labelnames=self.labels + ) + self._gauge_sample_throughput = Gauge( + name="llama_cpp_python:sample_throughput", + documentation="Current throughput of the token sampling process (in tokens/second)", + labelnames=self.labels + ) + # System info + self._gauge_state_size = Gauge( + name="llama_cpp_python:state_size", + documentation="Current state size in bytes of various components such as rng (random number generator), logits, embedding, and kv_cache (key-value cache)", + labelnames=self.labels + ) + self._gauge_cpu_utilization = Gauge( + name="llama_cpp_python:cpu_utilization", + documentation="Current CPU utilization", + labelnames=self.labels + ) + self._gauge_cpu_ram_usage_by_pid = Gauge( + name="llama_cpp_python:cpu_memory_usage_by_pid", + documentation="Current CPU memory usage during the request", + labelnames=self.labels + ) + self._gauge_gpu_utilization = Gauge( + name="llama_cpp_python:gpu_utilization", + documentation="Current GPU utilization", + labelnames=self.labels + ) + self._gauge_gpu_memory_usage = Gauge( + name="llama_cpp_python:gpu_memory_usage", + documentation="Current GPU memory usage", + labelnames=self.labels + ) + self._gauge_gpu_memory_free = Gauge( + name="llama_cpp_python:gpu_memory_free", + documentation="Current free GPU memory", + labelnames=self.labels + ) + self._gauge_gpu_memory_usage_by_pid = Gauge( + name="llama_cpp_python:gpu_memory_usage_by_pid", + documentation="Current GPU memory usage during the request", + labelnames=self.labels + ) + self._gauge_kv_cache_usage_ratio = Gauge( + name="llama_cpp_python:kv_cache_usage_ratio", + documentation="KV-cache usage. 1 means 100 percent usage", + labelnames=self.labels + ) + self._info = Info( + name="llama_cpp_python:info", + documentation="Server metadata" + ) + + def log_metrics(self, metrics: Metrics, labels: Dict[str, str]): + """ + Log the metrics using the Prometheus client. + """ + self._histrogram_load_time.labels(**labels).observe(metrics.load_time) + self._histogram_sample_time.labels(**labels).observe(metrics.sample_time) + self._histogram_time_to_first_token.labels(**labels).observe(metrics.time_to_first_token) + for _tpot in metrics.time_per_output_token: + self._histogram_time_per_output_token.labels(**labels).observe(_tpot) + self._histogram_prompt_eval_time.labels(**labels).observe(metrics.prompt_eval_time) + self._histogram_completion_eval_time.labels(**labels).observe(metrics.completion_eval_time) + self._histogram_e2e_request_latency.labels(**labels).observe(metrics.end_to_end_latency) + self._histogram_prefill_tokens.labels(**labels).observe(metrics.prefill_tokens) + self._histogram_generation_tokens.labels(**labels).observe(metrics.generation_tokens) + self._gauge_prompt_eval_throughput.labels(**labels).set(metrics.prompt_eval_throughput) + self._gauge_completion_eval_throughput.labels(**labels).set(metrics.completion_eval_throughput) + self._gauge_sample_throughput.labels(**labels).set(metrics.sample_throughput) + self._gauge_cpu_utilization.labels(**labels).set(metrics.cpu_utilization) + self._gauge_cpu_ram_usage_by_pid.labels(**labels).set(metrics.cpu_ram_pid) + self._gauge_gpu_utilization.labels(**labels).set(metrics.gpu_utilization) + self._gauge_gpu_memory_usage.labels(**labels).set(metrics.gpu_ram_usage) + self._gauge_gpu_memory_free.labels(**labels).set(metrics.gpu_ram_free) + self._gauge_gpu_memory_usage_by_pid.labels(**labels).set(metrics.gpu_ram_pid) + self._gauge_state_size.labels(**labels).set(metrics.state_size) + self._gauge_kv_cache_usage_ratio.labels(**labels).set(metrics.kv_cache_usage_ratio) + self._info.info(metrics.system_info) \ No newline at end of file From bd84f3c179bac46dd1dc0604db638dfd3852fdae Mon Sep 17 00:00:00 2001 From: juanroesel Date: Tue, 7 May 2024 14:53:25 -0700 Subject: [PATCH 03/28] Pulled synced commits locally and changed data type --- llama_cpp/llama_metrics.py | 2 +- vendor/llama.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 9e00e8b99d..7334c71401 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -34,7 +34,7 @@ class Metrics: end_to_end_latency: float prefill_tokens: int generation_tokens: int - kv_cache_usage_ratio: int + kv_cache_usage_ratio: float class MetricsExporter: diff --git a/vendor/llama.cpp b/vendor/llama.cpp index 628b299106..f364eb6fb5 160000 --- a/vendor/llama.cpp +++ b/vendor/llama.cpp @@ -1 +1 @@ -Subproject commit 628b299106d1e9476fdecb3cbe546bf5c60f1b89 +Subproject commit f364eb6fb5d46118a76fa045f487318de4c24961 From d9233fc40719e782e2dd241fb1c2397847a69987 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 15:51:19 -0700 Subject: [PATCH 04/28] Launch local llama_cpp server running on zh-service-network --- .dockerignore | 1 + dev.Dockerfile | 44 ++++++++++++++++++++++++++++++++++++++++++++ dev.docker-compose | 15 +++++++++++++++ docker/simple/run.sh | 3 ++- 4 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 dev.Dockerfile create mode 100644 dev.docker-compose diff --git a/.dockerignore b/.dockerignore index fd64c09b37..1b85f8c9a3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -2,6 +2,7 @@ _skbuild/ .envrc +# LLMs - comment if you'd like to bake the model into the image models/ # Byte-compiled / optimized / DLL files diff --git a/dev.Dockerfile b/dev.Dockerfile new file mode 100644 index 0000000000..24f5be7270 --- /dev/null +++ b/dev.Dockerfile @@ -0,0 +1,44 @@ +# Define the image argument and provide a default value +ARG IMAGE=python:3.11.8 + +# Use the image as specified +FROM ${IMAGE} + +# Re-declare the ARG after FROM +ARG IMAGE + +# Update and upgrade the existing packages +RUN apt-get update && apt-get upgrade -y && apt-get install -y --no-install-recommends \ + python3 \ + python3-pip \ + ninja-build \ + libopenblas-dev \ + build-essential \ + git + +RUN mkdir /app +WORKDIR /app +COPY . /app + +RUN python3 -m pip install --upgrade pip + +RUN make deps && make build && make clean + +# Set environment variable for the host +ENV GH_TOKEN=$GH_TOKEN +ENV HOST=0.0.0.0 +ENV PORT=8000 +ENV MODEL=/app/models/mistral-7b-openorca.Q5_K_M.gguf + +# # Install depencencies +# RUN python3 -m pip install --upgrade pip pytest cmake scikit-build setuptools fastapi uvicorn sse-starlette pydantic-settings starlette-context psutil prometheus_client + +# # Install llama-cpp-python (build with METAL) +# RUN CMAKE_ARGS="-DLLAMA_METAL=on" FORCE_CMAKE=1 pip install git+https://${GH_TOKEN}@github.com/ZenHubHQ/llama-cpp-python.git --force-reinstall --upgrade --no-cache-dir --verbose + +# Expose a port for the server +EXPOSE 8000 + +# Run the server start script +CMD ["/bin/sh", "/app/docker/simple/run.sh"] +# CMD python3 -m llama_cpp.server --n_gpu_layers -1 diff --git a/dev.docker-compose b/dev.docker-compose new file mode 100644 index 0000000000..7b21e468a2 --- /dev/null +++ b/dev.docker-compose @@ -0,0 +1,15 @@ +version: '3' +services: + dev-llama-cpp-python: + build: + context: . + dockerfile: dev.Dockerfile + ports: + - 8000:8000 + volumes: + - ./llama_cpp:/app/llama_cpp + networks: + - zh-service-network +networks: + zh-service-network: + external: true \ No newline at end of file diff --git a/docker/simple/run.sh b/docker/simple/run.sh index c85e73d2b6..d4fd489a0e 100644 --- a/docker/simple/run.sh +++ b/docker/simple/run.sh @@ -1,4 +1,5 @@ #!/bin/bash make build -uvicorn --factory llama_cpp.server.app:create_app --host $HOST --port $PORT +# uvicorn --factory llama_cpp.server.app:create_app --host $HOST --port $PORT --reload +python3 -m llama_cpp.server --model $MODEL --n_gpu_layers -1 \ No newline at end of file From a66c267345ead355731b4492d11bdad52ba7ff47 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 15:55:09 -0700 Subject: [PATCH 05/28] Integrates 'ai_service' parameter with the backend --- llama_cpp/llama.py | 21 +++++++++++++++++---- llama_cpp/llama_chat_format.py | 3 +++ llama_cpp/server/app.py | 6 +++++- llama_cpp/server/types.py | 3 +++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 432f4db3b0..3f3662de66 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -70,6 +70,7 @@ class Llama: """High-level Python wrapper for a llama.cpp model.""" __backend_initialized = False + __prometheus_metrics = MetricsExporter() def __init__( self, @@ -464,7 +465,7 @@ def __init__( print(f"Using fallback chat format: {chat_format}", file=sys.stderr) # Prometheus metrics - self.metrics = MetricsExporter() + self.metrics = self.__prometheus_metrics @property def ctx(self) -> llama_cpp.llama_context_p: @@ -960,6 +961,7 @@ def _create_completion( logits_processor: Optional[LogitsProcessorList] = None, grammar: Optional[LlamaGrammar] = None, logit_bias: Optional[Dict[str, float]] = None, + ai_service: Optional[str] = None ) -> Union[ Iterator[CreateCompletionResponse], Iterator[CreateCompletionStreamResponse] ]: @@ -974,8 +976,10 @@ def _create_completion( _ttft_start = time.time() _pid = os.getpid() _tpot_metrics = [] + if not ai_service: + raise ValueError("ai_service must be provided") _labels = { - "service": infer_service_from_prompt(prompt), # Infer the service for which the completion is being generated + "service": ai_service if ai_service is not None else "not-specified", "request_type": "chat/completions", } # Get CPU usage before generating completion so it can be used to calculate CPU when called after completing the process @@ -1445,7 +1449,10 @@ def logit_bias_processor( "token_logprobs": token_logprobs, "top_logprobs": top_logprobs, } - + # Record TTFT metric -- Setting to None if no tokens were generated + if not _metrics_dict.get("time_to_first_token"): + _metrics_dict["time_to_first_token"] = None + # Record TPOT metrics (per generated token) _metrics_dict["time_per_output_token"] = _tpot_metrics @@ -1484,7 +1491,6 @@ def logit_bias_processor( } # Log metrics to Prometheus - #print(_metrics_dict, file=sys.stderr) _all_metrics = Metrics(**_metrics_dict) self.metrics.log_metrics(_all_metrics, labels=_labels) @@ -1493,6 +1499,7 @@ def logit_bias_processor( "object": "text_completion", "created": created, "model": model_name, + "service": ai_service, "choices": [ { "text": text_str, @@ -1535,6 +1542,7 @@ def create_completion( logits_processor: Optional[LogitsProcessorList] = None, grammar: Optional[LlamaGrammar] = None, logit_bias: Optional[Dict[str, float]] = None, + ai_service: Optional[str] = None ) -> Union[CreateCompletionResponse, Iterator[CreateCompletionStreamResponse]]: """Generate text from a prompt. @@ -1598,6 +1606,7 @@ def create_completion( logits_processor=logits_processor, grammar=grammar, logit_bias=logit_bias, + ai_service=ai_service ) if stream: chunks: Iterator[CreateCompletionStreamResponse] = completion_or_chunks @@ -1632,6 +1641,7 @@ def __call__( logits_processor: Optional[LogitsProcessorList] = None, grammar: Optional[LlamaGrammar] = None, logit_bias: Optional[Dict[str, float]] = None, + ai_service: Optional[str] = None ) -> Union[CreateCompletionResponse, Iterator[CreateCompletionStreamResponse]]: """Generate text from a prompt. @@ -1695,6 +1705,7 @@ def __call__( logits_processor=logits_processor, grammar=grammar, logit_bias=logit_bias, + ai_service=ai_service ) def create_chat_completion( @@ -1727,6 +1738,7 @@ def create_chat_completion( logit_bias: Optional[Dict[str, float]] = None, logprobs: Optional[bool] = None, top_logprobs: Optional[int] = None, + ai_service: Optional[str] = None ) -> Union[ CreateChatCompletionResponse, Iterator[CreateChatCompletionStreamResponse] ]: @@ -1796,6 +1808,7 @@ def create_chat_completion( logits_processor=logits_processor, grammar=grammar, logit_bias=logit_bias, + ai_service=ai_service ) def create_chat_completion_openai_v1( diff --git a/llama_cpp/llama_chat_format.py b/llama_cpp/llama_chat_format.py index 3ab94e0d3c..f099715d3a 100644 --- a/llama_cpp/llama_chat_format.py +++ b/llama_cpp/llama_chat_format.py @@ -87,6 +87,7 @@ def __call__( grammar: Optional[llama.LlamaGrammar] = None, logprobs: Optional[bool] = None, top_logprobs: Optional[int] = None, + ai_service: Optional[str] = None, **kwargs, # type: ignore ) -> Union[ llama_types.CreateChatCompletionResponse, @@ -535,6 +536,7 @@ def chat_completion_handler( logit_bias: Optional[Dict[str, float]] = None, logprobs: Optional[bool] = None, top_logprobs: Optional[int] = None, + ai_service: Optional[str] = None, **kwargs, # type: ignore ) -> Union[ llama_types.CreateChatCompletionResponse, @@ -625,6 +627,7 @@ def chat_completion_handler( stopping_criteria=stopping_criteria, grammar=grammar, logit_bias=logit_bias, + ai_service=ai_service ) if tool is not None: tool_name = tool["function"]["name"] diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index cb3a30582d..7301204835 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -386,6 +386,7 @@ async def create_chat_completion( {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of France?"}, ], + "ai_service": "copilot" }, }, "json_mode": { @@ -454,6 +455,8 @@ async def create_chat_completion( "user", } kwargs = body.model_dump(exclude=exclude) + # Adds the ai_service value from the request body to the kwargs + kwargs["ai_service"] = body.ai_service llama = llama_proxy(body.model) if body.logit_bias is not None: kwargs["logit_bias"] = ( @@ -471,7 +474,8 @@ async def create_chat_completion( if isinstance(iterator_or_completion, Iterator): # EAFP: It's easier to ask for forgiveness than permission - first_response = await run_in_threadpool(next, iterator_or_completion) + # NOTE: Including kwargs so it can also pass the "ai_service" argument to the iterator + first_response = await run_in_threadpool(next, iterator_or_completion, **kwargs) # If no exception was raised from first_response, we can assume that # the iterator is valid and we can use it to stream the response. diff --git a/llama_cpp/server/types.py b/llama_cpp/server/types.py index a20b3940f2..f3fa5fa733 100644 --- a/llama_cpp/server/types.py +++ b/llama_cpp/server/types.py @@ -259,6 +259,9 @@ class CreateChatCompletionRequest(BaseModel): } } + # AI service added as request body parameter by Client + ai_service: Optional[str] = None + class ModelData(TypedDict): id: str From 50d5f2b87741851c705359aefc68a92750e4b2fc Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 15:55:36 -0700 Subject: [PATCH 06/28] Allows time_to_first_token to be None --- llama_cpp/llama_metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 7334c71401..46ef5ba1a7 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -196,7 +196,8 @@ def log_metrics(self, metrics: Metrics, labels: Dict[str, str]): """ self._histrogram_load_time.labels(**labels).observe(metrics.load_time) self._histogram_sample_time.labels(**labels).observe(metrics.sample_time) - self._histogram_time_to_first_token.labels(**labels).observe(metrics.time_to_first_token) + if metrics.time_to_first_token: + self._histogram_time_to_first_token.labels(**labels).observe(metrics.time_to_first_token) for _tpot in metrics.time_per_output_token: self._histogram_time_per_output_token.labels(**labels).observe(_tpot) self._histogram_prompt_eval_time.labels(**labels).observe(metrics.prompt_eval_time) From 33c022fcb8ac4b5afdad15e0b8980b5213e22f45 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 15:59:03 -0700 Subject: [PATCH 07/28] ZenHubHQ/devops#2233 - Modified tests to accomodate 'ai_service' param integration --- tests/test_llama.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/tests/test_llama.py b/tests/test_llama.py index 469ef91cab..92b1c0a9e8 100644 --- a/tests/test_llama.py +++ b/tests/test_llama.py @@ -153,7 +153,9 @@ def mock_kv_cache_seq_add( def test_llama_patch(mock_llama): n_ctx = 128 + ai_service = "testing" llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx) + n_vocab = llama_cpp.llama_n_vocab(llama._model.model) assert n_vocab == 32000 @@ -163,32 +165,32 @@ def test_llama_patch(mock_llama): ## Test basic completion from bos until eos mock_llama(llama, all_text) - completion = llama.create_completion("", max_tokens=36) + completion = llama.create_completion("", max_tokens=36, ai_service=ai_service) assert completion["choices"][0]["text"] == all_text assert completion["choices"][0]["finish_reason"] == "stop" ## Test basic completion until eos mock_llama(llama, all_text) - completion = llama.create_completion(text, max_tokens=20) + completion = llama.create_completion(text, max_tokens=20, ai_service=ai_service) assert completion["choices"][0]["text"] == output_text assert completion["choices"][0]["finish_reason"] == "stop" ## Test streaming completion until eos mock_llama(llama, all_text) - chunks = list(llama.create_completion(text, max_tokens=20, stream=True)) + chunks = list(llama.create_completion(text, max_tokens=20, stream=True, ai_service=ai_service)) assert "".join(chunk["choices"][0]["text"] for chunk in chunks) == output_text assert chunks[-1]["choices"][0]["finish_reason"] == "stop" ## Test basic completion until stop sequence mock_llama(llama, all_text) - completion = llama.create_completion(text, max_tokens=20, stop=["lazy"]) + completion = llama.create_completion(text, max_tokens=20, stop=["lazy"], ai_service=ai_service) assert completion["choices"][0]["text"] == " jumps over the " assert completion["choices"][0]["finish_reason"] == "stop" ## Test streaming completion until stop sequence mock_llama(llama, all_text) chunks = list( - llama.create_completion(text, max_tokens=20, stream=True, stop=["lazy"]) + llama.create_completion(text, max_tokens=20, stream=True, stop=["lazy"], ai_service=ai_service) ) assert ( "".join(chunk["choices"][0]["text"] for chunk in chunks) == " jumps over the " @@ -197,13 +199,13 @@ def test_llama_patch(mock_llama): ## Test basic completion until length mock_llama(llama, all_text) - completion = llama.create_completion(text, max_tokens=2) + completion = llama.create_completion(text, max_tokens=2, ai_service=ai_service) assert completion["choices"][0]["text"] == " jumps" assert completion["choices"][0]["finish_reason"] == "length" ## Test streaming completion until length mock_llama(llama, all_text) - chunks = list(llama.create_completion(text, max_tokens=2, stream=True)) + chunks = list(llama.create_completion(text, max_tokens=2, stream=True, ai_service=ai_service)) assert "".join(chunk["choices"][0]["text"] for chunk in chunks) == " jumps" assert chunks[-1]["choices"][0]["finish_reason"] == "length" @@ -230,15 +232,16 @@ def test_utf8(mock_llama): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, logits_all=True) output_text = "😀" + ai_service = "testing" ## Test basic completion with utf8 multibyte mock_llama(llama, output_text) - completion = llama.create_completion("", max_tokens=4) + completion = llama.create_completion("", max_tokens=4, ai_service=ai_service) assert completion["choices"][0]["text"] == output_text ## Test basic completion with incomplete utf8 multibyte mock_llama(llama, output_text) - completion = llama.create_completion("", max_tokens=1) + completion = llama.create_completion("", max_tokens=1, ai_service=ai_service) assert completion["choices"][0]["text"] == "" From b99dbf80a5e80b113f3759bd9bbaaf8ddb5bfe6d Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 16:15:05 -0700 Subject: [PATCH 08/28] Using correct typing object Tuple --- llama_cpp/_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index 7ab94964b1..dbdc3179f6 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -3,7 +3,7 @@ import psutil import subprocess -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple # Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor outnull_file = open(os.devnull, "w") @@ -112,7 +112,7 @@ def get_gpu_info_by_pid(pid) -> float: pass return 0.0 -def get_gpu_general_info() -> tuple[float, float, float]: +def get_gpu_general_info() -> Tuple[float, float, float]: """ GPU general info (if GPU is available) """ From d39c7d963586937427829448019098b16439734c Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 16:18:49 -0700 Subject: [PATCH 09/28] Removing previous function to detect ai-service based off system message --- llama_cpp/_utils.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index dbdc3179f6..db5b6eb684 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -3,7 +3,7 @@ import psutil import subprocess -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Tuple, Union # Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor outnull_file = open(os.devnull, "w") @@ -123,25 +123,3 @@ def get_gpu_general_info() -> Tuple[float, float, float]: except (subprocess.CalledProcessError, FileNotFoundError): pass return 0.0, 0.0, 0.0 - -def infer_service_from_prompt(prompt: str | List[str]): - """ - Infer the service for which a completion request is sent based on the prompt. - """ - LABEL_SUGGESTIONS_TASK = "Your task is to select the most relevant labels for a GitHub issue title from a list of labels provided." - ACCEPTANCE_CRITERIA_TASK = "Your task is to write the acceptance criteria for a GitHub issue." - SPRINT_REVIEW_TASK = "You are helping me prepare a sprint review." - - if isinstance(prompt, list): - prompt = " ".join(prompt) - - if LABEL_SUGGESTIONS_TASK in prompt: - return "label-suggestions" - - elif ACCEPTANCE_CRITERIA_TASK in prompt: - return "acceptance-criteria" - - elif SPRINT_REVIEW_TASK in prompt: - return "sprint-review" - - return "not-specified" From 4b476b96166b4d856e5ec0099555ca14eb33e104 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 13 May 2024 16:27:25 -0700 Subject: [PATCH 10/28] Removed infer_service_from_prompt from imports --- llama_cpp/llama.py | 1 - 1 file changed, 1 deletion(-) diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 3f3662de66..00835256d3 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -41,7 +41,6 @@ from llama_cpp.llama_metrics import Metrics, MetricsExporter from llama_cpp._utils import ( - infer_service_from_prompt, get_cpu_usage, get_ram_usage, get_gpu_info_by_pid, From 7ea1675c1e1d20299a60b6fdd9e47f95fe8a21dc Mon Sep 17 00:00:00 2001 From: juanroesel Date: Tue, 14 May 2024 10:14:26 -0700 Subject: [PATCH 11/28] Changes value of 'ai_service' in the unit tests --- tests/test_llama.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_llama.py b/tests/test_llama.py index 92b1c0a9e8..7e5ab3efe4 100644 --- a/tests/test_llama.py +++ b/tests/test_llama.py @@ -153,7 +153,7 @@ def mock_kv_cache_seq_add( def test_llama_patch(mock_llama): n_ctx = 128 - ai_service = "testing" + ai_service = "label-suggestions" llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx) n_vocab = llama_cpp.llama_n_vocab(llama._model.model) @@ -232,7 +232,7 @@ def test_utf8(mock_llama): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, logits_all=True) output_text = "😀" - ai_service = "testing" + ai_service = "label-suggestions" ## Test basic completion with utf8 multibyte mock_llama(llama, output_text) From bfefa26222a66dd4a3da34f9bd4862063e5afcae Mon Sep 17 00:00:00 2001 From: juanroesel Date: Wed, 15 May 2024 11:01:43 -0700 Subject: [PATCH 12/28] Added simple test to check health of metrics endpoint --- tests/test_llama.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/tests/test_llama.py b/tests/test_llama.py index 7e5ab3efe4..aca1745a86 100644 --- a/tests/test_llama.py +++ b/tests/test_llama.py @@ -153,7 +153,8 @@ def mock_kv_cache_seq_add( def test_llama_patch(mock_llama): n_ctx = 128 - ai_service = "label-suggestions" + ai_service_completion = "test-label-suggestions" + ai_service_streaming = "test-acceptance-criteria" llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx) n_vocab = llama_cpp.llama_n_vocab(llama._model.model) @@ -165,32 +166,32 @@ def test_llama_patch(mock_llama): ## Test basic completion from bos until eos mock_llama(llama, all_text) - completion = llama.create_completion("", max_tokens=36, ai_service=ai_service) + completion = llama.create_completion("", max_tokens=36, ai_service=ai_service_completion) assert completion["choices"][0]["text"] == all_text assert completion["choices"][0]["finish_reason"] == "stop" ## Test basic completion until eos mock_llama(llama, all_text) - completion = llama.create_completion(text, max_tokens=20, ai_service=ai_service) + completion = llama.create_completion(text, max_tokens=20, ai_service=ai_service_completion) assert completion["choices"][0]["text"] == output_text assert completion["choices"][0]["finish_reason"] == "stop" ## Test streaming completion until eos mock_llama(llama, all_text) - chunks = list(llama.create_completion(text, max_tokens=20, stream=True, ai_service=ai_service)) + chunks = list(llama.create_completion(text, max_tokens=20, stream=True, ai_service=ai_service_streaming)) assert "".join(chunk["choices"][0]["text"] for chunk in chunks) == output_text assert chunks[-1]["choices"][0]["finish_reason"] == "stop" ## Test basic completion until stop sequence mock_llama(llama, all_text) - completion = llama.create_completion(text, max_tokens=20, stop=["lazy"], ai_service=ai_service) + completion = llama.create_completion(text, max_tokens=20, stop=["lazy"], ai_service=ai_service_completion) assert completion["choices"][0]["text"] == " jumps over the " assert completion["choices"][0]["finish_reason"] == "stop" ## Test streaming completion until stop sequence mock_llama(llama, all_text) chunks = list( - llama.create_completion(text, max_tokens=20, stream=True, stop=["lazy"], ai_service=ai_service) + llama.create_completion(text, max_tokens=20, stream=True, stop=["lazy"], ai_service=ai_service_streaming) ) assert ( "".join(chunk["choices"][0]["text"] for chunk in chunks) == " jumps over the " @@ -199,13 +200,13 @@ def test_llama_patch(mock_llama): ## Test basic completion until length mock_llama(llama, all_text) - completion = llama.create_completion(text, max_tokens=2, ai_service=ai_service) + completion = llama.create_completion(text, max_tokens=2, ai_service=ai_service_completion) assert completion["choices"][0]["text"] == " jumps" assert completion["choices"][0]["finish_reason"] == "length" ## Test streaming completion until length mock_llama(llama, all_text) - chunks = list(llama.create_completion(text, max_tokens=2, stream=True, ai_service=ai_service)) + chunks = list(llama.create_completion(text, max_tokens=2, stream=True, ai_service=ai_service_streaming)) assert "".join(chunk["choices"][0]["text"] for chunk in chunks) == " jumps" assert chunks[-1]["choices"][0]["finish_reason"] == "length" @@ -269,6 +270,22 @@ def test_llama_server(): } +def test_metrics_endpoint(): + from fastapi.testclient import TestClient + from llama_cpp.server.app import create_app, Settings + + settings = Settings( + model=MODEL, + vocab_only=True, + ) + app = create_app(settings) + client = TestClient(app) + response = client.get("/metrics") + assert response.status_code == 200 + assert "test-label-suggestions" in response.text + assert "test-acceptance-criteria" in response.text + + @pytest.mark.parametrize( "size_and_axis", [ From 67ae74a4ad17c7b2c9ae1ed6c32b3945bdd46f85 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Wed, 15 May 2024 11:02:15 -0700 Subject: [PATCH 13/28] Fixed bug of llama metrics not collecting during streaming --- llama_cpp/llama.py | 57 ++++++++++++++++++++++++++++++++++ llama_cpp/llama_chat_format.py | 2 ++ llama_cpp/llama_metrics.py | 4 +-- llama_cpp/server/app.py | 6 ++-- 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 00835256d3..e0a24ce855 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -1281,6 +1281,14 @@ def logit_bias_processor( token_end_position = 0 for token in remaining_tokens: + # Record TTFT metric (once) + if idx == 0: + _metrics_dict["time_to_first_token"] = time.time() - _ttft_start + # Record TPOT metric + else: + _tpot_metrics.append(time.time() - _tpot_start) + _tpot_start = time.time() # reset + token_end_position += len(self.detokenize([token], prev_tokens=prompt_tokens + completion_tokens[:returned_tokens])) logprobs_or_none: Optional[CompletionLogprobs] = None @@ -1374,6 +1382,53 @@ def logit_bias_processor( print("Llama._create_completion: cache save", file=sys.stderr) self.cache[prompt_tokens + completion_tokens] = self.save_state() print("Llama._create_completion: cache saved", file=sys.stderr) + + ## PROMETHEUS METRICS IN STREAMING MODE ## + # Record TTFT metric -- Setting to None if no tokens were generated + if not _metrics_dict.get("time_to_first_token"): + _metrics_dict["time_to_first_token"] = None + + # Record TPOT metrics (per generated token) + _metrics_dict["time_per_output_token"] = _tpot_metrics + + # Record metrics from the C++ backend (converted to seconds) + _timings = llama_cpp.llama_get_timings(self._ctx.ctx) + _metrics_dict["load_time"] = round(_timings.t_load_ms / 1e3, 2) + _metrics_dict["sample_time"] = round(_timings.t_sample_ms / 1e3, 2) + _metrics_dict["sample_throughput"] = round(1e3 / _timings.t_sample_ms * _timings.n_sample, 2) if _timings.t_sample_ms > 0 else 0.0 + _metrics_dict["prompt_eval_time"] = round(_timings.t_p_eval_ms / 1e3, 2) + _metrics_dict["prompt_eval_throughput"] = round(1e3 / _timings.t_p_eval_ms * _timings.n_p_eval, 2) if _timings.t_p_eval_ms > 0 else 0.0 + _metrics_dict["completion_eval_time"] = round(_timings.t_eval_ms / 1e3, 2) + _metrics_dict["completion_eval_throughput"] = round(1e3 / _timings.t_eval_ms * _timings.n_eval, 2) if _timings.t_eval_ms > 0 else 0.0 + _metrics_dict["end_to_end_latency"] = round((_timings.t_end_ms - _timings.t_start_ms) / 1e3, 2) + + # Record prefill and generation token metrics + _metrics_dict["prefill_tokens"] = len(prompt_tokens) + _metrics_dict["generation_tokens"] = len(completion_tokens) + + # Record system info + _gpu_utilization, _gpu_memory_used, _gpu_memory_free = get_gpu_general_info() + _metrics_dict["cpu_utilization"] = get_cpu_usage(_pid) # TODO: Returning always 0.0 -> check + _metrics_dict["cpu_ram_pid"] = get_ram_usage(_pid) + _metrics_dict["gpu_utilization"] = _gpu_utilization + _metrics_dict["gpu_ram_usage"] = _gpu_memory_used + _metrics_dict["gpu_ram_free"] = _gpu_memory_free + _metrics_dict["gpu_ram_pid"] = get_gpu_info_by_pid(_pid) + _metrics_dict["state_size"] = llama_cpp.llama_get_state_size(self._ctx.ctx) + _metrics_dict["kv_cache_usage_ratio"] = round(1. * llama_cpp.llama_get_kv_cache_used_cells(self._ctx.ctx) / self.n_ctx(), 2) + _metrics_dict["system_info"] = { + "model": model_name, + "n_params": str(llama_cpp.llama_model_n_params(self.model)), + "n_embd": str(self.n_embd()), + "n_ctx": str(self.n_ctx()), + "n_vocab": str(self.n_vocab()), + "n_threads": str(self.n_threads) + } + + # Log metrics to Prometheus + _all_metrics = Metrics(**_metrics_dict) + self.metrics.log_metrics(_all_metrics, labels=_labels) + return if self.cache: @@ -1448,6 +1503,8 @@ def logit_bias_processor( "token_logprobs": token_logprobs, "top_logprobs": top_logprobs, } + + ## PROMETHEUS METRICS IN CHAT COMPLETION MODE ## # Record TTFT metric -- Setting to None if no tokens were generated if not _metrics_dict.get("time_to_first_token"): _metrics_dict["time_to_first_token"] = None diff --git a/llama_cpp/llama_chat_format.py b/llama_cpp/llama_chat_format.py index f099715d3a..d5194bb91a 100644 --- a/llama_cpp/llama_chat_format.py +++ b/llama_cpp/llama_chat_format.py @@ -1718,6 +1718,7 @@ def functionary_v1_v2_chat_handler( model: Optional[str] = None, logits_processor: Optional[llama.LogitsProcessorList] = None, grammar: Optional[llama.LlamaGrammar] = None, + ai_service: Optional[str] = None, **kwargs, # type: ignore ) -> Union[llama_types.ChatCompletion, Iterator[llama_types.ChatCompletionChunk]]: SYSTEM_MESSAGE = """A chat between a curious user and an artificial intelligence assistant. The assistant gives helpful, detailed, and polite answers to the user's questions. The assistant calls functions with appropriate input when necessary""" @@ -1934,6 +1935,7 @@ def prepare_messages_for_inference( model=model, logits_processor=logits_processor, grammar=grammar, + ai_service=ai_service ) if stream is False: completion_or_completion_chunks["choices"][0]["text"] = completion_or_completion_chunks["choices"][0]["text"].lstrip() diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 46ef5ba1a7..105346fed6 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -45,7 +45,7 @@ class MetricsExporter: def __init__(self): self.labels = LABELS # One-time metrics - self._histrogram_load_time = Histogram( + self._histogram_load_time = Histogram( name="llama_cpp_python:load_t_seconds", documentation="Histogram of load time in seconds", labelnames=self.labels, @@ -194,7 +194,7 @@ def log_metrics(self, metrics: Metrics, labels: Dict[str, str]): """ Log the metrics using the Prometheus client. """ - self._histrogram_load_time.labels(**labels).observe(metrics.load_time) + self._histogram_load_time.labels(**labels).observe(metrics.load_time) self._histogram_sample_time.labels(**labels).observe(metrics.sample_time) if metrics.time_to_first_token: self._histogram_time_to_first_token.labels(**labels).observe(metrics.time_to_first_token) diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 7301204835..5d9abf22fb 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -455,8 +455,11 @@ async def create_chat_completion( "user", } kwargs = body.model_dump(exclude=exclude) + # Adds the ai_service value from the request body to the kwargs + # to be passed downstream to the llama_cpp.ChatCompletion object kwargs["ai_service"] = body.ai_service + llama = llama_proxy(body.model) if body.logit_bias is not None: kwargs["logit_bias"] = ( @@ -474,8 +477,7 @@ async def create_chat_completion( if isinstance(iterator_or_completion, Iterator): # EAFP: It's easier to ask for forgiveness than permission - # NOTE: Including kwargs so it can also pass the "ai_service" argument to the iterator - first_response = await run_in_threadpool(next, iterator_or_completion, **kwargs) + first_response = await run_in_threadpool(next, iterator_or_completion) # If no exception was raised from first_response, we can assume that # the iterator is valid and we can use it to stream the response. From 6fb10138ffddb7ba543571cda6d0fa028fd97e35 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Wed, 15 May 2024 12:18:16 -0700 Subject: [PATCH 14/28] Removed hard check on ai_service parameter --- llama_cpp/llama.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index e0a24ce855..126edbef65 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -975,8 +975,6 @@ def _create_completion( _ttft_start = time.time() _pid = os.getpid() _tpot_metrics = [] - if not ai_service: - raise ValueError("ai_service must be provided") _labels = { "service": ai_service if ai_service is not None else "not-specified", "request_type": "chat/completions", From d7806d73e71134b4774d4d05a3ea422563d90dd4 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 27 May 2024 18:46:14 -0700 Subject: [PATCH 15/28] Added mechanism to monitor task queue and log metric to prometheus --- llama_cpp/_utils.py | 63 +++++++- llama_cpp/llama.py | 15 +- llama_cpp/llama_metrics.py | 291 ++++++++++++++++++++++++++++++------- llama_cpp/server/app.py | 60 +++++++- llama_cpp/server/types.py | 6 +- 5 files changed, 365 insertions(+), 70 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index db5b6eb684..a4b4bbd515 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -1,6 +1,7 @@ import os import sys import psutil +import asyncio import subprocess from typing import Any, Dict, List, Tuple, Union @@ -12,6 +13,7 @@ STDOUT_FILENO = 1 STDERR_FILENO = 2 + class suppress_stdout_stderr(object): # NOTE: these must be "saved" here to avoid exceptions when using # this context manager inside of a __del__ method @@ -88,6 +90,7 @@ def get_cpu_usage(pid) -> float: process = psutil.Process(pid) return process.cpu_percent() + def get_ram_usage(pid) -> float: """ RAM usage in MiB by the current process. @@ -97,12 +100,19 @@ def get_ram_usage(pid) -> float: ram_usage = ram_info.rss / (1024 * 1024) # Convert to MiB return ram_usage + def get_gpu_info_by_pid(pid) -> float: """ GPU memory usage by the current process (if GPU is available) """ try: - gpu_info = subprocess.check_output(["nvidia-smi", "--query-compute-apps=pid,used_memory", "--format=csv,noheader"]).decode("utf-8") + gpu_info = subprocess.check_output( + [ + "nvidia-smi", + "--query-compute-apps=pid,used_memory", + "--format=csv,noheader", + ] + ).decode("utf-8") gpu_info = gpu_info.strip().split("\n") for info in gpu_info: gpu_pid, gpu_ram_usage = info.split(", ") @@ -112,14 +122,59 @@ def get_gpu_info_by_pid(pid) -> float: pass return 0.0 + def get_gpu_general_info() -> Tuple[float, float, float]: """ GPU general info (if GPU is available) """ try: - gpu_info = subprocess.check_output(["nvidia-smi", "--query-gpu=utilization.gpu,memory.used,memory.free", "--format=csv,noheader"]).decode("utf-8") - gpu_utilization, gpu_memory_used, gpu_memory_free = gpu_info.strip().split("\n")[0].split(", ") - return tuple(float(tup.split()[0]) for tup in [gpu_utilization, gpu_memory_used, gpu_memory_free]) + gpu_info = subprocess.check_output( + [ + "nvidia-smi", + "--query-gpu=utilization.gpu,memory.used,memory.free", + "--format=csv,noheader", + ] + ).decode("utf-8") + gpu_utilization, gpu_memory_used, gpu_memory_free = ( + gpu_info.strip().split("\n")[0].split(", ") + ) + return tuple( + float(tup.split()[0]) + for tup in [gpu_utilization, gpu_memory_used, gpu_memory_free] + ) except (subprocess.CalledProcessError, FileNotFoundError): pass return 0.0, 0.0, 0.0 + + +async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): + """ + An asynchronous function that monitors the task queue and updates + a shared status dictionary with the number of tasks that have not + started and the number of tasks that are currently running. + It recursively calls itself to continuously monitor the task queue. + NOTE: There will always be 4 tasks running in the task queue: + - LifespanOn.main: Main application coroutine + - Server.serve: Server coroutine + - monitor_task_queue: Task queue monitoring coroutine + - RequestReponseCycle.run_asgi: ASGI single cycle coroutine + Any upcoming requests will be added to the task queue in the form of + another RequestReponseCycle.run_asgi coroutine. + """ + all_tasks = asyncio.all_tasks() + + # Get count of all running tasks + _all_tasks = [task for task in all_tasks if task._state == "PENDING"] + status_dict["running_tasks_count"] = len(_all_tasks) + # Get basic metadata of all running tasks + status_dict["running_tasks"] = { + task.get_name(): str(task.get_coro()) + .encode("ascii", errors="ignore") + .strip() + .decode("ascii") + for task in all_tasks + } + + asyncio.create_task( + monitor_task_queue(status_dict) + ) # pass status_dict to the next task diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 86ea829117..fa01a6d217 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -38,13 +38,13 @@ import llama_cpp.llama_cpp as llama_cpp import llama_cpp.llama_chat_format as llama_chat_format -from llama_cpp.llama_metrics import Metrics, MetricsExporter +from llama_cpp.llama_metrics import RequestMetrics, MetricsExporter from llama_cpp._utils import ( get_cpu_usage, get_ram_usage, get_gpu_info_by_pid, - get_gpu_general_info, + get_gpu_general_info ) from llama_cpp.llama_speculative import LlamaDraftModel @@ -938,7 +938,7 @@ def decode_batch(seq_sizes: List[int]): return output, total_tokens else: return output - + def _create_completion( self, prompt: Union[str, List[int]], @@ -972,7 +972,6 @@ def _create_completion( ]: assert self._ctx is not None assert suffix is None or suffix.__class__ is str - # Variables required for metric collection _metrics_dict = {} _ttft_start = time.time() @@ -1464,8 +1463,8 @@ def logit_bias_processor( } # Log metrics to Prometheus - _all_metrics = Metrics(**_metrics_dict) - self.metrics.log_metrics(_all_metrics, labels=_labels) + _all_metrics = RequestMetrics(**_metrics_dict) + self.metrics.log_request_metrics(_all_metrics, labels=_labels) return @@ -1585,8 +1584,8 @@ def logit_bias_processor( } # Log metrics to Prometheus - _all_metrics = Metrics(**_metrics_dict) - self.metrics.log_metrics(_all_metrics, labels=_labels) + _all_metrics = RequestMetrics(**_metrics_dict) + self.metrics.log_request_metrics(_all_metrics, labels=_labels) yield { "id": completion_id, diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 105346fed6..0f9b34a42e 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -6,11 +6,13 @@ LABELS = ["request_type", "service"] + @dataclass -class Metrics: +class RequestMetrics: """ - A dataclass to store metrics for a request. + A dataclass to store metrics for a given request. """ + # System metrics system_info: Dict[str, Any] state_size: int @@ -33,15 +35,26 @@ class Metrics: completion_eval_throughput: float end_to_end_latency: float prefill_tokens: int - generation_tokens: int + generation_tokens: int kv_cache_usage_ratio: float +@dataclass +class QueueMetrics: + """ + A dataclass to store metrics for the task queue. + """ + + running_tasks_count: int + running_tasks: dict + + class MetricsExporter: """ A custom Prometheus Metrics Explorer for the LLAMA C++ backend. Collects metrics per request sent to the backend. """ + def __init__(self): self.labels = LABELS # One-time metrics @@ -50,9 +63,26 @@ def __init__(self): documentation="Histogram of load time in seconds", labelnames=self.labels, buckets=[ - 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, - 8.0, 9.0, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0 - ] + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.0, + 3.0, + 4.0, + 5.0, + 6.0, + 7.0, + 8.0, + 9.0, + 10.0, + 12.5, + 15.0, + 20.0, + 25.0, + 30.0, + ], ) # Request-level latencies self._histogram_sample_time = Histogram( @@ -60,54 +90,151 @@ def __init__(self): documentation="Histogram of token sampling time in seconds", labelnames=self.labels, buckets=[ - 0.00001, 0.00005, 0.0001, 0.00025, 0.0005, 0.001, 0.0025, - 0.005, 0.0075, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, - ] + 0.00001, + 0.00005, + 0.0001, + 0.00025, + 0.0005, + 0.001, + 0.0025, + 0.005, + 0.0075, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + ], ) self._histogram_time_to_first_token = Histogram( name="llama_cpp_python:ttft_seconds", documentation="Histogram of time to first token in seconds", labelnames=self.labels, buckets=[ - 0.001, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.25, 0.5, - 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0 - ] + 0.001, + 0.005, + 0.01, + 0.02, + 0.04, + 0.06, + 0.08, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 12.5, + 15.0, + 20.0, + 25.0, + 30.0, + ], ) self._histogram_time_per_output_token = Histogram( name="llama_cpp_python:tpot_seconds", documentation="Histogram of time per output token in seconds", labelnames=self.labels, buckets=[ - 0.001, 0.005, 0.01, 0.02, 0.04, 0.06, 0.08, 0.1, 0.25, 0.5, - 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, 20.0, 25.0, 30.0 - ] + 0.001, + 0.005, + 0.01, + 0.02, + 0.04, + 0.06, + 0.08, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 12.5, + 15.0, + 20.0, + 25.0, + 30.0, + ], ) self._histogram_prompt_eval_time = Histogram( name="llama_cpp_python:p_eval_t_seconds", documentation="Histogram of prompt evaluation time in seconds", labelnames=self.labels, buckets=[ - 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, - 20.0, 25.0, 30.0, 40.0, 50.0, 60.0 - ] + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 12.5, + 15.0, + 20.0, + 25.0, + 30.0, + 40.0, + 50.0, + 60.0, + ], ) self._histogram_completion_eval_time = Histogram( name="llama_cpp_python:c_eval_t_seconds", documentation="Histogram of completion evaluation time in seconds", labelnames=self.labels, buckets=[ - 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, - 20.0, 25.0, 30.0, 40.0, 50.0, 60.0 - ] + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 12.5, + 15.0, + 20.0, + 25.0, + 30.0, + 40.0, + 50.0, + 60.0, + ], ) self._histogram_e2e_request_latency = Histogram( name="llama_cpp_python:e2e_seconds", documentation="Histogram of end-to-end request latency in seconds", labelnames=self.labels, buckets=[ - 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, 12.5, 15.0, - 20.0, 25.0, 30.0, 40.0, 50.0, 60.0 - ] + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 12.5, + 15.0, + 20.0, + 25.0, + 30.0, + 40.0, + 50.0, + 60.0, + ], ) # Prefill and generation tokens self._histogram_prefill_tokens = Histogram( @@ -115,98 +242,146 @@ def __init__(self): documentation="Histogram of number of prefill tokens processed", labelnames=self.labels, buckets=[ - 1, 10, 25, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, - 3500, 4000, 4500, 5000 - ] + 1, + 10, + 25, + 50, + 100, + 250, + 500, + 750, + 1000, + 1500, + 2000, + 2500, + 3000, + 3500, + 4000, + 4500, + 5000, + ], ) self._histogram_generation_tokens = Histogram( name="llama_cpp_python:completion_tokens_total", documentation="Histogram of number of generation tokens processed", labelnames=self.labels, buckets=[ - 1, 10, 25, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, - 3500, 4000, 4500, 5000 - ] + 1, + 10, + 25, + 50, + 100, + 250, + 500, + 750, + 1000, + 1500, + 2000, + 2500, + 3000, + 3500, + 4000, + 4500, + 5000, + ], ) # Current throughput self._gauge_prompt_eval_throughput = Gauge( name="llama_cpp_python:prompt_eval_throughput", documentation="Current throughput of the prompt evaluation process (in tokens/second)", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_completion_eval_throughput = Gauge( name="llama_cpp_python:completion_eval_throughput", documentation="Current throughput of the completion evaluation process (in tokens/second)", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_sample_throughput = Gauge( name="llama_cpp_python:sample_throughput", documentation="Current throughput of the token sampling process (in tokens/second)", - labelnames=self.labels + labelnames=self.labels, ) # System info self._gauge_state_size = Gauge( name="llama_cpp_python:state_size", documentation="Current state size in bytes of various components such as rng (random number generator), logits, embedding, and kv_cache (key-value cache)", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_cpu_utilization = Gauge( name="llama_cpp_python:cpu_utilization", documentation="Current CPU utilization", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_cpu_ram_usage_by_pid = Gauge( name="llama_cpp_python:cpu_memory_usage_by_pid", documentation="Current CPU memory usage during the request", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_gpu_utilization = Gauge( name="llama_cpp_python:gpu_utilization", documentation="Current GPU utilization", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_gpu_memory_usage = Gauge( name="llama_cpp_python:gpu_memory_usage", documentation="Current GPU memory usage", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_gpu_memory_free = Gauge( name="llama_cpp_python:gpu_memory_free", documentation="Current free GPU memory", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_gpu_memory_usage_by_pid = Gauge( name="llama_cpp_python:gpu_memory_usage_by_pid", documentation="Current GPU memory usage during the request", - labelnames=self.labels + labelnames=self.labels, ) self._gauge_kv_cache_usage_ratio = Gauge( name="llama_cpp_python:kv_cache_usage_ratio", documentation="KV-cache usage. 1 means 100 percent usage", - labelnames=self.labels + labelnames=self.labels, ) - self._info = Info( - name="llama_cpp_python:info", - documentation="Server metadata" + self._gauge_running_tasks = Gauge( + name="llama_cpp_python:running_tasks", + documentation="Number of running tasks in the task queue", + labelnames=self.labels, ) - def log_metrics(self, metrics: Metrics, labels: Dict[str, str]): + # Server metadata + self._info = Info(name="llama_cpp_python:info", documentation="Server metadata") + + def log_request_metrics(self, metrics: RequestMetrics, labels: Dict[str, str]): """ Log the metrics using the Prometheus client. """ self._histogram_load_time.labels(**labels).observe(metrics.load_time) self._histogram_sample_time.labels(**labels).observe(metrics.sample_time) if metrics.time_to_first_token: - self._histogram_time_to_first_token.labels(**labels).observe(metrics.time_to_first_token) + self._histogram_time_to_first_token.labels(**labels).observe( + metrics.time_to_first_token + ) for _tpot in metrics.time_per_output_token: self._histogram_time_per_output_token.labels(**labels).observe(_tpot) - self._histogram_prompt_eval_time.labels(**labels).observe(metrics.prompt_eval_time) - self._histogram_completion_eval_time.labels(**labels).observe(metrics.completion_eval_time) - self._histogram_e2e_request_latency.labels(**labels).observe(metrics.end_to_end_latency) + self._histogram_prompt_eval_time.labels(**labels).observe( + metrics.prompt_eval_time + ) + self._histogram_completion_eval_time.labels(**labels).observe( + metrics.completion_eval_time + ) + self._histogram_e2e_request_latency.labels(**labels).observe( + metrics.end_to_end_latency + ) self._histogram_prefill_tokens.labels(**labels).observe(metrics.prefill_tokens) - self._histogram_generation_tokens.labels(**labels).observe(metrics.generation_tokens) - self._gauge_prompt_eval_throughput.labels(**labels).set(metrics.prompt_eval_throughput) - self._gauge_completion_eval_throughput.labels(**labels).set(metrics.completion_eval_throughput) + self._histogram_generation_tokens.labels(**labels).observe( + metrics.generation_tokens + ) + self._gauge_prompt_eval_throughput.labels(**labels).set( + metrics.prompt_eval_throughput + ) + self._gauge_completion_eval_throughput.labels(**labels).set( + metrics.completion_eval_throughput + ) self._gauge_sample_throughput.labels(**labels).set(metrics.sample_throughput) self._gauge_cpu_utilization.labels(**labels).set(metrics.cpu_utilization) self._gauge_cpu_ram_usage_by_pid.labels(**labels).set(metrics.cpu_ram_pid) @@ -215,5 +390,13 @@ def log_metrics(self, metrics: Metrics, labels: Dict[str, str]): self._gauge_gpu_memory_free.labels(**labels).set(metrics.gpu_ram_free) self._gauge_gpu_memory_usage_by_pid.labels(**labels).set(metrics.gpu_ram_pid) self._gauge_state_size.labels(**labels).set(metrics.state_size) - self._gauge_kv_cache_usage_ratio.labels(**labels).set(metrics.kv_cache_usage_ratio) - self._info.info(metrics.system_info) \ No newline at end of file + self._gauge_kv_cache_usage_ratio.labels(**labels).set( + metrics.kv_cache_usage_ratio + ) + self._info.info(metrics.system_info) + + def log_queue_metrics(self, metrics: QueueMetrics, labels: Dict[str, str]): + """ + Log the metrics for the task queue. + """ + self._gauge_running_tasks.labels(**labels).set(metrics.running_tasks_count) diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index d11d6c8e39..30601b8de9 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -1,5 +1,7 @@ from __future__ import annotations +from contextlib import asynccontextmanager + import os import json @@ -15,6 +17,7 @@ from anyio.streams.memory import MemoryObjectSendStream from starlette.concurrency import run_in_threadpool, iterate_in_threadpool from fastapi import Depends, FastAPI, APIRouter, Request, HTTPException, status, Body +from fastapi.responses import JSONResponse from fastapi.middleware import Middleware from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer @@ -41,8 +44,11 @@ TokenizeInputCountResponse, DetokenizeInputRequest, DetokenizeInputResponse, + HealthMetrics ) from llama_cpp.server.errors import RouteErrorHandler +from llama_cpp._utils import monitor_task_queue +from llama_cpp.llama_metrics import QueueMetrics router = APIRouter(route_class=RouteErrorHandler) @@ -96,6 +102,17 @@ def set_ping_message_factory(factory): _ping_message_factory = factory +task_queue_status = {} + +@asynccontextmanager +async def lifespan(app: FastAPI): + """ + A context manager that launches tasks to be run during the application's lifespan. + """ + await monitor_task_queue(task_queue_status) + yield + + def create_app( settings: Settings | None = None, server_settings: ServerSettings | None = None, @@ -135,6 +152,7 @@ def create_app( title="🦙 llama.cpp Python API", version=llama_cpp.__version__, root_path=server_settings.root_path, + lifespan=lifespan ) app.add_middleware( CORSMiddleware, @@ -223,6 +241,34 @@ async def authenticate( openai_v1_tag = "OpenAI V1" +@router.get( + "/v1/health", + response_model=HealthMetrics, + summary="Server's health", +) +async def check_health( + +): + # 4 running tasks + new scheduled request + if 0 <= task_queue_status.get("running_tasks_count", 0) <= 5: + return JSONResponse( + content={"status": "OK", "task_queue_status": task_queue_status} + ) + # 1 - 6 scheduled requests + elif 5 < task_queue_status.get("running_tasks_count", 0) <= 10: + return JSONResponse( + content={"status": "Warning", "task_queue_status": task_queue_status} + ) + # 7+ scheduled requests + # TODO: Evaluate if in this case we should manually stop the execution of certain tasks to clear the queue + elif task_queue_status.get("running_tasks_count", 0) > 10: + return JSONResponse( + content={"status": "Critical", "task_queue_status": task_queue_status} + ) + else: + pass + + @router.post( "/v1/completions", summary="Completion", @@ -276,7 +322,6 @@ async def create_completion( if request.url.path != "/v1/engines/copilot-codex/completions" else "copilot-codex" ) - exclude = { "n", "best_of", @@ -351,7 +396,6 @@ async def create_embedding( **request.model_dump(exclude={"user"}), ) - @router.post( "/v1/chat/completions", summary="Chat", @@ -459,12 +503,14 @@ async def create_chat_completion( ), llama_proxy: LlamaProxy = Depends(get_llama_proxy), ) -> llama_cpp.ChatCompletion: + # Extract relevant kwargs from the request body exclude = { "n", "logit_bias_type", "user", "min_tokens", } + kwargs = body.model_dump(exclude=exclude) # Adds the ai_service value from the request body to the kwargs @@ -479,6 +525,14 @@ async def create_chat_completion( else body.logit_bias ) + # Register current running tasks as a Prometheus metric + _labels = { + "service": "general", + "request_type": "chat/completions", + } + _queue_metrics = QueueMetrics(**task_queue_status) + llama.metrics.log_queue_metrics(_queue_metrics, _labels) + if body.grammar is not None: kwargs["grammar"] = llama_cpp.LlamaGrammar.from_string(body.grammar) @@ -494,7 +548,7 @@ async def create_chat_completion( iterator_or_completion: Union[ llama_cpp.ChatCompletion, Iterator[llama_cpp.ChatCompletionChunk] ] = await run_in_threadpool(llama.create_chat_completion, **kwargs) - + if isinstance(iterator_or_completion, Iterator): # EAFP: It's easier to ask for forgiveness than permission first_response = await run_in_threadpool(next, iterator_or_completion) diff --git a/llama_cpp/server/types.py b/llama_cpp/server/types.py index 1a0f4bcca5..9c32fe568e 100644 --- a/llama_cpp/server/types.py +++ b/llama_cpp/server/types.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import List, Optional, Union, Dict +from typing import List, Optional, Union, Dict, Any from typing_extensions import TypedDict, Literal from pydantic import BaseModel, Field @@ -317,3 +317,7 @@ class DetokenizeInputResponse(BaseModel): model_config = { "json_schema_extra": {"example": {"text": "How many tokens in this query?"}} } + +class HealthMetrics(BaseModel): + model_config = {"arbitrary_types_allowed": True} + task_queue_status: Dict[str, Any] \ No newline at end of file From c01fe01a3fefee6593a600223446b9c5077ce9ec Mon Sep 17 00:00:00 2001 From: juanroesel Date: Thu, 13 Jun 2024 16:08:00 -0700 Subject: [PATCH 16/28] Added instructions to sync with upstream repo --- INSTRUCTIONS.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 INSTRUCTIONS.md diff --git a/INSTRUCTIONS.md b/INSTRUCTIONS.md new file mode 100644 index 0000000000..a562384cfb --- /dev/null +++ b/INSTRUCTIONS.md @@ -0,0 +1,29 @@ +## Syncing with upstream repo + +See [here](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork#syncing-a-fork-branch-from-the-web-ui) for more details. + +1. On the GitHub UI, create a new branch `repo-sync`, if the branch doesn't exist already. + +2. Click on the "Sync fork" button and then click on the "Update branch" button. This will import all the commits from the upstream repo. + +3. Create a local branch `repo-sync` and pull the contents from the remote `repo-sync` branch. + +4. Solve for any conflicts if they arise. Otherwise, proceed to the next step. + +5. Since changes have probably been made to the vendor libraries (`llama_cpp`, `kompute`), we need to recompile the `llama_cpp` package. Navigate to the `vendor/llama.cpp` folder and clean the build cache: + +``` +make clean +``` +6. Navigate back to the root directory and type the following to recompile the `llama_cpp` package and build the dependenies again: + +``` +make deps && make build +``` +7. Launch the `llama_cpp_python` server using the following command: +``` +python -m llama_cpp.server --model $MODEL --n_gpu_layers -1 +``` +NOTE: Modify the launch arguments as needed. Make sure the `MODEL` environment variable points to an absolute path containing a `.gguf` model. + +8. If the server launches without issues, then you can proceed to create a PR with the latest changes \ No newline at end of file From 61e20378a184a3e60ef96d547df8dd136ff7d03f Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 13:11:58 -0700 Subject: [PATCH 17/28] Adds timer to avoid saturating CPU usage --- llama_cpp/_utils.py | 5 +++-- llama_cpp/server/app.py | 14 ++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index a4b4bbd515..2a10f861a6 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -153,11 +153,10 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): a shared status dictionary with the number of tasks that have not started and the number of tasks that are currently running. It recursively calls itself to continuously monitor the task queue. - NOTE: There will always be 4 tasks running in the task queue: + NOTE: There will always be 3 tasks running in the task queue: - LifespanOn.main: Main application coroutine - Server.serve: Server coroutine - monitor_task_queue: Task queue monitoring coroutine - - RequestReponseCycle.run_asgi: ASGI single cycle coroutine Any upcoming requests will be added to the task queue in the form of another RequestReponseCycle.run_asgi coroutine. """ @@ -175,6 +174,8 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): for task in all_tasks } + await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU + asyncio.create_task( monitor_task_queue(status_dict) ) # pass status_dict to the next task diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 30601b8de9..43f36dd8e7 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -246,22 +246,20 @@ async def authenticate( response_model=HealthMetrics, summary="Server's health", ) -async def check_health( - -): - # 4 running tasks + new scheduled request - if 0 <= task_queue_status.get("running_tasks_count", 0) <= 5: +async def check_health(): + # 3 running tasks + new scheduled request + if 0 <= task_queue_status.get("running_tasks_count", 0) <= 4: return JSONResponse( content={"status": "OK", "task_queue_status": task_queue_status} ) - # 1 - 6 scheduled requests - elif 5 < task_queue_status.get("running_tasks_count", 0) <= 10: + # 2 - 6 scheduled requests + elif 4 < task_queue_status.get("running_tasks_count", 0) < 10: return JSONResponse( content={"status": "Warning", "task_queue_status": task_queue_status} ) # 7+ scheduled requests # TODO: Evaluate if in this case we should manually stop the execution of certain tasks to clear the queue - elif task_queue_status.get("running_tasks_count", 0) > 10: + elif task_queue_status.get("running_tasks_count", 0) >= 10: return JSONResponse( content={"status": "Critical", "task_queue_status": task_queue_status} ) From aab46dac238824a29d1697c8060b97387b1ed614 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:11:57 -0700 Subject: [PATCH 18/28] Converts running_tasks into Histogram --- llama_cpp/llama_metrics.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 0f9b34a42e..a7ffc7a094 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -342,10 +342,33 @@ def __init__(self): documentation="KV-cache usage. 1 means 100 percent usage", labelnames=self.labels, ) - self._gauge_running_tasks = Gauge( + self._gauge_running_tasks = Histogram( name="llama_cpp_python:running_tasks", documentation="Number of running tasks in the task queue", labelnames=self.labels, + buckets=[ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 14, + 16, + 18, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + ], ) # Server metadata @@ -399,4 +422,4 @@ def log_queue_metrics(self, metrics: QueueMetrics, labels: Dict[str, str]): """ Log the metrics for the task queue. """ - self._gauge_running_tasks.labels(**labels).set(metrics.running_tasks_count) + self._gauge_running_tasks.labels(**labels).observe(metrics.running_tasks_count) From afd3472806d4b605d64d2f3b188718c6ed18bd6f Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:13:45 -0700 Subject: [PATCH 19/28] Decouples MetricsExporter instance from Llama object --- llama_cpp/_utils.py | 27 ++++++++++++++++++++++----- llama_cpp/llama.py | 8 ++------ llama_cpp/server/app.py | 29 ++++++++++++++++++----------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index 2a10f861a6..14d0542fcc 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -6,6 +6,9 @@ from typing import Any, Dict, List, Tuple, Union +from llama_cpp.llama_metrics import QueueMetrics, MetricsExporter + + # Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor outnull_file = open(os.devnull, "w") errnull_file = open(os.devnull, "w") @@ -147,7 +150,9 @@ def get_gpu_general_info() -> Tuple[float, float, float]: return 0.0, 0.0, 0.0 -async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): +async def monitor_task_queue( + status_dict: Dict[str, Union[int, float]], metrics_exporter: MetricsExporter +): """ An asynchronous function that monitors the task queue and updates a shared status dictionary with the number of tasks that have not @@ -160,6 +165,9 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): Any upcoming requests will be added to the task queue in the form of another RequestReponseCycle.run_asgi coroutine. """ + if not isinstance(metrics_exporter, MetricsExporter): + raise ValueError("metrics_exporter must be an instance of MetricsExporter") + all_tasks = asyncio.all_tasks() # Get count of all running tasks @@ -168,14 +176,23 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): # Get basic metadata of all running tasks status_dict["running_tasks"] = { task.get_name(): str(task.get_coro()) - .encode("ascii", errors="ignore") - .strip() - .decode("ascii") + .lstrip("\u003C") + .rstrip("\u003E") for task in all_tasks } + assert status_dict is not None + + # Register current running tasks as a Prometheus metric + _labels = { + "service": "general", + "request_type": "health_check", + } + _queue_metrics = QueueMetrics(**status_dict) + metrics_exporter.log_queue_metrics(_queue_metrics, _labels) + await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU asyncio.create_task( - monitor_task_queue(status_dict) + monitor_task_queue(status_dict, metrics_exporter) ) # pass status_dict to the next task diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 2d908d9af4..ccb2adab58 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -43,7 +43,7 @@ import llama_cpp.llama_cpp as llama_cpp import llama_cpp.llama_chat_format as llama_chat_format -from llama_cpp.llama_metrics import RequestMetrics, MetricsExporter +from llama_cpp.llama_metrics import RequestMetrics from llama_cpp._utils import ( get_cpu_usage, @@ -74,7 +74,6 @@ class Llama: """High-level Python wrapper for a llama.cpp model.""" __backend_initialized = False - __prometheus_metrics = MetricsExporter() def __init__( self, @@ -488,10 +487,7 @@ def __init__( self.chat_format = "llama-2" if self.verbose: print(f"Using fallback chat format: {self.chat_format}", file=sys.stderr) - - # Prometheus metrics - self.metrics = self.__prometheus_metrics - + @property def ctx(self) -> llama_cpp.llama_context_p: assert self._ctx.ctx is not None diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 43f36dd8e7..6af7620312 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -48,7 +48,7 @@ ) from llama_cpp.server.errors import RouteErrorHandler from llama_cpp._utils import monitor_task_queue -from llama_cpp.llama_metrics import QueueMetrics +from llama_cpp.llama_metrics import MetricsExporter router = APIRouter(route_class=RouteErrorHandler) @@ -102,14 +102,26 @@ def set_ping_message_factory(factory): _ping_message_factory = factory +def set_metrics_exporter(): + global metrics_exporter + try: + metrics_exporter + except NameError: + metrics_exporter = MetricsExporter() + + return metrics_exporter + task_queue_status = {} + @asynccontextmanager async def lifespan(app: FastAPI): """ A context manager that launches tasks to be run during the application's lifespan. """ - await monitor_task_queue(task_queue_status) + metrics_exporter = set_metrics_exporter() + + await monitor_task_queue(task_queue_status, metrics_exporter) yield @@ -514,7 +526,7 @@ async def create_chat_completion( # Adds the ai_service value from the request body to the kwargs # to be passed downstream to the llama_cpp.ChatCompletion object kwargs["ai_service"] = body.ai_service - + llama = llama_proxy(body.model) if body.logit_bias is not None: kwargs["logit_bias"] = ( @@ -523,14 +535,6 @@ async def create_chat_completion( else body.logit_bias ) - # Register current running tasks as a Prometheus metric - _labels = { - "service": "general", - "request_type": "chat/completions", - } - _queue_metrics = QueueMetrics(**task_queue_status) - llama.metrics.log_queue_metrics(_queue_metrics, _labels) - if body.grammar is not None: kwargs["grammar"] = llama_cpp.LlamaGrammar.from_string(body.grammar) @@ -543,6 +547,9 @@ async def create_chat_completion( else: kwargs["logits_processor"].extend(_min_tokens_logits_processor) + # Set the metrics exporter for the llama object + llama.metrics = set_metrics_exporter() + iterator_or_completion: Union[ llama_cpp.ChatCompletion, Iterator[llama_cpp.ChatCompletionChunk] ] = await run_in_threadpool(llama.create_chat_completion, **kwargs) From 89a96e74ee51d4589e89a03c04d0f321791805c4 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:34:13 -0700 Subject: [PATCH 20/28] Updated unit tests with changes to MetricsExporter init --- tests/test_llama.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/test_llama.py b/tests/test_llama.py index aca1745a86..49150c9f2b 100644 --- a/tests/test_llama.py +++ b/tests/test_llama.py @@ -5,9 +5,18 @@ from scipy.special import log_softmax import llama_cpp +from llama_cpp.llama_metrics import MetricsExporter MODEL = "./vendor/llama.cpp/models/ggml-vocab-llama-spm.gguf" +def set_test_metrics_exporter(): + global metrics_exporter + try: + metrics_exporter + except NameError: + metrics_exporter = MetricsExporter() + + return metrics_exporter def test_llama_cpp_tokenization(): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, verbose=False) @@ -156,7 +165,8 @@ def test_llama_patch(mock_llama): ai_service_completion = "test-label-suggestions" ai_service_streaming = "test-acceptance-criteria" llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx) - + llama.metrics = set_test_metrics_exporter() + n_vocab = llama_cpp.llama_n_vocab(llama._model.model) assert n_vocab == 32000 @@ -231,6 +241,7 @@ def test_llama_pickle(): def test_utf8(mock_llama): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, logits_all=True) + llama.metrics = set_test_metrics_exporter() output_text = "😀" ai_service = "label-suggestions" From 9c0820d279dc89d716b41b88293f4e328fb0c606 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 13:11:58 -0700 Subject: [PATCH 21/28] Adds timer to avoid saturating CPU usage --- llama_cpp/_utils.py | 5 +++-- llama_cpp/server/app.py | 14 ++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index a4b4bbd515..2a10f861a6 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -153,11 +153,10 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): a shared status dictionary with the number of tasks that have not started and the number of tasks that are currently running. It recursively calls itself to continuously monitor the task queue. - NOTE: There will always be 4 tasks running in the task queue: + NOTE: There will always be 3 tasks running in the task queue: - LifespanOn.main: Main application coroutine - Server.serve: Server coroutine - monitor_task_queue: Task queue monitoring coroutine - - RequestReponseCycle.run_asgi: ASGI single cycle coroutine Any upcoming requests will be added to the task queue in the form of another RequestReponseCycle.run_asgi coroutine. """ @@ -175,6 +174,8 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): for task in all_tasks } + await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU + asyncio.create_task( monitor_task_queue(status_dict) ) # pass status_dict to the next task diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 30601b8de9..43f36dd8e7 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -246,22 +246,20 @@ async def authenticate( response_model=HealthMetrics, summary="Server's health", ) -async def check_health( - -): - # 4 running tasks + new scheduled request - if 0 <= task_queue_status.get("running_tasks_count", 0) <= 5: +async def check_health(): + # 3 running tasks + new scheduled request + if 0 <= task_queue_status.get("running_tasks_count", 0) <= 4: return JSONResponse( content={"status": "OK", "task_queue_status": task_queue_status} ) - # 1 - 6 scheduled requests - elif 5 < task_queue_status.get("running_tasks_count", 0) <= 10: + # 2 - 6 scheduled requests + elif 4 < task_queue_status.get("running_tasks_count", 0) < 10: return JSONResponse( content={"status": "Warning", "task_queue_status": task_queue_status} ) # 7+ scheduled requests # TODO: Evaluate if in this case we should manually stop the execution of certain tasks to clear the queue - elif task_queue_status.get("running_tasks_count", 0) > 10: + elif task_queue_status.get("running_tasks_count", 0) >= 10: return JSONResponse( content={"status": "Critical", "task_queue_status": task_queue_status} ) From 138665f3daf6d165229727003cb20c50e01c1115 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:11:57 -0700 Subject: [PATCH 22/28] Converts running_tasks into Histogram --- llama_cpp/llama_metrics.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 0f9b34a42e..a7ffc7a094 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -342,10 +342,33 @@ def __init__(self): documentation="KV-cache usage. 1 means 100 percent usage", labelnames=self.labels, ) - self._gauge_running_tasks = Gauge( + self._gauge_running_tasks = Histogram( name="llama_cpp_python:running_tasks", documentation="Number of running tasks in the task queue", labelnames=self.labels, + buckets=[ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 14, + 16, + 18, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + ], ) # Server metadata @@ -399,4 +422,4 @@ def log_queue_metrics(self, metrics: QueueMetrics, labels: Dict[str, str]): """ Log the metrics for the task queue. """ - self._gauge_running_tasks.labels(**labels).set(metrics.running_tasks_count) + self._gauge_running_tasks.labels(**labels).observe(metrics.running_tasks_count) From 62c9ac089fc7aaff7b6f220d9c1f7df201a684d0 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:13:45 -0700 Subject: [PATCH 23/28] Decouples MetricsExporter instance from Llama object --- llama_cpp/_utils.py | 27 ++++++++++++++++++++++----- llama_cpp/llama.py | 8 ++------ llama_cpp/server/app.py | 29 ++++++++++++++++++----------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index 2a10f861a6..14d0542fcc 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -6,6 +6,9 @@ from typing import Any, Dict, List, Tuple, Union +from llama_cpp.llama_metrics import QueueMetrics, MetricsExporter + + # Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor outnull_file = open(os.devnull, "w") errnull_file = open(os.devnull, "w") @@ -147,7 +150,9 @@ def get_gpu_general_info() -> Tuple[float, float, float]: return 0.0, 0.0, 0.0 -async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): +async def monitor_task_queue( + status_dict: Dict[str, Union[int, float]], metrics_exporter: MetricsExporter +): """ An asynchronous function that monitors the task queue and updates a shared status dictionary with the number of tasks that have not @@ -160,6 +165,9 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): Any upcoming requests will be added to the task queue in the form of another RequestReponseCycle.run_asgi coroutine. """ + if not isinstance(metrics_exporter, MetricsExporter): + raise ValueError("metrics_exporter must be an instance of MetricsExporter") + all_tasks = asyncio.all_tasks() # Get count of all running tasks @@ -168,14 +176,23 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): # Get basic metadata of all running tasks status_dict["running_tasks"] = { task.get_name(): str(task.get_coro()) - .encode("ascii", errors="ignore") - .strip() - .decode("ascii") + .lstrip("\u003C") + .rstrip("\u003E") for task in all_tasks } + assert status_dict is not None + + # Register current running tasks as a Prometheus metric + _labels = { + "service": "general", + "request_type": "health_check", + } + _queue_metrics = QueueMetrics(**status_dict) + metrics_exporter.log_queue_metrics(_queue_metrics, _labels) + await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU asyncio.create_task( - monitor_task_queue(status_dict) + monitor_task_queue(status_dict, metrics_exporter) ) # pass status_dict to the next task diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 2a062747ea..4d10eaaeb8 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -43,7 +43,7 @@ import llama_cpp.llama_cpp as llama_cpp import llama_cpp.llama_chat_format as llama_chat_format -from llama_cpp.llama_metrics import RequestMetrics, MetricsExporter +from llama_cpp.llama_metrics import RequestMetrics from llama_cpp._utils import ( get_cpu_usage, @@ -74,7 +74,6 @@ class Llama: """High-level Python wrapper for a llama.cpp model.""" __backend_initialized = False - __prometheus_metrics = MetricsExporter() def __init__( self, @@ -488,10 +487,7 @@ def __init__( self.chat_format = "llama-2" if self.verbose: print(f"Using fallback chat format: {self.chat_format}", file=sys.stderr) - - # Prometheus metrics - self.metrics = self.__prometheus_metrics - + @property def ctx(self) -> llama_cpp.llama_context_p: assert self._ctx.ctx is not None diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 43f36dd8e7..6af7620312 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -48,7 +48,7 @@ ) from llama_cpp.server.errors import RouteErrorHandler from llama_cpp._utils import monitor_task_queue -from llama_cpp.llama_metrics import QueueMetrics +from llama_cpp.llama_metrics import MetricsExporter router = APIRouter(route_class=RouteErrorHandler) @@ -102,14 +102,26 @@ def set_ping_message_factory(factory): _ping_message_factory = factory +def set_metrics_exporter(): + global metrics_exporter + try: + metrics_exporter + except NameError: + metrics_exporter = MetricsExporter() + + return metrics_exporter + task_queue_status = {} + @asynccontextmanager async def lifespan(app: FastAPI): """ A context manager that launches tasks to be run during the application's lifespan. """ - await monitor_task_queue(task_queue_status) + metrics_exporter = set_metrics_exporter() + + await monitor_task_queue(task_queue_status, metrics_exporter) yield @@ -514,7 +526,7 @@ async def create_chat_completion( # Adds the ai_service value from the request body to the kwargs # to be passed downstream to the llama_cpp.ChatCompletion object kwargs["ai_service"] = body.ai_service - + llama = llama_proxy(body.model) if body.logit_bias is not None: kwargs["logit_bias"] = ( @@ -523,14 +535,6 @@ async def create_chat_completion( else body.logit_bias ) - # Register current running tasks as a Prometheus metric - _labels = { - "service": "general", - "request_type": "chat/completions", - } - _queue_metrics = QueueMetrics(**task_queue_status) - llama.metrics.log_queue_metrics(_queue_metrics, _labels) - if body.grammar is not None: kwargs["grammar"] = llama_cpp.LlamaGrammar.from_string(body.grammar) @@ -543,6 +547,9 @@ async def create_chat_completion( else: kwargs["logits_processor"].extend(_min_tokens_logits_processor) + # Set the metrics exporter for the llama object + llama.metrics = set_metrics_exporter() + iterator_or_completion: Union[ llama_cpp.ChatCompletion, Iterator[llama_cpp.ChatCompletionChunk] ] = await run_in_threadpool(llama.create_chat_completion, **kwargs) From aead3af03dd707b9c9a76432caa5dab73b6cc579 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:34:13 -0700 Subject: [PATCH 24/28] Updated unit tests with changes to MetricsExporter init --- tests/test_llama.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/test_llama.py b/tests/test_llama.py index aca1745a86..49150c9f2b 100644 --- a/tests/test_llama.py +++ b/tests/test_llama.py @@ -5,9 +5,18 @@ from scipy.special import log_softmax import llama_cpp +from llama_cpp.llama_metrics import MetricsExporter MODEL = "./vendor/llama.cpp/models/ggml-vocab-llama-spm.gguf" +def set_test_metrics_exporter(): + global metrics_exporter + try: + metrics_exporter + except NameError: + metrics_exporter = MetricsExporter() + + return metrics_exporter def test_llama_cpp_tokenization(): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, verbose=False) @@ -156,7 +165,8 @@ def test_llama_patch(mock_llama): ai_service_completion = "test-label-suggestions" ai_service_streaming = "test-acceptance-criteria" llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx) - + llama.metrics = set_test_metrics_exporter() + n_vocab = llama_cpp.llama_n_vocab(llama._model.model) assert n_vocab == 32000 @@ -231,6 +241,7 @@ def test_llama_pickle(): def test_utf8(mock_llama): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, logits_all=True) + llama.metrics = set_test_metrics_exporter() output_text = "😀" ai_service = "label-suggestions" From 0f25f39773b22499bcfa7b0cabb0f1810af69c9e Mon Sep 17 00:00:00 2001 From: juanroesel Date: Tue, 2 Jul 2024 10:29:40 -0700 Subject: [PATCH 25/28] Updated repo sync instructions --- INSTRUCTIONS.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 INSTRUCTIONS.md diff --git a/INSTRUCTIONS.md b/INSTRUCTIONS.md new file mode 100644 index 0000000000..96b23e2408 --- /dev/null +++ b/INSTRUCTIONS.md @@ -0,0 +1,30 @@ +## Syncing with upstream repo + +See [here](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork#syncing-a-fork-branch-from-the-web-ui) for more details. + +1. On the GitHub UI, create a new branch `repo-sync`, if the branch doesn't exist already. + +2. Click on the "Sync fork" button and then click on the "Update branch" button. This will import all the commits from the upstream repo. + +3. Create a local branch `repo-sync` and pull the contents from the remote `repo-sync` branch. + +4. Solve for any conflicts if they arise. Otherwise, proceed to the next step. +5. + +6. Since changes have probably been made to the vendor libraries (`llama_cpp`, `kompute`), we need to recompile the `llama_cpp` package. Navigate to the `vendor/llama.cpp` folder and clean the build cache: +git +``` +make clean +``` +1. Navigate back to the root directory and type the following to recompile the `llama_cpp` package and build the dependenies again: + +``` +make deps && make build +``` +7. Launch the `llama_cpp_python` server using the following command: +``` +python -m llama_cpp.server --model $MODEL --n_gpu_layers -1 +``` +NOTE: Modify the launch arguments as needed. Make sure the `MODEL` environment variable points to an absolute path containing a `.gguf` model. + +8. If the server launches without issues, then you can proceed to create a PR with the latest changes \ No newline at end of file From dc2fcf54e7c3b700f88f8b6ac9a160cc7aff8880 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Tue, 2 Jul 2024 11:02:00 -0700 Subject: [PATCH 26/28] Minor updates to instructions and comment in app.py --- INSTRUCTIONS.md | 8 +++++++- llama_cpp/server/app.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/INSTRUCTIONS.md b/INSTRUCTIONS.md index a562384cfb..da84dda5c8 100644 --- a/INSTRUCTIONS.md +++ b/INSTRUCTIONS.md @@ -10,7 +10,13 @@ See [here](https://docs.github.com/en/pull-requests/collaborating-with-pull-requ 4. Solve for any conflicts if they arise. Otherwise, proceed to the next step. -5. Since changes have probably been made to the vendor libraries (`llama_cpp`, `kompute`), we need to recompile the `llama_cpp` package. Navigate to the `vendor/llama.cpp` folder and clean the build cache: +5. Update all the git submodles: + +``` +git submodule update --recursive +``` + +6. Since changes have probably been made to the vendor libraries (`llama_cpp`, `kompute`), we need to recompile the `llama_cpp` package. Navigate to the `vendor/llama.cpp` folder and clean the build cache: ``` make clean diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index ab69b23fe4..4a00bf0e2f 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -528,7 +528,6 @@ async def create_chat_completion( } ), ) -> llama_cpp.ChatCompletion: - # Extract relevant kwargs from the request body # This is a workaround for an issue in FastAPI dependencies # where the dependency is cleaned up before a StreamingResponse # is complete. @@ -549,6 +548,7 @@ async def create_chat_completion( "min_tokens", } + # Extract relevant kwargs from the request body kwargs = body.model_dump(exclude=exclude) # Adds the ai_service value from the request body to the kwargs From 90854a8ff2e9cfb4057c11831a96d34daa383f94 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Tue, 2 Jul 2024 15:50:58 -0700 Subject: [PATCH 27/28] Uses logger with formatted output across function calls --- llama_cpp/_logger.py | 13 ++++++++++++- llama_cpp/llama.py | 23 ++++++++++++----------- llama_cpp/server/__main__.py | 3 ++- llama_cpp/server/app.py | 5 +++-- llama_cpp/server/errors.py | 3 ++- llama_cpp/server/model.py | 5 +++-- 6 files changed, 34 insertions(+), 18 deletions(-) diff --git a/llama_cpp/_logger.py b/llama_cpp/_logger.py index 7638170a97..c4f1bad6c8 100644 --- a/llama_cpp/_logger.py +++ b/llama_cpp/_logger.py @@ -17,7 +17,12 @@ 5: logging.DEBUG, } +# Set up logger with custom handler logger = logging.getLogger("llama-cpp-python") +handler = logging.StreamHandler() +formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) @llama_cpp.llama_log_callback @@ -27,7 +32,13 @@ def llama_log_callback( user_data: ctypes.c_void_p, ): if logger.level <= GGML_LOG_LEVEL_TO_LOGGING_LEVEL[level]: - print(text.decode("utf-8"), end="", flush=True, file=sys.stderr) + _text = text.decode("utf-8") + if _text.endswith("\n"): + _text = _text[:-1] + + # Skip if the message only contains "." + if not _text == ".": + logger.log(GGML_LOG_LEVEL_TO_LOGGING_LEVEL[level], _text) llama_cpp.llama_log_set(llama_log_callback, ctypes.c_void_p(0)) diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 4d10eaaeb8..476d75d54e 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -66,7 +66,7 @@ _LlamaSamplingContext, # type: ignore _normalize_embedding, # type: ignore ) -from ._logger import set_verbose +from ._logger import set_verbose, logger from ._utils import suppress_stdout_stderr @@ -403,7 +403,7 @@ def __init__( ) if self.verbose: - print(llama_cpp.llama_print_system_info().decode("utf-8"), file=sys.stderr) + logger.info(f'System info: {llama_cpp.llama_print_system_info().decode("utf-8")}') self.chat_format = chat_format self.chat_handler = chat_handler @@ -434,10 +434,10 @@ def __init__( except Exception as e: self.metadata = {} if self.verbose: - print(f"Failed to load metadata: {e}", file=sys.stderr) + logger.error(f"Failed to load metadata: {e}") if self.verbose: - print(f"Model metadata: {self.metadata}", file=sys.stderr) + logger.info(f"Model metadata: {self.metadata}") eos_token_id = self.token_eos() bos_token_id = self.token_bos() @@ -452,7 +452,7 @@ def __init__( template_choices["chat_template.default"] = self.metadata["tokenizer.chat_template"] if self.verbose and template_choices: - print(f"Available chat formats from metadata: {', '.join(template_choices.keys())}", file=sys.stderr) + logger.info(f"Available chat formats from metadata: {', '.join(template_choices.keys())}") for name, template in template_choices.items(): self._chat_handlers[name] = llama_chat_format.Jinja2ChatFormatter( @@ -474,19 +474,19 @@ def __init__( if chat_format is not None: self.chat_format = chat_format if self.verbose: - print(f"Guessed chat format: {chat_format}", file=sys.stderr) + logger.info(f"Guessed chat format: {chat_format}") else: if self.verbose: - print(f"Using gguf chat template: {template_choices['chat_template.default']}", file=sys.stderr) - print(f"Using chat eos_token: {eos_token}", file=sys.stderr) - print(f"Using chat bos_token: {bos_token}", file=sys.stderr) + logger.info(f"Using gguf chat template: {template_choices['chat_template.default']}") + logger.info(f"Using chat eos_token: {eos_token}") + logger.info(f"Using chat bos_token: {bos_token}") self.chat_format = "chat_template.default" if self.chat_format is None and self.chat_handler is None: self.chat_format = "llama-2" if self.verbose: - print(f"Using fallback chat format: {self.chat_format}", file=sys.stderr) + logger.info(f"Using fallback chat format: {self.chat_format}") @property def ctx(self) -> llama_cpp.llama_context_p: @@ -728,7 +728,8 @@ def generate( break if longest_prefix > 0: if self.verbose: - print("Llama.generate: prefix-match hit", file=sys.stderr) + # print("Llama.generate: prefix-match hit", file=sys.stderr) + logger.info("Llama.generate: prefix-match hit") reset = False tokens = tokens[longest_prefix:] self.n_tokens = longest_prefix diff --git a/llama_cpp/server/__main__.py b/llama_cpp/server/__main__.py index a6f1f4e9ca..a9e5ddb71b 100644 --- a/llama_cpp/server/__main__.py +++ b/llama_cpp/server/__main__.py @@ -37,6 +37,7 @@ ConfigFileSettings, ) from llama_cpp.server.cli import add_args_from_model, parse_model_from_args +from llama_cpp._logger import logger def main(): @@ -75,7 +76,7 @@ def main(): server_settings = parse_model_from_args(ServerSettings, args) model_settings = [parse_model_from_args(ModelSettings, args)] except Exception as e: - print(e, file=sys.stderr) + logger.error(e) parser.print_help() sys.exit(1) assert server_settings is not None diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 4a00bf0e2f..ed90fa155e 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -51,6 +51,7 @@ from llama_cpp.server.errors import RouteErrorHandler from llama_cpp._utils import monitor_task_queue from llama_cpp.llama_metrics import MetricsExporter +from llama_cpp._logger import logger router = APIRouter(route_class=RouteErrorHandler) @@ -211,9 +212,9 @@ async def get_event_publisher( raise anyio.get_cancelled_exc_class()() await inner_send_chan.send(dict(data="[DONE]")) except anyio.get_cancelled_exc_class() as e: - print("disconnected") + logger.warning(f"Disconnected from client {request.client}") with anyio.move_on_after(1, shield=True): - print(f"Disconnected from client (via refresh/close) {request.client}") + logger.error(f"Disconnected from client (via refresh/close) {request.client}") raise e finally: if on_complete: diff --git a/llama_cpp/server/errors.py b/llama_cpp/server/errors.py index fbf9fd80d5..826e7ed945 100644 --- a/llama_cpp/server/errors.py +++ b/llama_cpp/server/errors.py @@ -21,6 +21,7 @@ CreateEmbeddingRequest, CreateChatCompletionRequest, ) +from llama_cpp._logger import logger class ErrorResponse(TypedDict): @@ -134,7 +135,7 @@ def error_message_wrapper( ] = None, ) -> Tuple[int, ErrorResponse]: """Wraps error message in OpenAI style error response""" - print(f"Exception: {str(error)}", file=sys.stderr) + logger.error(f"Exception: {str(error)}") traceback.print_exc(file=sys.stderr) if body is not None and isinstance( body, diff --git a/llama_cpp/server/model.py b/llama_cpp/server/model.py index ad39c1004b..c35fd37dde 100644 --- a/llama_cpp/server/model.py +++ b/llama_cpp/server/model.py @@ -9,6 +9,7 @@ import llama_cpp.llama_tokenizer as llama_tokenizer from llama_cpp.server.settings import ModelSettings +from llama_cpp._logger import logger class LlamaProxy: @@ -272,11 +273,11 @@ def load_llama_from_model_settings(settings: ModelSettings) -> llama_cpp.Llama: if settings.cache: if settings.cache_type == "disk": if settings.verbose: - print(f"Using disk cache with size {settings.cache_size}") + logger.info(f"Using disk cache with size {settings.cache_size}") cache = llama_cpp.LlamaDiskCache(capacity_bytes=settings.cache_size) else: if settings.verbose: - print(f"Using ram cache with size {settings.cache_size}") + logger.info(f"Using ram cache with size {settings.cache_size}") cache = llama_cpp.LlamaRAMCache(capacity_bytes=settings.cache_size) _model.set_cache(cache) return _model From 8bb7ee58a145cfb422d493dc57e48ecf4a10fe99 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Tue, 2 Jul 2024 16:36:47 -0700 Subject: [PATCH 28/28] Matched format between uvicorn and llama-cpp loggers --- llama_cpp/_logger.py | 32 +++++++++++++++++++++++++++++--- llama_cpp/server/__main__.py | 3 ++- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/llama_cpp/_logger.py b/llama_cpp/_logger.py index c4f1bad6c8..f3407c92bb 100644 --- a/llama_cpp/_logger.py +++ b/llama_cpp/_logger.py @@ -1,6 +1,7 @@ import sys import ctypes import logging +import logging.config import llama_cpp @@ -17,14 +18,39 @@ 5: logging.DEBUG, } -# Set up logger with custom handler +UVICORN_LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"}, + }, + "handlers": { + "default": { + "level": "INFO", + "formatter": "standard", + "class": "logging.StreamHandler", + "stream": "ext://sys.stdout", # Default is stderr + }, + }, + "loggers": { + "uvicorn.error": { + "level": "DEBUG", + "handlers": ["default"], + }, + "uvicorn.access": { + "level": "DEBUG", + "handlers": ["default"], + }, + }, +} + +# Set up llama-cpp-python logger matching the format of uvicorn logger logger = logging.getLogger("llama-cpp-python") handler = logging.StreamHandler() -formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") +formatter = logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) - @llama_cpp.llama_log_callback def llama_log_callback( level: int, diff --git a/llama_cpp/server/__main__.py b/llama_cpp/server/__main__.py index a9e5ddb71b..6ced7bc106 100644 --- a/llama_cpp/server/__main__.py +++ b/llama_cpp/server/__main__.py @@ -37,7 +37,7 @@ ConfigFileSettings, ) from llama_cpp.server.cli import add_args_from_model, parse_model_from_args -from llama_cpp._logger import logger +from llama_cpp._logger import logger, UVICORN_LOGGING_CONFIG def main(): @@ -91,6 +91,7 @@ def main(): port=int(os.getenv("PORT", server_settings.port)), ssl_keyfile=server_settings.ssl_keyfile, ssl_certfile=server_settings.ssl_certfile, + log_config=UVICORN_LOGGING_CONFIG )