Moderate Example

Moderate example for v3/jobs/aggs API - Multiple aggregations with time-based analysis.

Useful for analyzing job execution patterns over time and combining multiple aggregation types.

This example demonstrates using date histograms alongside terms aggregations and filter aggregations to understand job trends.


Code Example

import logging
from datetime import datetime, timedelta, timezone

import requests

# Module-wide logging: timestamped, level-tagged messages on stderr.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Endpoint and OAuth2 client-credential placeholders — replace before running.
BASE_URL = "https://your-api-base-url.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
    """Authenticate via the OAuth2 client-credentials flow.

    Returns:
        str: The JWT access token from the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the token endpoint returns a non-2xx status.
        requests.Timeout: If the endpoint does not respond within 30 seconds.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        # Without a timeout, requests will wait indefinitely on a hung endpoint.
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]

def fetch_job_aggregations(access_token):
    """Fetch job aggregations over the last 90 days from v3/jobs/aggs.

    Combines three aggregation types in one request: a terms aggregation on
    job state, a daily date histogram, and a filter aggregation counting
    FAILED jobs.

    Args:
        access_token: Bearer token obtained from get_access_token().

    Returns:
        dict: Parsed JSON response body.

    Raises:
        requests.HTTPError: If the API returns a non-2xx status.
        requests.Timeout: If the API does not respond within 60 seconds.
    """
    url = f"{BASE_URL}/v3/jobs/aggs"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }

    # Last 90 days, timezone-aware UTC (datetime.utcnow() is deprecated and
    # returns a naive datetime; strftime output below is unchanged).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=90)

    # Request payload with multiple aggregations
    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["your-domain-1.example.com", "your-domain-2.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": False,
        "aggs": {
            # Distribution of jobs across up to 20 distinct states.
            "jobs_by_state": {
                "terms": {
                    "field": "job.state",
                    "size": 20
                }
            },
            # Daily job volume over the window.
            "jobs_over_time": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1d"
                }
            },
            # Count of jobs currently in the FAILED state.
            "failed_count": {
                "filter": {
                    "term": {"job.state": "FAILED"}
                }
            }
        }
    }

    logger.info(f"Fetching aggregations from {start_time} to {end_time}...")

    # Bounded timeout so a stalled API call cannot hang the script forever.
    response = requests.post(url, json=payload, headers=headers, timeout=60)
    response.raise_for_status()

    return response.json()

def display_results(data):
    """Log aggregation results in a readable, sectioned format.

    Args:
        data: Parsed JSON response from v3/jobs/aggs. Expected to contain
            ``data.total`` and ``data.aggregations``; missing keys are
            tolerated and rendered as zero/empty.
    """
    aggregations = data.get("data", {}).get("aggregations", {})
    total = data.get("data", {}).get("total", 0)

    logger.info("=" * 60)
    logger.info(f"Total Jobs Analyzed: {total:,}")
    logger.info("=" * 60)

    # Section 1: distribution of jobs by state.
    logger.info("\n1. Job States:")
    buckets = aggregations.get("jobs_by_state", {}).get("buckets", [])
    for i, bucket in enumerate(buckets, 1):
        # Defaults guard against malformed buckets: a None would crash the
        # ':20s' and ':>10,' format specs below.
        state = bucket.get("key", "")
        count = bucket.get("doc_count", 0)
        percentage = (count / total * 100) if total > 0 else 0
        logger.info(f" {i:2d}. {state:20s}: {count:>10,} ({percentage:>5.2f}%)")

    # Section 2: count from the FAILED-state filter aggregation.
    failed = aggregations.get("failed_count", {}).get("doc_count", 0)
    failed_pct = (failed / total * 100) if total > 0 else 0
    logger.info(f"\n2. Failed Jobs: {failed:,} ({failed_pct:.2f}%)")

    # Section 3: trailing 7 buckets of the daily date histogram.
    logger.info("\n3. Daily Job Volume (last 7 days):")
    time_buckets = aggregations.get("jobs_over_time", {}).get("buckets", [])
    for bucket in time_buckets[-7:]:
        # key_as_string is an ISO timestamp; keep just the YYYY-MM-DD prefix.
        date = bucket.get("key_as_string", "")[:10]
        count = bucket.get("doc_count", 0)
        logger.info(f" {date}: {count:>10,} jobs")

def main():
    """Run the example end to end: authenticate, fetch, display."""
    banner = "=" * 60
    logger.info(banner)
    logger.info("v3/jobs/aggs API Example")
    logger.info(banner)

    # Step 1: authenticate; bail out early on failure.
    logger.info("\n1. Authenticating...")
    try:
        access_token = get_access_token()
    except Exception as exc:
        logger.error(f" Authentication failed: {exc}")
        return
    logger.info(" Authentication successful")

    # Step 2: query the aggregations endpoint; bail out early on failure.
    logger.info("\n2. Fetching job aggregations...")
    try:
        data = fetch_job_aggregations(access_token)
    except Exception as exc:
        logger.error(f" Failed to fetch aggregations: {exc}")
        return
    logger.info(" Successfully retrieved aggregations")

    # Step 3: render the results.
    logger.info("\n3. Results:")
    display_results(data)

    logger.info("\n" + banner)
    logger.info("Complete!")
    logger.info(banner)

if __name__ == "__main__":
    main()