Advanced Example
An advanced example for the v2/logs/aggs API covering complex nested aggregations and multiple query patterns. It demonstrates several aggregation techniques, including nested aggregations, filter sub-aggregations, and combining multiple analyses in a single request; the sketch below shows the request shape these patterns share.
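At a high level, every request in this example sends an aggs object whose keys are caller-chosen aggregation names; each name maps to one aggregation type (terms, date_histogram, or filter) and may carry a nested aggs block for per-bucket sub-aggregations. A minimal sketch of that shape, using field names taken from the examples below (the aggregation names themselves are arbitrary):

# Sketch of the "aggs" block shared by the requests in this example
aggs_sketch = {
    "by_domain": {                          # caller-chosen aggregation name
        "terms": {                          # bucket documents by a field's values
            "field": "atr.domain_name",
            "size": 5
        },
        "aggs": {                           # optional sub-aggregations, computed per bucket
            "by_level": {
                "terms": {"field": "regex.extractions.logLevel", "size": 5}
            }
        }
    },
    "error_count": {
        "filter": {                         # single-bucket filter; returns a doc_count
            "term": {"regex.extractions.logLevel": "ERROR"}
        }
    }
}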
Code Example
import requests
import logging
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# API endpoint and OAuth client credentials (replace with your own values)
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"


def get_access_token():
    """Get JWT access token."""
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )
    response.raise_for_status()  # fail fast if the token request is rejected
    return response.json()["access_token"]
def example_1_simple_terms():
    """Example 1: Simple terms aggregation - Count by log level."""
    logger.info("="*60)
    logger.info("Example 1: Simple Terms Aggregation")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "log_levels": {
                "terms": {
                    "field": "regex.extractions.logLevel",
                    "size": 10
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total documents: {data.get('data', {}).get('total_documents', 0):,}")

    buckets = data.get('data', {}).get('aggregations', {}).get('log_levels', {}).get('buckets', [])
    logger.info("Log levels:")
    for bucket in buckets:
        logger.info(f" {bucket['key']}: {bucket['doc_count']:,}")
def example_2_date_histogram():
    """Example 2: Date histogram - Logs per hour."""
    logger.info("="*60)
    logger.info("Example 2: Date Histogram - Hourly Log Volume")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=12)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["prod.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "logs_per_hour": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1h"
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()

    logger.info(f"Status: {response.status_code}")

    buckets = data.get('data', {}).get('aggregations', {}).get('logs_per_hour', {}).get('buckets', [])
    logger.info("Hourly distribution:")
    for bucket in buckets[:5]:  # Show first 5 hours
        time = bucket['key_as_string']
        count = bucket['doc_count']
        logger.info(f" {time}: {count:,} logs")
def example_3_nested_aggregation():
    """Example 3: Nested aggregation - Domains with log level breakdown."""
    logger.info("="*60)
    logger.info("Example 3: Nested Aggregation - Domains with Log Levels")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=1)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 5
                },
                "aggs": {
                    "by_level": {
                        "terms": {
                            "field": "regex.extractions.logLevel",
                            "size": 5
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()

    logger.info(f"Status: {response.status_code}")

    domain_buckets = data.get('data', {}).get('aggregations', {}).get('by_domain', {}).get('buckets', [])
    for domain_bucket in domain_buckets:
        domain = domain_bucket['key']
        total = domain_bucket['doc_count']
        logger.info(f"\nDomain: {domain} ({total:,} logs)")
        level_buckets = domain_bucket.get('by_level', {}).get('buckets', [])
        for level_bucket in level_buckets:
            level = level_bucket['key']
            count = level_bucket['doc_count']
            percentage = (count / total * 100) if total > 0 else 0
            logger.info(f" {level}: {count:,} ({percentage:.1f}%)")
def example_4_filter_with_subaggs():
    """Example 4: Filter with sub-aggregations - Error analysis over time."""
    logger.info("="*60)
    logger.info("Example 4: Filter with Sub-aggregations - Error Timeline")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "error_analysis": {
                "filter": {
                    "term": {
                        "regex.extractions.logLevel": "ERROR"
                    }
                },
                "aggs": {
                    "errors_per_day": {
                        "date_histogram": {
                            "field": "@timestamp",
                            "fixed_interval": "1d"
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()

    logger.info(f"Status: {response.status_code}")

    error_agg = data.get('data', {}).get('aggregations', {}).get('error_analysis', {})
    total_errors = error_agg.get('doc_count', 0)
    logger.info(f"Total errors in period: {total_errors:,}")

    daily_buckets = error_agg.get('errors_per_day', {}).get('buckets', [])
    logger.info("\nDaily error distribution:")
    for bucket in daily_buckets:
        date = bucket['key_as_string'][:10]
        count = bucket['doc_count']
        logger.info(f" {date}: {count:,} errors")
def example_5_serverless_aggregations():
    """Example 5: Serverless (EventOps) aggregations."""
    logger.info("="*60)
    logger.info("Example 5: Serverless Aggregations - EventOps")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=6)

    payload = {
        "application": "eventops",
        "app_type": "serverless",
        "log_group": ["*"],
        "tenant": ["tenant1"],
        "stage": ["prod"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "event_origins": {
                "terms": {
                    "field": "event.origin",
                    "size": 20
                }
            },
            "error_count": {
                "filter": {
                    "term": {
                        "logLevel": "ERROR"
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total documents: {data.get('data', {}).get('total_documents', 0):,}")

    # Event origins
    origin_buckets = data.get('data', {}).get('aggregations', {}).get('event_origins', {}).get('buckets', [])
    logger.info("\nEvent origins:")
    for bucket in origin_buckets[:5]:
        logger.info(f" {bucket['key']}: {bucket['doc_count']:,}")

    # Error count
    error_count = data.get('data', {}).get('aggregations', {}).get('error_count', {}).get('doc_count', 0)
    logger.info(f"\nTotal errors: {error_count:,}")
def example_6_complex_multi_agg():
    """Example 6: Complex multi-aggregation analysis."""
    logger.info("="*60)
    logger.info("Example 6: Complex Multi-Aggregation Analysis")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=1)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "aggs": {
            "log_levels": {
                "terms": {
                    "field": "regex.extractions.logLevel",
                    "size": 10
                }
            },
            "hourly_volume": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1h"
                }
            },
            "error_analysis": {
                "filter": {
                    "term": {
                        "regex.extractions.logLevel": "ERROR"
                    }
                },
                "aggs": {
                    "hourly_errors": {
                        "date_histogram": {
                            "field": "@timestamp",
                            "fixed_interval": "1h"
                        }
                    }
                }
            },
            "warn_analysis": {
                "filter": {
                    "term": {
                        "regex.extractions.logLevel": "WARN"
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()
    aggs = data.get('data', {}).get('aggregations', {})
    total_docs = data.get('data', {}).get('total_documents', 0)

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total documents: {total_docs:,}")

    # Log levels
    logger.info("\nLog level distribution:")
    for bucket in aggs.get('log_levels', {}).get('buckets', []):
        count = bucket['doc_count']
        pct = (count / total_docs * 100) if total_docs > 0 else 0
        logger.info(f" {bucket['key']}: {count:,} ({pct:.2f}%)")

    # Error and warn counts
    error_count = aggs.get('error_analysis', {}).get('doc_count', 0)
    warn_count = aggs.get('warn_analysis', {}).get('doc_count', 0)
    logger.info(f"\nTotal errors: {error_count:,}")
    logger.info(f"Total warnings: {warn_count:,}")

    # Peak hour
    hourly_buckets = aggs.get('hourly_volume', {}).get('buckets', [])
    if hourly_buckets:
        peak_hour = max(hourly_buckets, key=lambda x: x['doc_count'])
        logger.info(f"\nPeak hour: {peak_hour['key_as_string']} with {peak_hour['doc_count']:,} logs")
def main():
    """Run all examples."""
    logger.info("="*60)
    logger.info("v2/logs/aggs API - Advanced Examples")
    logger.info("="*60)

    try:
        example_1_simple_terms()
        example_2_date_histogram()
        example_3_nested_aggregation()
        example_4_filter_with_subaggs()
        example_5_serverless_aggregations()
        example_6_complex_multi_agg()

        logger.info("="*60)
        logger.info("All examples completed!")
        logger.info("="*60)
    except Exception as e:
        logger.error(f"Error: {e}")


if __name__ == "__main__":
    main()
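For reference, the parsing code in these examples assumes a response envelope roughly like the sketch below. This shape is inferred from the access patterns above rather than from a formal schema, and all values are illustrative: terms and date_histogram aggregations return a buckets list, while a bare filter aggregation returns only a doc_count (plus any sub-aggregation results).

# Illustrative response envelope assumed by the parsing code above (not an official schema)
example_response = {
    "data": {
        "total_documents": 12345,
        "aggregations": {
            "log_levels": {                 # terms aggregation -> buckets with key / doc_count
                "buckets": [
                    {"key": "INFO", "doc_count": 10000},
                    {"key": "ERROR", "doc_count": 2345},
                ]
            },
            "logs_per_hour": {              # date_histogram -> buckets with key_as_string / doc_count
                "buckets": [
                    {"key_as_string": "2024-01-01T00:00:00Z", "doc_count": 500},
                ]
            },
            "error_analysis": {             # filter aggregation -> doc_count plus nested sub-agg results
                "doc_count": 2345,
                "errors_per_day": {"buckets": []},
            },
        },
    },
}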