Skip to main content

Moderate Example

A moderate-complexity example for the v3/tickets/aggs API, combining multiple aggregation types with time-based analysis.

Useful for analyzing ticket patterns over time and combining multiple aggregation types.

This example demonstrates using date histograms alongside terms aggregations and filter aggregations to understand ticket trends.


Code Example

import logging
from datetime import datetime, timedelta, timezone

import requests

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
BASE_URL = "https://your-api-base-url.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
"""Authenticate and get JWT access token."""
response = requests.post(
TOKEN_URL,
data={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
response.raise_for_status()
return response.json()["access_token"]

def fetch_ticket_aggregations(access_token):
"""Fetch ticket aggregations with multiple aggregation types."""
url = f"{BASE_URL}/v3/tickets/aggs"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}

# Calculate time range (last 90 days)
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=90)

# Request payload with multiple aggregations
payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["your-domain-1.example.com", "your-domain-2.example.com"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"includes_eu": False,
"aggs": {
"tickets_by_type": {
"terms": {
"field": "ticket.type",
"size": 20
}
},
"tickets_over_time": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1d"
}
},
"resolved_count": {
"filter": {
"term": {"ticket.resolved": True}
}
}
}
}

logger.info(f"Fetching aggregations from {start_time} to {end_time}...")

response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()

return response.json()

def display_results(data):
"""Display aggregation results in a readable format."""
aggregations = data.get("data", {}).get("aggregations", {})
total = data.get("data", {}).get("total", 0)

logger.info("=" * 60)
logger.info(f"Total Tickets Analyzed: {total:,}")
logger.info("=" * 60)

# Display ticket type distribution
logger.info("\n1. Ticket Types:")
buckets = aggregations.get("tickets_by_type", {}).get("buckets", [])
for i, bucket in enumerate(buckets, 1):
ticket_type = bucket.get("key")
count = bucket.get("doc_count")
percentage = (count / total * 100) if total > 0 else 0
logger.info(f" {i:2d}. {ticket_type:20s}: {count:>10,} ({percentage:>5.2f}%)")

# Display resolved count
resolved = aggregations.get("resolved_count", {}).get("doc_count", 0)
resolved_pct = (resolved / total * 100) if total > 0 else 0
logger.info(f"\n2. Resolved Tickets: {resolved:,} ({resolved_pct:.2f}%)")

# Display time-based distribution
logger.info("\n3. Daily Ticket Volume (last 7 days):")
time_buckets = aggregations.get("tickets_over_time", {}).get("buckets", [])
for bucket in time_buckets[-7:]:
date = bucket.get("key_as_string", "")[:10]
count = bucket.get("doc_count")
logger.info(f" {date}: {count:>10,} tickets")

def main():
"""Main execution."""
logger.info("=" * 60)
logger.info("v3/tickets/aggs API Example")
logger.info("=" * 60)

# Step 1: Get access token
logger.info("\n1. Authenticating...")
try:
access_token = get_access_token()
logger.info(" Authentication successful")
except Exception as e:
logger.error(f" Authentication failed: {e}")
return

# Step 2: Fetch aggregations
logger.info("\n2. Fetching ticket aggregations...")
try:
data = fetch_ticket_aggregations(access_token)
logger.info(" Successfully retrieved aggregations")
except Exception as e:
logger.error(f" Failed to fetch aggregations: {e}")
return

# Step 3: Display results
logger.info("\n3. Results:")
display_results(data)

logger.info("\n" + "=" * 60)
logger.info("Complete!")
logger.info("=" * 60)

if __name__ == "__main__":
main()