Moderate Example
Moderate example for v3/jobs/query API - Complete pagination workflow.
This example demonstrates using OpenSearch filters and field selection to retrieve specific job records across multiple pages using Point in Time (PIT) pagination.
Code Example
import requests
import logging
from datetime import datetime, timedelta
# Logging: timestamped records at INFO level and above, shared by the whole
# script through a module-level logger.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Connection settings: replace these placeholders with the real API base URL,
# OAuth2 token endpoint, and client credentials before running the example.
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
BASE_URL = "https://your-api-base-url.com"
def get_access_token(*, timeout=30):
    """Authenticate and get a JWT access token.

    Posts a form-encoded ``client_credentials`` grant to ``TOKEN_URL`` using
    the module-level ``CLIENT_ID`` and ``CLIENT_SECRET``.

    Args:
        timeout: Seconds to wait for the token endpoint (applied by
            ``requests`` to both connecting and reading). ``requests`` waits
            indefinitely when no timeout is given, so a finite default keeps
            an unresponsive endpoint from hanging the script.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        requests.Timeout: If the endpoint does not respond within ``timeout``.
        KeyError: If the response JSON has no ``access_token`` field.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=timeout,
    )
    # Surface authentication failures as exceptions instead of trying to
    # parse an error body as if it were a token response.
    response.raise_for_status()
    return response.json()["access_token"]
def fetch_jobs_with_pagination(access_token, *, timeout=30):
    """Fetch jobs using the query API with PIT pagination.

    Sends an initial filtered query for FAILED jobs from the last 30 days to
    ``{BASE_URL}/v3/jobs/query``, then keeps requesting further pages with the
    ``pit_id``/``search_after`` cursor returned under ``meta.pagination``
    until the service stops returning a cursor or returns an empty page.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for each page request (applied by
            ``requests`` to both connecting and reading). Without a timeout
            a stalled request would block the loop indefinitely.

    Returns:
        A list with the ``data.hits`` entries of every page, in the order
        they were received.

    Raises:
        requests.HTTPError: If any page request answers with a 4xx/5xx status.
        requests.Timeout: If a page request does not complete within
            ``timeout``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    url = f"{BASE_URL}/v3/jobs/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID,
    }

    # Shared by the initial query and every follow-up page request so the
    # two payloads cannot drift apart.
    page_size = 1000
    fields = [
        "job.name",
        "job.state",
        "job.execution_time.elapsed",
        "workflow.name",
        "atr.domain_name",
    ]

    # Calculate time range (last 30 days). datetime.utcnow() is deprecated
    # since Python 3.12; an aware UTC datetime formats to the same payload
    # timestamps (the log line below now shows the +00:00 offset explicitly).
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=30)
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"

    # Initial request payload with filters and field selection
    payload = {
        "application": "atr",
        "app_type": ["ec2", "kubernetes"],
        "domain": ["*"],
        "start_time": start_time.strftime(timestamp_format),
        "end_time": end_time.strftime(timestamp_format),
        "size": page_size,
        "includes_eu": False,
        "fields": fields,
        "filters": [
            {
                "term": {"job.state": "FAILED"}
            }
        ],
    }

    all_jobs = []
    page = 1
    logger.info(f"Fetching jobs from {start_time} to {end_time}...")
    logger.info("Filter: failed jobs only")

    while True:
        logger.info(f"Fetching page {page}...")
        response = requests.post(
            url, json=payload, headers=headers, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()

        # "or" fallbacks also cover keys that are present but null.
        hits = (data.get("data") or {}).get("hits") or []
        all_jobs.extend(hits)
        logger.info(f"Retrieved {len(hits)} jobs (total: {len(all_jobs)})")

        # Check pagination
        pagination = (data.get("meta") or {}).get("pagination") or {}
        pit_id = pagination.get("pit_id")
        search_after = pagination.get("search_after")

        # Stop when there is no cursor to follow. An empty page also ends the
        # loop: it adds nothing, and re-sending a cursor that yields no hits
        # could otherwise spin forever.
        if not (hits and pit_id and search_after):
            logger.info("Last page reached.")
            break

        # Build pagination request: follow-up pages carry only the cursor,
        # page size, and field selection.
        payload = {
            "pit_id": pit_id,
            "search_after": search_after,
            "size": page_size,
            "fields": fields,
        }
        page += 1

    return all_jobs
def main():
    """Run the example end to end.

    Logs a banner, obtains an access token, fetches every matching job, and
    logs selected fields of the first three hits. If authentication or the
    fetch raises, the error is logged and the run ends early. Returns nothing.
    """
    banner = "=" * 60
    preview_count = 3

    logger.info(banner)
    logger.info("v3/jobs/query API Example")
    logger.info(banner)

    # Step 1: obtain a token; nothing else can run without it.
    logger.info("1. Authenticating...")
    try:
        token = get_access_token()
        logger.info("Authentication successful")
    except Exception as auth_error:
        logger.error(f"Authentication failed: {auth_error}")
        return

    # Step 2: pull every page of matching jobs.
    logger.info("2. Fetching jobs...")
    try:
        jobs = fetch_jobs_with_pagination(token)
        logger.info(f"Successfully retrieved {len(jobs)} total jobs")
    except Exception as fetch_error:
        logger.error(f"Failed to fetch jobs: {fetch_error}")
        return

    # Step 3: log a short preview of the leading hits.
    logger.info("3. Sample results:")
    for position, record in enumerate(jobs[:preview_count], start=1):
        document = record.get("_source", {})
        job_info = document.get("job", {})
        workflow_info = document.get("workflow", {})
        atr_info = document.get("atr", {})
        logger.info(f"Job {position}:")
        logger.info(f" - Name: {job_info.get('name', 'N/A')}")
        logger.info(f" - State: {job_info.get('state', 'N/A')}")
        logger.info(f" - Duration: {job_info.get('execution_time', {}).get('elapsed', 'N/A')}s")
        logger.info(f" - Workflow: {workflow_info.get('name', 'N/A')}")
        logger.info(f" - Domain: {atr_info.get('domain_name', 'N/A')}")

    # Mention how many hits were left out of the preview, if any.
    remaining = len(jobs) - preview_count
    if remaining > 0:
        logger.info(f"... and {remaining} more jobs")

    logger.info(banner)
    logger.info("Complete!")
    logger.info(banner)
# Run the example only when this file is executed as a script, so importing
# it does not trigger authentication or API requests.
if __name__ == "__main__":
    main()