Skip to main content

Advanced Example

Advanced example for v3/tickets/aggs API - Complex nested aggregations and multiple query patterns.

This example demonstrates various aggregation patterns including nested aggregations, filter sub-aggregations, filters, and combining multiple analysis techniques for ticket data analysis.


Code Example

import logging
from datetime import datetime, timedelta, timezone

import requests

# Set up a module-level logger with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# API endpoint and OAuth2 client-credentials configuration.
# Replace these placeholders with real values before running.
BASE_URL = "https://your-api-base-url.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
    """Fetch a JWT access token via the OAuth2 client-credentials flow.

    Returns:
        str: The bearer access token issued by the auth server.

    Raises:
        requests.HTTPError: If the token endpoint returns an error status.
        requests.Timeout: If the auth server does not respond in time.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30,  # never hang indefinitely on an unresponsive auth server
    )
    # Fail fast with a clear HTTP error instead of a confusing KeyError
    # when the auth server rejects the credentials.
    response.raise_for_status()
    return response.json()["access_token"]

def example_1_simple_terms():
    """Example 1: Simple terms aggregation - Count by ticket type.

    Queries the last 30 days across all domains and logs a ranked list
    of ticket types with their document counts.
    """
    logger.info("=" * 60)
    logger.info("Example 1: Simple Terms Aggregation - Ticket Types")
    logger.info("=" * 60)

    access_token = get_access_token()
    # Timezone-aware "now" (datetime.utcnow() is deprecated since 3.12).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=30)

    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": False,
        "aggs": {
            "by_type": {
                "terms": {
                    "field": "ticket.type",
                    "size": 20
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v3/tickets/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=60,  # bound the wait on slow aggregation queries
    )
    # Surface HTTP failures explicitly rather than parsing an error body.
    response.raise_for_status()

    data = response.json()
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total tickets: {data.get('data', {}).get('total', 0):,}")

    buckets = data.get('data', {}).get('aggregations', {}).get('by_type', {}).get('buckets', [])
    logger.info("\nTicket types:")
    for i, bucket in enumerate(buckets, 1):
        logger.info(f"  {i:2d}. {bucket['key']}: {bucket['doc_count']:,}")

def example_2_date_histogram():
    """Example 2: Date histogram - Tickets per day.

    Buckets the last 14 days of tickets for a single domain into daily
    counts and logs the per-day distribution.
    """
    logger.info("=" * 60)
    logger.info("Example 2: Date Histogram - Daily Ticket Volume")
    logger.info("=" * 60)

    access_token = get_access_token()
    # Timezone-aware "now" (datetime.utcnow() is deprecated since 3.12).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=14)

    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["your-domain.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": False,
        "aggs": {
            "tickets_per_day": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1d"
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v3/tickets/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=60,  # bound the wait on slow aggregation queries
    )
    # Surface HTTP failures explicitly rather than parsing an error body.
    response.raise_for_status()

    data = response.json()
    logger.info(f"Status: {response.status_code}")

    buckets = data.get('data', {}).get('aggregations', {}).get('tickets_per_day', {}).get('buckets', [])
    logger.info("\nDaily distribution:")
    for bucket in buckets:
        # key_as_string is an ISO timestamp; the first 10 chars are the date.
        date = bucket['key_as_string'][:10]
        count = bucket['doc_count']
        logger.info(f"  {date}: {count:,} tickets")

def example_3_nested_aggregation():
    """Example 3: Nested aggregation - Domains with ticket type breakdown.

    Buckets the last 30 days by domain, then sub-buckets each domain by
    ticket type, logging per-type counts with their share of the domain.
    """
    logger.info("=" * 60)
    logger.info("Example 3: Nested Aggregation - Domains with Ticket Types")
    logger.info("=" * 60)

    access_token = get_access_token()
    # Timezone-aware "now" (datetime.utcnow() is deprecated since 3.12).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=30)

    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": False,
        "aggs": {
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 5
                },
                # Sub-aggregation: break each domain bucket down by type.
                "aggs": {
                    "by_type": {
                        "terms": {
                            "field": "ticket.type",
                            "size": 5
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v3/tickets/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=60,  # bound the wait on slow aggregation queries
    )
    # Surface HTTP failures explicitly rather than parsing an error body.
    response.raise_for_status()

    data = response.json()
    logger.info(f"Status: {response.status_code}")

    domain_buckets = data.get('data', {}).get('aggregations', {}).get('by_domain', {}).get('buckets', [])
    for domain_bucket in domain_buckets:
        domain = domain_bucket['key']
        total = domain_bucket['doc_count']
        logger.info(f"\nDomain: {domain} ({total:,} tickets)")

        type_buckets = domain_bucket.get('by_type', {}).get('buckets', [])
        for type_bucket in type_buckets:
            ticket_type = type_bucket['key']
            count = type_bucket['doc_count']
            # Guard against division by zero for empty domain buckets.
            percentage = (count / total * 100) if total > 0 else 0
            logger.info(f"  {ticket_type:20s}: {count:>8,} ({percentage:>5.1f}%)")

def example_4_filter_with_subaggs():
    """Example 4: Filter with sub-aggregations - Incident analysis over time.

    Restricts the aggregation to INCIDENT tickets via a filter agg, then
    computes a daily histogram inside the filtered scope; logs the most
    recent seven days.
    """
    logger.info("=" * 60)
    logger.info("Example 4: Filter with Sub-aggregations - Incident Timeline")
    logger.info("=" * 60)

    access_token = get_access_token()
    # Timezone-aware "now" (datetime.utcnow() is deprecated since 3.12).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=30)

    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": False,
        "aggs": {
            "incident_analysis": {
                # Only INCIDENT tickets flow into the nested histogram.
                "filter": {
                    "term": {"ticket.type": "INCIDENT"}
                },
                "aggs": {
                    "daily_counts": {
                        "date_histogram": {
                            "field": "@timestamp",
                            "fixed_interval": "1d"
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v3/tickets/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=60,  # bound the wait on slow aggregation queries
    )
    # Surface HTTP failures explicitly rather than parsing an error body.
    response.raise_for_status()

    data = response.json()
    logger.info(f"Status: {response.status_code}")

    incident_agg = data.get('data', {}).get('aggregations', {}).get('incident_analysis', {})
    total_incidents = incident_agg.get('doc_count', 0)
    logger.info(f"Total incidents: {total_incidents:,}")

    daily_buckets = incident_agg.get('daily_counts', {}).get('buckets', [])
    logger.info("\nDaily incident distribution:")
    # Show only the last seven daily buckets to keep the log readable.
    for bucket in daily_buckets[-7:]:
        date = bucket['key_as_string'][:10]
        count = bucket['doc_count']
        logger.info(f"  {date}: {count:,} incidents")

def example_5_with_filters():
    """Example 5: Using request-level filters with aggregations.

    Applies request-level filters (INCIDENT type, priority <= 2) so every
    aggregation operates on high-priority incidents only, then logs a
    priority breakdown plus a per-domain breakdown with ticket states.
    """
    logger.info("=" * 60)
    logger.info("Example 5: Request Filters - High Priority Incident Analysis")
    logger.info("=" * 60)

    access_token = get_access_token()
    # Timezone-aware "now" (datetime.utcnow() is deprecated since 3.12).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=90)

    payload = {
        "application": "atr",
        "app_type": ["kubernetes"],
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": True,
        # Request-level filters narrow the document set for ALL aggs below.
        "filters": [
            {"term": {"ticket.type": "INCIDENT"}},
            {"range": {"ticket.priority": {"lte": 2}}}
        ],
        "aggs": {
            "by_priority": {
                "terms": {
                    "field": "ticket.priority",
                    "size": 10
                }
            },
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 20
                },
                "aggs": {
                    "by_state": {
                        "terms": {
                            "field": "ticket.state",
                            "size": 5
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v3/tickets/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=60,  # bound the wait on slow aggregation queries
    )
    # Surface HTTP failures explicitly rather than parsing an error body.
    response.raise_for_status()

    data = response.json()
    aggs = data.get('data', {}).get('aggregations', {})
    total = data.get('data', {}).get('total', 0)

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total high-priority incidents: {total:,}")

    # Priority breakdown
    logger.info("\nBy priority:")
    for bucket in aggs.get('by_priority', {}).get('buckets', []):
        logger.info(f"  {bucket['key']}: {bucket['doc_count']:,}")

    # Domain breakdown with state (top 5 domains only)
    logger.info("\nBy domain:")
    for domain_bucket in aggs.get('by_domain', {}).get('buckets', [])[:5]:
        domain = domain_bucket['key']
        count = domain_bucket['doc_count']
        logger.info(f"\n  {domain}: {count:,} tickets")
        for state_bucket in domain_bucket.get('by_state', {}).get('buckets', []):
            logger.info(f"    {state_bucket['key']:15s}: {state_bucket['doc_count']:,}")

def example_6_complex_multi_agg():
    """Example 6: Complex multi-aggregation analysis.

    Combines five sibling aggregations in one request — type and domain
    terms, a daily histogram, a resolved-count filter, and a filtered
    incident-by-domain breakdown — and logs a summary of each.
    """
    logger.info("=" * 60)
    logger.info("Example 6: Complex Multi-Aggregation Analysis")
    logger.info("=" * 60)

    access_token = get_access_token()
    # Timezone-aware "now" (datetime.utcnow() is deprecated since 3.12).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=30)

    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "includes_eu": False,
        "aggs": {
            "by_type": {
                "terms": {
                    "field": "ticket.type",
                    "size": 10
                }
            },
            "by_domain": {
                "terms": {
                    "field": "atr.domain_name",
                    "size": 5
                }
            },
            "daily_volume": {
                "date_histogram": {
                    "field": "@timestamp",
                    "fixed_interval": "1d"
                }
            },
            "resolved_count": {
                "filter": {
                    "term": {"ticket.resolved": True}
                }
            },
            "incident_by_domain": {
                "filter": {
                    "term": {"ticket.type": "INCIDENT"}
                },
                "aggs": {
                    "by_domain": {
                        "terms": {
                            "field": "atr.domain_name",
                            "size": 5
                        }
                    }
                }
            }
        }
    }

    response = requests.post(
        f"{BASE_URL}/v3/tickets/aggs",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=60,  # bound the wait on slow aggregation queries
    )
    # Surface HTTP failures explicitly rather than parsing an error body.
    response.raise_for_status()

    data = response.json()
    aggs = data.get('data', {}).get('aggregations', {})
    total = data.get('data', {}).get('total', 0)

    logger.info(f"Status: {response.status_code}")
    logger.info(f"Total tickets: {total:,}")

    # Ticket types (top 5), each with its share of the overall total.
    logger.info("\nTop ticket types:")
    for i, bucket in enumerate(aggs.get('by_type', {}).get('buckets', [])[:5], 1):
        count = bucket['doc_count']
        pct = (count / total * 100) if total > 0 else 0
        logger.info(f"  {i}. {bucket['key']:20s}: {count:>8,} ({pct:>5.2f}%)")

    # Domains
    logger.info("\nTop domains:")
    for bucket in aggs.get('by_domain', {}).get('buckets', []):
        logger.info(f"  {bucket['key']:30s}: {bucket['doc_count']:>8,}")

    # Resolved-ticket count and rate (filter aggs expose only doc_count).
    resolved = aggs.get('resolved_count', {}).get('doc_count', 0)
    resolved_pct = (resolved / total * 100) if total > 0 else 0
    logger.info(f"\nResolved: {resolved:,} ({resolved_pct:.2f}%)")

    # Busiest day from the daily histogram.
    daily_buckets = aggs.get('daily_volume', {}).get('buckets', [])
    if daily_buckets:
        peak_day = max(daily_buckets, key=lambda x: x['doc_count'])
        logger.info(f"Peak day: {peak_day['key_as_string'][:10]} with {peak_day['doc_count']:,} tickets")

    # Incidents by domain (nested inside the INCIDENT filter agg).
    incident_agg = aggs.get('incident_by_domain', {})
    incident_total = incident_agg.get('doc_count', 0)
    logger.info(f"\nIncidents: {incident_total:,} total")
    logger.info("  By domain:")
    for bucket in incident_agg.get('by_domain', {}).get('buckets', []):
        logger.info(f"    {bucket['key']:28s}: {bucket['doc_count']:>8,}")

def main():
    """Run all six aggregation examples in order.

    Any exception from an example aborts the remaining examples and is
    logged with its traceback.
    """
    logger.info("=" * 60)
    logger.info("v3/tickets/aggs API - Advanced Examples")
    logger.info("=" * 60)

    try:
        example_1_simple_terms()
        example_2_date_histogram()
        example_3_nested_aggregation()
        example_4_filter_with_subaggs()
        example_5_with_filters()
        example_6_complex_multi_agg()

        logger.info("=" * 60)
        logger.info("All examples completed!")
        logger.info("=" * 60)
    except Exception as e:
        # logger.exception keeps the traceback, which a bare error string loses.
        logger.exception(f"Error: {e}")

if __name__ == "__main__":
    main()