Advanced Example

Advanced example for the v2/workloads/aggs API: complex nested aggregations and multiple query patterns.

This example demonstrates several aggregation patterns, including terms aggregations, date histograms, nested aggregations, and filter sub-aggregations, and shows how to combine them for workflow execution analysis.
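For orientation, each request pairs application and time-range filters with an Elasticsearch-style "aggs" clause, and the API returns results nested under data.aggregations. The sketch below shows the request and response shapes assumed by the parsing code in the full example; the exact field names (total_documents, buckets, doc_count) are inferred from how that code reads the response rather than from a formal schema.

    # Assumed request shape (placeholders, not real values)
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": "<ISO-8601 UTC start>",
        "end_time": "<ISO-8601 UTC end>",
        "aggs": {
            "by_workflow": {"terms": {"field": "workflow.name", "size": 10}}
        }
    }

    # Assumed response shape, as parsed by the examples below:
    # {
    #   "data": {
    #     "total_documents": 12345,
    #     "aggregations": {
    #       "by_workflow": {
    #         "buckets": [{"key": "...", "doc_count": 42}, ...]
    #       }
    #     }
    #   }
    # }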


Code Example

import requests
import logging
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
    """Get JWT access token."""
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )
    # Fail fast on authentication errors instead of raising a KeyError below
    response.raise_for_status()
    return response.json()["access_token"]

def example_1_simple_terms():
    """Example 1: Simple terms aggregation - Count by workflow."""
    logger.info("="*60)
    logger.info("Example 1: Simple Terms Aggregation - Top Workflows")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "by_workflow": {
                "terms": {
                    "field": "workflow.name",
                    "size": 20
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )

    data = response.json()
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total workloads: {data.get('data', {}).get('total_documents', 0):,}")

    buckets = data.get('data', {}).get('aggregations', {}).get('by_workflow', {}).get('buckets', [])
    logger.info("\nTop workflows:")
    for i, bucket in enumerate(buckets[:10], 1):
        logger.info(f"  {i:2d}. {bucket['key'][:50]}: {bucket['doc_count']:,}")

def example_2_date_histogram():
    """Example 2: Date histogram - Workloads per day."""
    logger.info("="*60)
    logger.info("Example 2: Date Histogram - Daily Workload Volume")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=14)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["prod.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "workloads_per_day": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1d"
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )

    data = response.json()
    logger.info(f"Status: {response.status_code}")

    buckets = data.get('data', {}).get('aggregations', {}).get('workloads_per_day', {}).get('buckets', [])
    logger.info("\nDaily distribution:")
    for bucket in buckets:
        date = bucket['key_as_string'][:10]
        count = bucket['doc_count']
        logger.info(f"  {date}: {count:,} workloads")

def example_3_nested_aggregation():
    """Example 3: Nested aggregation - Domains with workflow breakdown."""
    logger.info("="*60)
    logger.info("Example 3: Nested Aggregation - Domains with Workflows")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 5
                },
                "aggs": {
                    "by_workflow": {
                        "terms": {
                            "field": "workflow.name",
                            "size": 5
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )

    data = response.json()
    logger.info(f"Status: {response.status_code}")

    domain_buckets = data.get('data', {}).get('aggregations', {}).get('by_domain', {}).get('buckets', [])
    for domain_bucket in domain_buckets:
        domain = domain_bucket['key']
        total = domain_bucket['doc_count']
        logger.info(f"\nDomain: {domain} ({total:,} workloads)")

        workflow_buckets = domain_bucket.get('by_workflow', {}).get('buckets', [])
        for workflow_bucket in workflow_buckets:
            workflow = workflow_bucket['key']
            count = workflow_bucket['doc_count']
            percentage = (count / total * 100) if total > 0 else 0
            logger.info(f"  {workflow[:45]:45s}: {count:>8,} ({percentage:>5.1f}%)")

def example_4_filter_with_subaggs():
    """Example 4: Filter with sub-aggregations - Specific workflow analysis over time."""
    logger.info("="*60)
    logger.info("Example 4: Filter with Sub-aggregations - Workflow Timeline")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=30)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "incident_workflow_analysis": {
                "filter": {
                    "term": {
                        "workflow.name": "Incident_Response_Workflow"
                    }
                },
                "aggs": {
                    "daily_counts": {
                        "date_histogram": {
                            "field": "@timestamp",
                            "fixed_interval": "1d"
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )

    data = response.json()
    logger.info(f"Status: {response.status_code}")

    workflow_agg = data.get('data', {}).get('aggregations', {}).get('incident_workflow_analysis', {})
    total_count = workflow_agg.get('doc_count', 0)
    logger.info(f"Total 'Incident_Response_Workflow' executions: {total_count:,}")

    daily_buckets = workflow_agg.get('daily_counts', {}).get('buckets', [])
    logger.info("\nDaily execution distribution:")
    for bucket in daily_buckets[-7:]:  # Show last 7 days
        date = bucket['key_as_string'][:10]
        count = bucket['doc_count']
        logger.info(f"  {date}: {count:,} executions")

def example_5_multiple_filters():
    """Example 5: Multiple filter aggregations - Compare workflows."""
    logger.info("="*60)
    logger.info("Example 5: Multiple Filters - Compare Workflows")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "incident_workflow": {
                "filter": {
                    "term": {
                        "workflow.name": "Incident_Response_Workflow"
                    }
                }
            },
            "maintenance_workflow": {
                "filter": {
                    "term": {
                        "workflow.name": "Scheduled_Maintenance_Workflow"
                    }
                }
            },
            "enrichment_workflow": {
                "filter": {
                    "term": {
                        "workflow.name": "Data_Enrichment_Workflow"
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )

    data = response.json()
    aggs = data.get('data', {}).get('aggregations', {})
    total_docs = data.get('data', {}).get('total_documents', 0)

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total workloads: {total_docs:,}")

    logger.info("\nWorkflow comparison:")
    incident_count = aggs.get('incident_workflow', {}).get('doc_count', 0)
    maintenance_count = aggs.get('maintenance_workflow', {}).get('doc_count', 0)
    enrichment_count = aggs.get('enrichment_workflow', {}).get('doc_count', 0)

    logger.info(f"  Incident Response:     {incident_count:>10,} ({(incident_count/total_docs*100) if total_docs > 0 else 0:>5.2f}%)")
    logger.info(f"  Scheduled Maintenance: {maintenance_count:>10,} ({(maintenance_count/total_docs*100) if total_docs > 0 else 0:>5.2f}%)")
    logger.info(f"  Data Enrichment:       {enrichment_count:>10,} ({(enrichment_count/total_docs*100) if total_docs > 0 else 0:>5.2f}%)")

def example_6_complex_multi_agg():
    """Example 6: Complex multi-aggregation analysis."""
    logger.info("="*60)
    logger.info("Example 6: Complex Multi-Aggregation Analysis")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=14)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "top_workflows": {
                "terms": {
                    "field": "workflow.name",
                    "size": 10
                }
            },
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 5
                }
            },
            "daily_volume": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1d"
                }
            },
            "incident_analysis": {
                "filter": {
                    "term": {
                        "workflow.name": "Incident_Response_Workflow"
                    }
                },
                "aggs": {
                    "by_domain": {
                        "terms": {
                            "field": "atr.domain_name",
                            "size": 5
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )

    data = response.json()
    aggs = data.get('data', {}).get('aggregations', {})
    total_docs = data.get('data', {}).get('total_documents', 0)

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total workloads: {total_docs:,}")

    # Top workflows
    logger.info("\nTop 5 workflows:")
    for i, bucket in enumerate(aggs.get('top_workflows', {}).get('buckets', [])[:5], 1):
        count = bucket['doc_count']
        pct = (count / total_docs * 100) if total_docs > 0 else 0
        logger.info(f"  {i}. {bucket['key'][:45]:45s}: {count:>8,} ({pct:>5.2f}%)")

    # Domains
    logger.info("\nTop domains:")
    for bucket in aggs.get('by_domain', {}).get('buckets', []):
        count = bucket['doc_count']
        logger.info(f"  {bucket['key']:30s}: {count:>8,}")

    # Peak day
    daily_buckets = aggs.get('daily_volume', {}).get('buckets', [])
    if daily_buckets:
        peak_day = max(daily_buckets, key=lambda x: x['doc_count'])
        logger.info(f"\nPeak day: {peak_day['key_as_string'][:10]} with {peak_day['doc_count']:,} workloads")

    # Incident workflow by domain
    incident_agg = aggs.get('incident_analysis', {})
    incident_total = incident_agg.get('doc_count', 0)
    logger.info(f"\nIncident Response Workflow: {incident_total:,} total executions")
    logger.info("  By domain:")
    for bucket in incident_agg.get('by_domain', {}).get('buckets', []):
        logger.info(f"    {bucket['key']:28s}: {bucket['doc_count']:>8,}")

def main():
    """Run all examples."""
    logger.info("="*60)
    logger.info("v2/workloads/aggs API - Advanced Examples")
    logger.info("="*60)

    try:
        example_1_simple_terms()
        example_2_date_histogram()
        example_3_nested_aggregation()
        example_4_filter_with_subaggs()
        example_5_multiple_filters()
        example_6_complex_multi_agg()

        logger.info("="*60)
        logger.info("All examples completed!")
        logger.info("="*60)
    except Exception as e:
        logger.error(f"Error: {e}")

if __name__ == "__main__":
    main()
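
Each example above fetches a fresh token. If you run all six in one session, a small cache around get_access_token avoids repeated calls to the identity provider. A minimal sketch (hypothetical helper, assuming the token remains valid for the whole run):

    # Hypothetical convenience wrapper, not part of the API client above
    _cached_token = None

    def get_cached_token():
        """Return a previously fetched token, requesting one only on first use."""
        global _cached_token
        if _cached_token is None:
            _cached_token = get_access_token()
        return _cached_token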