
Moderate Example

A moderate example for the v2/workloads/aggs API: multiple aggregations combined with time-based analysis.

Useful for analyzing workload patterns over time and combining multiple aggregation types.

This example demonstrates using date histograms alongside terms aggregations to understand temporal patterns in workflow execution.


Code Example

import requests
import json
import logging
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
    """Authenticate and get JWT access token."""
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )
    response.raise_for_status()
    return response.json()["access_token"]

def fetch_workload_aggregations(access_token):
    """Fetch workload aggregations with multiple aggregation types."""
    url = f"{BASE_URL}/v2/workloads/aggs"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }

    # Calculate time range (last 30 days)
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=30)

    # Request payload with multiple aggregations
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["domain1.com", "domain2.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "by_workflow": {
                "terms": {
                    "field": "workflow.name",
                    "size": 20
                }
            },
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 10
                }
            },
            "over_time": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1d"
                }
            }
        }
    }

    logger.info(f"Fetching aggregations from {start_time} to {end_time}...")

    # Make request
    response = requests.post(url, json=payload, headers=headers)
    response.raise_for_status()

    return response.json()

def display_results(data):
    """Display aggregation results in a readable format."""
    aggregations = data.get("data", {}).get("aggregations", {})
    total_docs = data.get("data", {}).get("total_documents", 0)

    logger.info("=" * 60)
    logger.info(f"Total Workloads Analyzed: {total_docs:,}")
    logger.info("=" * 60)

    # Display workflow distribution
    logger.info("\n1. Top Workflows:")
    workflows = aggregations.get("by_workflow", {}).get("buckets", [])
    for i, bucket in enumerate(workflows[:10], 1):
        workflow = bucket.get("key")
        count = bucket.get("doc_count")
        percentage = (count / total_docs * 100) if total_docs > 0 else 0
        logger.info(f" {i:2d}. {workflow[:50]:50s}: {count:>10,} ({percentage:>5.2f}%)")

    # Display domain distribution
    logger.info("\n2. Workloads by Domain:")
    domains = aggregations.get("by_domain", {}).get("buckets", [])
    for bucket in domains:
        domain = bucket.get("key")
        count = bucket.get("doc_count")
        percentage = (count / total_docs * 100) if total_docs > 0 else 0
        logger.info(f" {domain:30s}: {count:>10,} ({percentage:>5.2f}%)")

    # Display time-based distribution
    logger.info("\n3. Daily Workload Volume (last 7 days):")
    time_buckets = aggregations.get("over_time", {}).get("buckets", [])
    for bucket in time_buckets[-7:]:  # Show last 7 days
        date = bucket.get("key_as_string", "")[:10]  # Extract date only
        count = bucket.get("doc_count")
        logger.info(f" {date}: {count:>10,} workloads")

def main():
    """Main execution."""
    logger.info("=" * 60)
    logger.info("v2/workloads/aggs API Example")
    logger.info("=" * 60)

    # Step 1: Get access token
    logger.info("\n1. Authenticating...")
    try:
        access_token = get_access_token()
        logger.info(" Authentication successful")
    except Exception as e:
        logger.error(f" Authentication failed: {e}")
        return

    # Step 2: Fetch aggregations
    logger.info("\n2. Fetching workload aggregations...")
    try:
        data = fetch_workload_aggregations(access_token)
        logger.info(" Successfully retrieved aggregations")
    except Exception as e:
        logger.error(f" Failed to fetch aggregations: {e}")
        return

    # Step 3: Display results
    logger.info("\n3. Results:")
    display_results(data)

    logger.info("\n" + "=" * 60)
    logger.info("Complete!")
    logger.info("=" * 60)

if __name__ == "__main__":
    main()
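
The display_results function parses an Elasticsearch-style bucket-aggregation response. As a rough sketch of that shape, the field names below are inferred from the parsing code above rather than from the API specification, and all values are illustrative placeholders, not real API output:

# Illustrative sketch only: the response shape display_results() expects.
# Field names are inferred from the parsing code, not from the API spec;
# all counts and keys are placeholder values.
sample_response = {
    "data": {
        "total_documents": 1500,
        "aggregations": {
            "by_workflow": {
                "buckets": [
                    {"key": "example-workflow-a", "doc_count": 900},
                    {"key": "example-workflow-b", "doc_count": 600}
                ]
            },
            "by_domain": {
                "buckets": [
                    {"key": "domain1.com", "doc_count": 1100},
                    {"key": "domain2.com", "doc_count": 400}
                ]
            },
            "over_time": {
                "buckets": [
                    {"key_as_string": "2025-01-01T00:00:00Z", "doc_count": 50},
                    {"key_as_string": "2025-01-02T00:00:00Z", "doc_count": 45}
                ]
            }
        }
    }
}

# Passing this sketch to display_results(sample_response) prints the same
# three report sections a live API response would produce.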