Moderate Example
A moderate example for the v2/logs/aggs API: multiple aggregation types combined with time-based analysis.
It is useful for analyzing log patterns over time; the request pairs a date histogram with terms and filter aggregations so that log volume, log-level distribution, and error counts can all be read from a single response.
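As a quick orientation before the full script, this is roughly what the aggs object assembled below looks like on its own; it is a sketch excerpted from the example, using the same field names (regex.extractions.logLevel, @timestamp):

# Sketch of the aggs object assembled in the full example below
aggs = {
    # terms aggregation: bucket documents by extracted log level (top 10 values)
    "log_levels": {"terms": {"field": "regex.extractions.logLevel", "size": 10}},
    # date_histogram aggregation: bucket documents into one-day intervals
    "logs_over_time": {"date_histogram": {"field": "@timestamp", "fixed_interval": "1d"}},
    # filter aggregation: count only documents whose log level is ERROR
    "error_analysis": {"filter": {"term": {"regex.extractions.logLevel": "ERROR"}}}
}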
Code Example
import requests
import logging
from datetime import datetime, timedelta, timezone
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
def get_access_token():
"""Authenticate and get JWT access token."""
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30  # fail fast instead of hanging on a slow endpoint
    )
response.raise_for_status()
return response.json()["access_token"]
def fetch_log_aggregations(access_token):
"""Fetch log aggregations with multiple aggregation types."""
url = f"{BASE_URL}/v2/logs/aggs"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
# Calculate time range (last 7 days)
    end_time = datetime.now(timezone.utc)  # timezone-aware "now"; datetime.utcnow() is deprecated
start_time = end_time - timedelta(days=7)
# Request payload with multiple aggregations
payload = {
"application": "atr",
"app_type": "kubernetes",
"domain": ["domain1.com", "domain2.com"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"aggs": {
"log_levels": {
"terms": {
"field": "regex.extractions.logLevel",
"size": 10
}
},
"logs_over_time": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1d"
}
},
"error_analysis": {
"filter": {
"term": {
"regex.extractions.logLevel": "ERROR"
}
}
}
}
}
logger.info(f"Fetching aggregations from {start_time} to {end_time}...")
# Make request
    response = requests.post(url, json=payload, headers=headers, timeout=30)
response.raise_for_status()
return response.json()
def display_results(data):
"""Display aggregation results in a readable format."""
aggregations = data.get("data", {}).get("aggregations", {})
total_docs = data.get("data", {}).get("total_documents", 0)
logger.info("="*60)
logger.info(f"Total Documents Analyzed: {total_docs:,}")
logger.info("="*60)
# Display log level distribution
logger.info("\n1. Log Level Distribution:")
log_levels = aggregations.get("log_levels", {}).get("buckets", [])
for bucket in log_levels:
level = bucket.get("key")
count = bucket.get("doc_count")
percentage = (count / total_docs * 100) if total_docs > 0 else 0
logger.info(f" {level:10s}: {count:>10,} ({percentage:>5.2f}%)")
# Display time-based distribution
logger.info("\n2. Daily Log Volume:")
time_buckets = aggregations.get("logs_over_time", {}).get("buckets", [])
for bucket in time_buckets:
date = bucket.get("key_as_string", "")[:10] # Extract date only
count = bucket.get("doc_count")
logger.info(f" {date}: {count:>10,} logs")
# Display error analysis
logger.info("\n3. Error Analysis:")
error_count = aggregations.get("error_analysis", {}).get("doc_count", 0)
error_percentage = (error_count / total_docs * 100) if total_docs > 0 else 0
logger.info(f" Total Errors: {error_count:,} ({error_percentage:.2f}%)")
def main():
"""Main execution."""
logger.info("=" * 60)
logger.info("v2/logs/aggs API Example")
logger.info("=" * 60)
# Step 1: Get access token
logger.info("\n1. Authenticating...")
try:
access_token = get_access_token()
logger.info(" Authentication successful")
except Exception as e:
logger.error(f" Authentication failed: {e}")
return
# Step 2: Fetch aggregations
logger.info("\n2. Fetching log aggregations...")
try:
data = fetch_log_aggregations(access_token)
logger.info(" Successfully retrieved aggregations")
except Exception as e:
logger.error(f" Failed to fetch aggregations: {e}")
return
# Step 3: Display results
logger.info("\n3. Results:")
display_results(data)
logger.info("\n" + "="*60)
logger.info("Complete!")
logger.info("="*60)
if __name__ == "__main__":
main()
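For reference, display_results() expects a response shaped roughly as follows. The structure mirrors the fields read above (data.total_documents, data.aggregations, and per-bucket key, key_as_string, and doc_count); the values here are purely illustrative, not actual API output:

# Illustrative response shape consumed by display_results(); all values are made up
sample_response = {
    "data": {
        "total_documents": 12500,
        "aggregations": {
            "log_levels": {
                "buckets": [
                    {"key": "INFO", "doc_count": 10200},
                    {"key": "ERROR", "doc_count": 1500}
                ]
            },
            "logs_over_time": {
                "buckets": [
                    {"key_as_string": "2024-01-01T00:00:00Z", "doc_count": 1800}
                ]
            },
            "error_analysis": {"doc_count": 1500}
        }
    }
}
display_results(sample_response)  # logs the same summary sections as a real response would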