Simple Example
A simple example for the v2/logs/aggs API — a basic aggregation query.
Use this when you want to get aggregated statistics from your logs without complex filtering.
Code Example
import logging
from datetime import datetime, timedelta, timezone

import requests
# Set up root logging once for the whole script: timestamp, severity, message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger named after this module, per stdlib convention.
logger = logging.getLogger(__name__)
# Configuration
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

# Step 1: Get access token via the OAuth2 client-credentials grant.
logger.info("Getting access token...")
token_response = requests.post(
    TOKEN_URL,
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
    },
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    # requests has no default timeout; without one a stalled connection hangs forever.
    timeout=30,
)
# Fail loudly on 4xx/5xx auth errors instead of crashing later with a
# KeyError when "access_token" is missing from an error payload.
token_response.raise_for_status()
access_token = token_response.json()["access_token"]
logger.info("Access token retrieved")
# Step 2: Prepare request.
# Query window: the last 24 hours. Use a timezone-aware UTC timestamp;
# datetime.utcnow() is deprecated since Python 3.12 and returns a naive value.
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)

# Aggregation query: bucket the window's logs by their extracted log level.
payload = {
    "application": "atr",
    "app_type": "kubernetes",
    "domain": ["*"],  # wildcard: all domains
    "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "aggs": {
        "log_levels": {
            "terms": {
                "field": "regex.extractions.logLevel",
                "size": 10,  # return at most 10 distinct level buckets
            }
        }
    },
}
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    # NOTE(review): the API appears to expect the client id as a header too — confirm.
    "client_id": CLIENT_ID,
}
logger.info("Payload and headers constructed")
# Step 3: Make request
logger.info("Fetching log aggregations...")
response = requests.post(
    f"{BASE_URL}/v2/logs/aggs",
    json=payload,
    headers=headers,
    # Aggregation queries can be slow, but never wait without bound.
    timeout=60,
)

# Step 4: Process response
if response.status_code == 200:
    # Walk the {"data": {...}} envelope once instead of re-fetching it per field.
    body = response.json().get("data", {})
    aggregations = body.get("aggregations", {})
    total_docs = body.get("total_documents", 0)
    logger.info("Total documents analyzed: %s", f"{total_docs:,}")

    # Display log level distribution.
    log_levels = aggregations.get("log_levels", {}).get("buckets", [])
    logger.info("\nLog Level Distribution:")
    for bucket in log_levels:
        level = bucket.get("key")
        # Default to 0 so a bucket missing "doc_count" cannot raise a
        # TypeError (None / int) in the percentage arithmetic below.
        count = bucket.get("doc_count", 0)
        percentage = (count / total_docs * 100) if total_docs > 0 else 0
        logger.info("  %s: %s (%.2f%%)", level, f"{count:,}", percentage)
else:
    logger.error("Request failed: %s", response.status_code)
    logger.error("%s", response.text)