Skip to main content

Simple Example

A simple example of the v3/jobs/aggs API: a basic aggregation query that groups jobs by state.

Use this when you want to get aggregated statistics from your job executions without complex filtering.


Code Example

import logging
import os
from datetime import datetime, timedelta, timezone

import requests

# Logging setup: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the steps of this script.
logger = logging.getLogger(__name__)

# Configuration
# Every value can be overridden through an environment variable so that real
# endpoints and credentials never have to be written into this file. When a
# variable is unset, the original placeholder value is used.
BASE_URL = os.environ.get("JOBS_API_BASE_URL", "https://your-api-base-url.com")
TOKEN_URL = os.environ.get(
    "JOBS_API_TOKEN_URL", "https://your-auth-endpoint.com/oauth2/token"
)
CLIENT_ID = os.environ.get("JOBS_API_CLIENT_ID", "your-client-id")
CLIENT_SECRET = os.environ.get("JOBS_API_CLIENT_SECRET", "your-client-secret")

# Step 1: Get access token
# Posts the client id/secret as a form using the "client_credentials" grant
# and reads the "access_token" field from the JSON reply.
logger.info("Getting access token...")
token_response = requests.post(
    TOKEN_URL,
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
    },
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    # Without an explicit timeout, requests can block indefinitely.
    timeout=30,
)
# Surface a rejected authentication as an HTTP error here, rather than as a
# KeyError or JSON decoding error on the next line.
token_response.raise_for_status()
access_token = token_response.json()["access_token"]
logger.info("Authenticated")

# Step 2: Prepare request
# Query window: the 30 days ending now. An aware UTC datetime is used because
# datetime.utcnow() is deprecated and returns a naive value.
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=30)

# Timestamp layout sent to the API; the trailing "Z" marks the value as UTC.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

payload = {
    "application": "atr",
    "app_type": ["ec2", "kubernetes"],
    "domain": ["*"],
    "start_time": start_time.strftime(TIME_FORMAT),
    "end_time": end_time.strftime(TIME_FORMAT),
    "includes_eu": False,
    # A single "terms" aggregation named jobs_by_state over the job.state
    # field. NOTE(review): "size" presumably caps the number of buckets
    # returned -- confirm against the API reference.
    "aggs": {
        "jobs_by_state": {
            "terms": {
                "field": "job.state",
                "size": 20,
            },
        },
    },
}

# Headers for the aggregation call: the bearer token obtained in Step 1, a
# JSON content type, and the client id repeated in its own header.
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    "client_id": CLIENT_ID,
}

# Step 3: Make request
# The status code is inspected in Step 4, so no exception is raised here for
# non-200 replies.
logger.info("Fetching job aggregations...")
response = requests.post(
    f"{BASE_URL}/v3/jobs/aggs",
    json=payload,
    headers=headers,
    # Without an explicit timeout, requests can block indefinitely.
    timeout=30,
)

# Step 4: Process response
# On HTTP 200, log the total, the trace id, the remaining daily quota and the
# per-state job counts; on any other status, log the status code and body.
if response.status_code == 200:
    data = response.json()
    # "or {}" / "or 0" also cover keys that are present but null, which would
    # otherwise break the chained .get() calls, the "," formatting, or the
    # percentage division below.
    result = data.get("data") or {}
    meta = data.get("meta") or {}
    aggregations = result.get("aggregations") or {}
    total = result.get("total") or 0

    logger.info(f"Total jobs analyzed: {total:,}")
    logger.info(f"Trace ID: {meta.get('trace_id', 'N/A')}")
    logger.info(f"Remaining: {(meta.get('rate_limit') or {}).get('daily_remaining', 'N/A')}")

    # Display job state distribution
    buckets = (aggregations.get("jobs_by_state") or {}).get("buckets") or []
    logger.info("\nJob State Distribution:")
    for bucket in buckets:
        state = bucket.get("key")
        count = bucket.get("doc_count") or 0
        # Guard against division by zero when no jobs matched.
        percentage = (count / total * 100) if total > 0 else 0
        logger.info(f" {state}: {count:,} ({percentage:.2f}%)")
else:
    logger.error(f"Request failed: {response.status_code}")
    logger.error(f" {response.text}")