Skip to main content

Moderate Example

Moderate example for v3/metrics/query API - Complete pagination workflow.

This example demonstrates using OpenSearch filters and field selection to retrieve specific metrics across multiple pages using scroll-based pagination.


Code Example

import requests
import logging
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
BASE_URL = "https://your-api-base-url.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
"""Authenticate and get JWT access token."""
response = requests.post(
TOKEN_URL,
data={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
response.raise_for_status()
return response.json()["access_token"]

def fetch_metrics_with_pagination(access_token):
"""Fetch metrics using the query API with scroll pagination.

Returns all metrics matching the query by following scroll_id pagination.
"""
url = f"{BASE_URL}/v3/metrics/query"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}

# Calculate time range (last 30 days)
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)

# Initial request payload with filters and field selection
payload = {
"application": "atr",
"app_type": ["ec2", "kubernetes"],
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"size": 1000,
"includes_eu": False,
"fields": [
"cloud.availability_zone",
"atr.client",
"atr.infra_type",
"atr.domain_name",
"@timestamp"
],
"filters": [
{
"term": {"cloud.availability_zone": "ap-southeast-2"}
}
]
}

all_metrics = []
page = 1

logger.info(f"Fetching metrics from {start_time} to {end_time}...")
logger.info("Filter: ap-southeast-2 metrics only")

while True:
logger.info(f"Fetching page {page}...")

response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
data = response.json()

hits = data.get("data", {}).get("hits", [])
all_metrics.extend(hits)
logger.info(f"Retrieved {len(hits)} metrics (total: {len(all_metrics)})")

# Check pagination
pagination = data.get("meta", {}).get("pagination", {})
scroll_id = pagination.get("scroll_id")

if not scroll_id:
logger.info("Last page reached.")
break

# Build pagination request
payload = {
"scroll_id": scroll_id,
"size": 1000,
"fields": [
"cloud.availability_zone",
"atr.client",
"atr.infra_type",
"atr.domain_name",
"@timestamp"
]
}

page += 1

return all_metrics

def main():
"""Main execution."""
logger.info("=" * 60)
logger.info("v3/metrics/query API Example")
logger.info("=" * 60)

# Step 1: Get access token
logger.info("1. Authenticating...")
try:
access_token = get_access_token()
logger.info("Authentication successful")
except Exception as e:
logger.error(f"Authentication failed: {e}")
return

# Step 2: Fetch metrics with pagination
logger.info("2. Fetching metrics...")
try:
metrics = fetch_metrics_with_pagination(access_token)
logger.info(f"Successfully retrieved {len(metrics)} total metrics")
except Exception as e:
logger.error(f"Failed to fetch metrics: {e}")
return

# Step 3: Process results
logger.info("3. Sample results:")
for i, hit in enumerate(metrics[:3], 1):
source = hit.get("_source", {})
cloud = source.get("cloud", {})
atr = source.get("atr", {})
logger.info(f"Metric {i}:")
logger.info(f" - Availability Zone: {cloud.get('availability_zone', 'N/A')}")
logger.info(f" - Client: {atr.get('client', 'N/A')}")
logger.info(f" - Infra Type: {atr.get('infra_type', 'N/A')}")
logger.info(f" - Domain: {atr.get('domain_name', 'N/A')}")
logger.info(f" - Timestamp: {source.get('@timestamp', 'N/A')}")

if len(metrics) > 3:
logger.info(f"... and {len(metrics) - 3} more metrics")

logger.info("=" * 60)
logger.info("Complete!")
logger.info("=" * 60)

if __name__ == "__main__":
main()