Skip to main content

Advanced Example

Advanced example for v3/jobs/aggs API - Complex nested aggregations and multiple query patterns.

This example demonstrates several aggregation patterns — nested aggregations, filter sub-aggregations, and request-level filters — and shows how to combine multiple analysis techniques when working with job execution data.


Code Example

import logging
from datetime import datetime, timedelta, timezone

import requests

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

BASE_URL = "https://your-api-base-url.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
"""Get JWT access token."""
response = requests.post(
TOKEN_URL,
data={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
return response.json()["access_token"]

def example_1_simple_terms():
"""Example 1: Simple terms aggregation - Count by job state."""
logger.info("=" * 60)
logger.info("Example 1: Simple Terms Aggregation - Job States")
logger.info("=" * 60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)

payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": False,
"aggs": {
"by_state": {
"terms": {
"field": "job.state",
"size": 20
}
}
}
}

response = requests.post(
f"{BASE_URL}/v3/jobs/aggs",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")
logger.info(f"Total jobs: {data.get('data', {}).get('total', 0):,}")

buckets = data.get('data', {}).get('aggregations', {}).get('by_state', {}).get('buckets', [])
logger.info("\nJob states:")
for i, bucket in enumerate(buckets, 1):
logger.info(f" {i:2d}. {bucket['key']}: {bucket['doc_count']:,}")

def example_2_date_histogram():
"""Example 2: Date histogram - Jobs per day."""
logger.info("=" * 60)
logger.info("Example 2: Date Histogram - Daily Job Volume")
logger.info("=" * 60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=14)

payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["your-domain.example.com"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": False,
"aggs": {
"jobs_per_day": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1d"
}
}
}
}

response = requests.post(
f"{BASE_URL}/v3/jobs/aggs",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")

buckets = data.get('data', {}).get('aggregations', {}).get('jobs_per_day', {}).get('buckets', [])
logger.info("\nDaily distribution:")
for bucket in buckets:
date = bucket['key_as_string'][:10]
count = bucket['doc_count']
logger.info(f" {date}: {count:,} jobs")

def example_3_nested_aggregation():
"""Example 3: Nested aggregation - Domains with job name breakdown."""
logger.info("=" * 60)
logger.info("Example 3: Nested Aggregation - Domains with Job Names")
logger.info("=" * 60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)

payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": False,
"aggs": {
"by_domain": {
"terms": {
"field": "atr.domain_name",
"size": 5
},
"aggs": {
"by_job_name": {
"terms": {
"field": "job.name",
"size": 5
}
}
}
}
}
}

response = requests.post(
f"{BASE_URL}/v3/jobs/aggs",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")

domain_buckets = data.get('data', {}).get('aggregations', {}).get('by_domain', {}).get('buckets', [])
for domain_bucket in domain_buckets:
domain = domain_bucket['key']
total = domain_bucket['doc_count']
logger.info(f"\nDomain: {domain} ({total:,} jobs)")

job_buckets = domain_bucket.get('by_job_name', {}).get('buckets', [])
for job_bucket in job_buckets:
job_name = job_bucket['key']
count = job_bucket['doc_count']
percentage = (count / total * 100) if total > 0 else 0
logger.info(f" {job_name:30s}: {count:>8,} ({percentage:>5.1f}%)")

def example_4_filter_with_subaggs():
"""Example 4: Filter with sub-aggregations - Failed job analysis over time."""
logger.info("=" * 60)
logger.info("Example 4: Filter with Sub-aggregations - Failed Job Timeline")
logger.info("=" * 60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)

payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": False,
"aggs": {
"failed_analysis": {
"filter": {
"term": {"job.state": "FAILED"}
},
"aggs": {
"daily_counts": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1d"
}
}
}
}
}
}

