"""Collect device, container, and model metrics and report them to Roboflow."""

import time

import requests

from inference.core.devices.utils import GLOBAL_DEVICE_ID
from inference.core.env import API_KEY, METRICS_INTERVAL, METRICS_URL, TAGS
from inference.core.logger import logger
from inference.core.managers.metrics import get_model_metrics, get_system_info
from inference.core.utils.requests import api_key_safe_raise_for_status
from inference.core.version import __version__
from inference.enterprise.device_manager.command_handler import handle_command
from inference.enterprise.device_manager.container_service import (
    get_container_by_id,
    get_container_ids,
)
from inference.enterprise.device_manager.helpers import get_cache_model_items


def aggregate_model_stats(container_id):
    """
    Aggregate statistics for models within a specified container.

    This function retrieves and aggregates performance metrics for all models
    associated with the given container within a specified time interval.

    Args:
        container_id (str): The unique identifier of the container for which
            model statistics are to be aggregated.

    Returns:
        list: A list of dictionaries, where each dictionary represents a model's
            statistics with the following keys:
            - "dataset_id" (str): The ID of the dataset associated with the model.
            - "version" (str): The version of the model.
            - "api_key" (str): The API key that was used to make inferences against this model.
            - "metrics" (dict): A dictionary of performance metrics for the model:
                - "num_inferences" (int): Number of inferences made.
                - "num_errors" (int): Number of errors.
                - "avg_inference_time" (float): Average inference time in seconds.

    Notes:
        - Statistics are aggregated over the time interval defined by the
          METRICS_INTERVAL environment variable, which is set when the
          container is started.
    """
    now = time.time()
    start = now - METRICS_INTERVAL
    models = []
    api_keys = get_cache_model_items().get(container_id, dict()).keys()
    for api_key in api_keys:
        model_ids = get_cache_model_items().get(container_id, dict()).get(api_key, [])
        for model_id in model_ids:
            model = {
                "dataset_id": model_id.split("/")[0],
                "version": model_id.split("/")[1],
                "api_key": api_key,
                "metrics": get_model_metrics(
                    container_id, model_id, min=start, max=now
                ),
            }
            models.append(model)
    return models
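
# Illustrative sketch of the data shapes involved above. The cache layout is an
# assumption inferred from how aggregate_model_stats() iterates it, not a
# documented contract:
#
#   get_cache_model_items() -> {
#       "<container_id>": {"<api_key>": ["<dataset_id>/<version>", ...]},
#   }
#
# Each entry in the returned list then looks roughly like:
#
#   {
#       "dataset_id": "<dataset_id>",
#       "version": "<version>",
#       "api_key": "<api_key>",
#       "metrics": {
#           "num_inferences": ...,
#           "num_errors": ...,
#           "avg_inference_time": ...,
#       },
#   }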


def build_container_stats():
    """
    Build statistics for containers and their associated models.

    Returns:
        list: A list of dictionaries, where each dictionary represents statistics
            for a container and its associated models with the following keys:
            - "uuid" (str): The unique identifier (UUID) of the container.
            - "version" (str): The version reported by the container.
            - "startup_time" (float): The timestamp representing the container's startup time.
            - "models" (list): A list of dictionaries representing statistics for each
              model associated with the container (see `aggregate_model_stats` for the format).
            - "status" (str): The container's reported status: "running", "stopped",
              "idle", or "processing".

    Notes:
        - This function relies on the `container_service` module for container information.
    """
    containers = []
    for container_id in get_container_ids():
        container = get_container_by_id(container_id)
        if container:
            container_stats = {}
            models = aggregate_model_stats(container_id)
            container_stats["uuid"] = container.id
            container_stats["version"] = container.version
            container_stats["startup_time"] = container.startup_time
            container_stats["models"] = models
            # Map the container runtime's state onto the statuses reported to Roboflow.
            if container.status in ("running", "restarting"):
                container_stats["status"] = "running"
            elif container.status == "exited":
                container_stats["status"] = "stopped"
            elif container.status == "paused":
                container_stats["status"] = "idle"
            else:
                container_stats["status"] = "processing"
            containers.append(container_stats)
    return containers
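
# Example of a single entry produced by build_container_stats(). The values are
# purely illustrative; the keys mirror the assignments above:
#
#   {
#       "uuid": "<container uuid>",
#       "version": "<container version>",
#       "startup_time": 1700000000.0,
#       "models": [...],          # see aggregate_model_stats()
#       "status": "running",      # one of: "running", "stopped", "idle", "processing"
#   }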


def aggregate_device_stats():
    """
    Aggregate statistics for the device.

    Returns:
        dict: A payload containing the API key, the reporting timestamp, and
            device details (id, name, type, tags, system information, and
            per-container statistics from `build_container_stats`).
    """
    window_start_timestamp = str(int(time.time()))
    all_data = {
        "api_key": API_KEY,
        "timestamp": window_start_timestamp,
        "device": {
            "id": GLOBAL_DEVICE_ID,
            "name": GLOBAL_DEVICE_ID,
            "type": f"roboflow-device-manager=={__version__}",
            "tags": TAGS,
            "system_info": get_system_info(),
            "containers": build_container_stats(),
        },
    }
    return all_data


def report_metrics_and_handle_commands():
    """
    Report metrics to Roboflow.

    This function aggregates statistics for the device and its containers and
    sends them to Roboflow. If Roboflow sends back any commands, they are
    handled by the `handle_command` function.
    """
    all_data = aggregate_device_stats()
    logger.info(f"Sending metrics to Roboflow: {all_data}")
    res = requests.post(METRICS_URL, json=all_data)
    api_key_safe_raise_for_status(response=res)
    response = res.json()
    for cmd in response.get("data", []):
        if cmd:
            handle_command(cmd)
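

# Minimal sketch of how the reporting entry point above could be driven on a fixed
# schedule. This is illustrative only: the device manager is assumed to wire up its
# own scheduling elsewhere, and this simple sleep loop is not the actual integration.
if __name__ == "__main__":
    while True:
        try:
            report_metrics_and_handle_commands()
        except Exception:
            # Keep the reporting loop alive even if a single report fails.
            logger.exception("Failed to report metrics to Roboflow")
        time.sleep(METRICS_INTERVAL)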