File size: 5,353 Bytes
df6c67d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
import time
import requests
from inference.core.devices.utils import GLOBAL_DEVICE_ID
from inference.core.env import API_KEY, METRICS_INTERVAL, METRICS_URL, TAGS
from inference.core.logger import logger
from inference.core.managers.metrics import get_model_metrics, get_system_info
from inference.core.utils.requests import api_key_safe_raise_for_status
from inference.core.version import __version__
from inference.enterprise.device_manager.command_handler import handle_command
from inference.enterprise.device_manager.container_service import (
get_container_by_id,
get_container_ids,
)
from inference.enterprise.device_manager.helpers import get_cache_model_items
def aggregate_model_stats(container_id):
    """
    Aggregate statistics for models within a specified container.

    This function retrieves and aggregates performance metrics for all models
    associated with the given container within a specified time interval.

    Args:
        container_id (str): The unique identifier of the container for which
            model statistics are to be aggregated.

    Returns:
        list: A list of dictionaries, where each dictionary represents a model's
            statistics with the following keys:
            - "dataset_id" (str): The ID of the dataset associated with the model.
            - "version" (str): The version of the model.
            - "api_key" (str): The API key that was used to make an inference against this model
            - "metrics" (dict): A dictionary containing performance metrics for the model:
                - "num_inferences" (int): Number of inferences made
                - "num_errors" (int): Number of errors
                - "avg_inference_time" (float): Average inference time in seconds

    Notes:
        - The function calculates statistics over a time interval defined by
          the global constant METRICS_INTERVAL, passed in when starting up the container.
        - Model IDs are expected in "dataset_id/version" form; an ID without a
          "/" separator raises IndexError, matching the original behavior.
    """
    now = time.time()
    start = now - METRICS_INTERVAL
    models = []
    # Fetch the cache snapshot once instead of re-querying it for every API key.
    models_by_api_key = get_cache_model_items().get(container_id, {})
    for api_key, model_ids in models_by_api_key.items():
        for model_id in model_ids:
            # Split once; model_id is "dataset_id/version".
            parts = model_id.split("/")
            models.append(
                {
                    "dataset_id": parts[0],
                    "version": parts[1],
                    "api_key": api_key,
                    "metrics": get_model_metrics(
                        container_id, model_id, min=start, max=now
                    ),
                }
            )
    return models
def build_container_stats():
    """
    Build statistics for containers and their associated models.

    Returns:
        list: A list of dictionaries, where each dictionary represents statistics
            for a container and its associated models with the following keys:
            - "uuid" (str): The unique identifier (UUID) of the container.
            - "version" (str): The version reported by the container.
            - "startup_time" (float): The timestamp representing the container's startup time.
            - "models" (list): A list of dictionaries representing statistics for each
              model associated with the container (see `aggregate_model_stats` for format).
            - "status" (str): Normalized container status: "running", "stopped",
              or "idle"; any unrecognized raw status maps to "processing".

    Notes:
        - This method relies on a singleton `container_service` for container information.
    """
    # Normalize raw container statuses to the values reported upstream.
    # Any status not listed here falls back to "processing".
    status_map = {
        "running": "running",
        "restarting": "running",
        "exited": "stopped",
        "paused": "idle",
    }
    containers = []
    # `container_id` rather than `id` to avoid shadowing the builtin.
    for container_id in get_container_ids():
        container = get_container_by_id(container_id)
        if container:
            containers.append(
                {
                    "uuid": container.id,
                    "version": container.version,
                    "startup_time": container.startup_time,
                    "models": aggregate_model_stats(container_id),
                    "status": status_map.get(container.status, "processing"),
                }
            )
    return containers
def aggregate_device_stats():
    """
    Aggregate statistics for the device.

    Builds the full payload reported upstream: the API key, a
    whole-second timestamp string, and a device record containing
    identity, tags, system info, and per-container statistics.
    """
    return {
        "api_key": API_KEY,
        "timestamp": str(int(time.time())),
        "device": {
            "id": GLOBAL_DEVICE_ID,
            "name": GLOBAL_DEVICE_ID,
            "type": f"roboflow-device-manager=={__version__}",
            "tags": TAGS,
            "system_info": get_system_info(),
            "containers": build_container_stats(),
        },
    }
def report_metrics_and_handle_commands():
    """
    Report metrics to Roboflow.

    Aggregates statistics for the device and its containers, POSTs them to
    METRICS_URL, and dispatches every non-empty command Roboflow returns in
    the response's "data" list to `handle_command`.
    """
    payload = aggregate_device_stats()
    logger.info(f"Sending metrics to Roboflow {str(payload)}.")
    response = requests.post(METRICS_URL, json=payload)
    api_key_safe_raise_for_status(response=response)
    commands = response.json().get("data", [])
    for command in commands:
        if command:
            handle_command(command)
|