Simple Example
A simple example for the v2/workloads/aggs API — a basic aggregation query.
Use this when you want to get aggregated statistics from your workloads without complex filtering.
Code Example
import logging
from datetime import datetime, timedelta, timezone

import requests
# Configure module-wide logging: timestamp, level, and message on every line.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
# Configuration
# NOTE(review): TOKEN_URL, CLIENT_ID, and CLIENT_SECRET are placeholders —
# replace them with real values, and prefer loading secrets from the
# environment rather than hard-coding them in source.
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"  # data API root
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"  # OAuth2 token endpoint
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
# Step 1: Get an OAuth2 access token via the client-credentials grant.
logger.info("Getting access token...")
token_response = requests.post(
    TOKEN_URL,
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
    },
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    timeout=30,  # never hang forever on an unresponsive auth server
)
# Fail fast with a descriptive HTTPError instead of an opaque KeyError
# when the token endpoint returns an error payload.
token_response.raise_for_status()
access_token = token_response.json()["access_token"]
logger.info("Authenticated")
# Step 2: Build the aggregation query covering the last 7 days.
# datetime.utcnow() is deprecated since Python 3.12; use an aware UTC
# datetime instead — strftime with the literal "Z" suffix is unchanged.
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)
payload = {
    "application": "atr",
    "app_type": "kubernetes",
    "domain": ["*"],  # wildcard: include every domain
    "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    # Elasticsearch-style terms aggregation: top 20 workflow names by count.
    "aggs": {
        "by_workflow": {
            "terms": {
                "field": "workflow.name",
                "size": 20
            }
        }
    }
}
# Request headers: bearer token for auth, JSON body marker, and the
# client_id the service expects.
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    # NOTE(review): this API appears to require client_id as a plain header
    # in addition to the bearer token — confirm against the API docs.
    "client_id": CLIENT_ID
}
# Step 3: POST the aggregation query to the workloads endpoint.
logger.info("Fetching workload aggregations...")
response = requests.post(
    f"{BASE_URL}/v2/workloads/aggs",
    json=payload,
    headers=headers,
    timeout=30,  # bound the wait on the data API instead of blocking forever
)
# Step 4: Report the aggregation results, or log the failure details.
if response.status_code != 200:
    logger.error(f"Request failed: {response.status_code}")
    logger.error(f" {response.text}")
else:
    body = response.json().get("data", {})
    total_docs = body.get("total_documents", 0)
    aggregations = body.get("aggregations", {})
    logger.info(f"Total workloads analyzed: {total_docs:,}")
    # Show each workflow bucket with its share of the total document count.
    logger.info("\nWorkflow Distribution:")
    for bucket in aggregations.get("by_workflow", {}).get("buckets", []):
        workflow = bucket.get("key")
        count = bucket.get("doc_count")
        share = (count / total_docs * 100) if total_docs > 0 else 0
        logger.info(f" {workflow}: {count:,} ({share:.2f}%)")