Moderate Example
Moderate example for the v2/workloads/scroll API — a complete pagination workflow.
This example demonstrates using OpenSearch filters and field selection to retrieve specific workloads across multiple pages.
Code Example
import json
import logging
from datetime import datetime, timedelta, timezone

import requests
# Configure logging: timestamped INFO-level messages to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: API base URL and OAuth2 client credentials.
# NOTE(review): TOKEN_URL, CLIENT_ID, and CLIENT_SECRET are placeholders —
# replace them with real values before running this example.
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
def get_access_token():
    """Authenticate via the OAuth2 client-credentials flow and return a JWT.

    Returns:
        str: The ``access_token`` field of the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the token endpoint returns a non-2xx status.
        requests.Timeout: If the endpoint does not respond within 30 seconds.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        # Without a timeout, requests waits indefinitely on a hung endpoint.
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]
def fetch_workloads_with_scroll(access_token):
    """Fetch workloads from the v2/workloads/scroll API, following pagination.

    Sends an initial filtered query covering the last 3 hours, then repeats
    the request with the returned ``scroll_id`` until the API returns no
    cursor or an empty page.

    Args:
        access_token: Bearer token obtained from ``get_access_token``.

    Returns:
        list: All workload hits accumulated across every page.

    Raises:
        requests.HTTPError: If any page request returns a non-2xx status.
        requests.Timeout: If a page request exceeds 60 seconds.
    """
    url = f"{BASE_URL}/v2/workloads/scroll"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }

    # Time range: the last 3 hours. Use a timezone-aware UTC timestamp;
    # datetime.utcnow() is deprecated since Python 3.12.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=3)

    # Initial request payload with filters and field selection.
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 100,  # page size requested per scroll call
        "fields": [
            "@timestamp",
            "workload_id",
            "status",
            "workflow.name"
        ],
        "filters": [
            {
                "terms": {
                    "workflow.name": ["ATA-Hook_INCIDENT", "ATR_Scheduled_Workflow"]
                }
            }
        ]
    }

    all_workloads = []
    scroll_id = None
    page = 1

    # Lazy %-style args avoid formatting the message when the level is disabled.
    logger.info("Fetching workloads from %s to %s...", start_time, end_time)
    logger.info("Filters: workflow.name in ['ATA-Hook_INCIDENT', 'ATR_Scheduled_Workflow']")

    while True:
        # After the first page, the request body is just the scroll cursor.
        if scroll_id:
            payload = {"scroll_id": scroll_id}
            logger.info("Fetching page %d...", page)
        else:
            logger.info("Fetching initial page...")

        # timeout prevents the loop from hanging forever on a dead connection.
        response = requests.post(url, json=payload, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()

        # Extract this page's hits and accumulate them.
        workloads = data.get("data", {}).get("workloads", [])
        all_workloads.extend(workloads)
        logger.info("Retrieved %d workloads (total: %d)",
                    len(workloads), len(all_workloads))

        # Stop when the API returns no cursor or an empty page.
        scroll_id = data.get("meta", {}).get("scroll_id")
        if not scroll_id or not workloads:
            logger.info("No more pages available.")
            break
        page += 1

    return all_workloads
def main():
    """Run the end-to-end example: authenticate, page through workloads, show a sample."""
    banner = "=" * 60
    logger.info(banner)
    logger.info("v2/workloads/scroll API Example")
    logger.info(banner)

    # Step 1: obtain an access token; abort on any authentication failure.
    logger.info("1. Authenticating...")
    try:
        access_token = get_access_token()
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        return
    else:
        logger.info("Authentication successful")

    # Step 2: page through the scroll API; abort on any fetch failure.
    logger.info("2. Fetching workloads...")
    try:
        workloads = fetch_workloads_with_scroll(access_token)
    except Exception as e:
        logger.error(f"Failed to fetch workloads: {e}")
        return
    else:
        logger.info(f"Successfully retrieved {len(workloads)} total workloads")

    # Step 3: display up to three sample hits, then a count of the remainder.
    logger.info("3. Sample results:")
    for idx, hit in enumerate(workloads[:3], start=1):
        src = hit.get('_source', {})
        logger.info(f"Workload {idx}:")
        logger.info(f" - Timestamp: {src.get('@timestamp', 'N/A')}")
        logger.info(f" - Workload ID: {src.get('workload_id', 'N/A')}")
        logger.info(f" - Status: {src.get('status', 'N/A')}")
        logger.info(f" - Workflow: {src.get('workflow', {}).get('name', 'N/A')}")

    remaining = len(workloads) - 3
    if remaining > 0:
        logger.info(f"... and {remaining} more workloads")

    logger.info(banner)
    logger.info("Complete!")
    logger.info(banner)


if __name__ == "__main__":
    main()