response = requests.post(
f"{BASE_URL}/v3/jobs/aggs",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")

failed_agg = data.get('data', {}).get('aggregations', {}).get('failed_analysis', {})
total_failed = failed_agg.get('doc_count', 0)
logger.info(f"Total failed jobs: {total_failed:,}")

daily_buckets = failed_agg.get('daily_counts', {}).get('buckets', [])
logger.info("\nDaily failure distribution:")
for bucket in daily_buckets[-7:]:
date = bucket['key_as_string'][:10]
count = bucket['doc_count']
logger.info(f" {date}: {count:,} failures")

def example_5_with_filters():
"""Example 5: Using request-level filters with aggregations."""
logger.info("=" * 60)
logger.info("Example 5: Request Filters - Long-running Failed Job Analysis")
logger.info("=" * 60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=90)

payload = {
"application": "atr",
"app_type": ["kubernetes"],
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": True,
"filters": [
{"term": {"job.state": "FAILED"}},
{"range": {"job.execution_time.elapsed": {"gte": 60}}}
],
"aggs": {
"by_workflow": {
"terms": {
"field": "workflow.name",
"size": 10
}
},
"by_domain": {
"terms": {
"field": "atr.domain_name",
"size": 20
},
"aggs": {
"by_job_name": {
"terms": {
"field": "job.name",
"size": 5
}
}
}
}
}
}

response = requests.post(
f"{BASE_URL}/v3/jobs/aggs",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
aggs = data.get('data', {}).get('aggregations', {})
total = data.get('data', {}).get('total', 0)

logger.info(f"Status: {response.status_code}")
logger.info(f"Total long-running failed jobs: {total:,}")

# Workflow breakdown
logger.info("\nBy workflow:")
for bucket in aggs.get('by_workflow', {}).get('buckets', []):
logger.info(f" {bucket['key']}: {bucket['doc_count']:,}")

# Domain breakdown with job names
logger.info("\nBy domain:")
for domain_bucket in aggs.get('by_domain', {}).get('buckets', [])[:5]:
domain = domain_bucket['key']
count = domain_bucket['doc_count']
logger.info(f"\n {domain}: {count:,} jobs")
for job_bucket in domain_bucket.get('by_job_name', {}).get('buckets', []):
logger.info(f" {job_bucket['key']:30s}: {job_bucket['doc_count']:,}")

def example_6_complex_multi_agg():
"""Example 6: Complex multi-aggregation analysis."""
logger.info("=" * 60)
logger.info("Example 6: Complex Multi-Aggregation Analysis")
logger.info("=" * 60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)

payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": False,
"aggs": {
"by_state": {
"terms": {
"field": "job.state",
"size": 10
}
},
"by_domain": {
"terms": {
"field": "atr.domain_name",
"size": 5
}
},
"daily_volume": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1d"
}
},
"failed_count": {
"filter": {
"term": {"job.state": "FAILED"}
}
},
"failed_by_domain": {
"filter": {
"term": {"job.state": "FAILED"}
},
"aggs": {
"by_domain": {
"terms": {
"field": "atr.domain_name",
"size": 5
}
}
}
}
}
}

response = requests.post(
f"{BASE_URL}/v3/jobs/aggs",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
aggs = data.get('data', {}).get('aggregations', {})
total = data.get('data', {}).get('total', 0)

logger.info(f"Status: {response.status_code}")
logger.info(f"Total jobs: {total:,}")

# Job states
logger.info("\nTop job states:")
for i, bucket in enumerate(aggs.get('by_state', {}).get('buckets', [])[:5], 1):
count = bucket['doc_count']
pct = (count / total * 100) if total > 0 else 0
logger.info(f" {i}. {bucket['key']:20s}: {count:>8,} ({pct:>5.2f}%)")

# Domains
logger.info("\nTop domains:")
for bucket in aggs.get('by_domain', {}).get('buckets', []):
logger.info(f" {bucket['key']:30s}: {bucket['doc_count']:>8,}")

# Failed
failed = aggs.get('failed_count', {}).get('doc_count', 0)
failed_pct = (failed / total * 100) if total > 0 else 0
logger.info(f"\nFailed: {failed:,} ({failed_pct:.2f}%)")

# Peak day
daily_buckets = aggs.get('daily_volume', {}).get('buckets', [])
if daily_buckets:
peak_day = max(daily_buckets, key=lambda x: x['doc_count'])
logger.info(f"Peak day: {peak_day['key_as_string'][:10]} with {peak_day['doc_count']:,} jobs")

# Failed by domain
failed_agg = aggs.get('failed_by_domain', {})
failed_total = failed_agg.get('doc_count', 0)
logger.info(f"\nFailed jobs: {failed_total:,} total")
logger.info(" By domain:")
for bucket in failed_agg.get('by_domain', {}).get('buckets', []):
logger.info(f" {bucket['key']:28s}: {bucket['doc_count']:>8,}")

def main():
"""Run all examples."""
logger.info("=" * 60)
logger.info("v3/jobs/aggs API - Advanced Examples")
logger.info("=" * 60)

try:
example_1_simple_terms()
example_2_date_histogram()
example_3_nested_aggregation()
example_4_filter_with_subaggs()
example_5_with_filters()
example_6_complex_multi_agg()

logger.info("=" * 60)
logger.info("All examples completed!")
logger.info("=" * 60)
except Exception as e:
logger.error(f"Error: {e}")

if __name__ == "__main__":
main()