Advanced Example
Advanced examples for the v2/workloads/scroll API, showing different use cases.
Demonstrates:
- Basic workload query
- Terms filter for specific workflows
- Complex bool queries with must and must_not clauses
- Field selection to reduce response size
- Pagination handling
Code Example
import logging
from datetime import datetime, timedelta, timezone

import requests
# Configure logging: root logger at INFO with a "time - level - message" layout.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by every example below.
logger = logging.getLogger(__name__)
# Base URL of the data API; the examples append "/v2/workloads/scroll" to it.
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
# OAuth2 token endpoint that get_access_token() posts to (looks like a
# placeholder value - replace with the real endpoint).
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
# Client credentials sent in the token request. CLIENT_ID is additionally sent
# as a "client_id" header on every API call. Both look like placeholders.
# NOTE(review): do not commit real secrets here - consider loading them from
# the environment or a secret store instead.
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
def get_access_token():
    """Get JWT access token.

    Posts a ``client_credentials`` grant to ``TOKEN_URL`` using the
    module-level ``CLIENT_ID`` and ``CLIENT_SECRET``.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        requests.HTTPError: If the token endpoint answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        KeyError: If the response body has no ``access_token`` field.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        # requests has no default timeout; bound the wait so an unreachable
        # auth endpoint cannot hang the script forever.
        timeout=30,
    )
    # Surface auth failures as an HTTP error instead of an opaque KeyError
    # (or JSON decode error) on the line below.
    response.raise_for_status()
    return response.json()["access_token"]
def example_1_basic_query():
    """Example 1: Basic query for all workloads in a domain.

    Sends one scroll request (application ``atr``, app_type ``kubernetes``,
    domain ``prod.example.com``, size 100) covering the last two hours, then
    logs the HTTP status, the number of workloads in ``data.workloads`` and
    the ``meta.workloads_available`` value.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    logger.info("="*60)
    logger.info("Example 1: Basic Query - All workloads from specific domain")
    logger.info("="*60)

    access_token = get_access_token()

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and naive.
    # The formatted strings are identical to what utcnow() produced.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=2)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["prod.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 100,
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/scroll",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID,
        },
        # requests has no default timeout; avoid hanging on a stalled server.
        timeout=30,
    )

    logger.info("Status: %s", response.status_code)
    # Stop on an error status instead of reporting it as "0 workloads".
    response.raise_for_status()

    data = response.json()
    logger.info("Workloads retrieved: %s", len(data.get('data', {}).get('workloads', [])))
    logger.info("Total available: %s", data.get('meta', {}).get('workloads_available', 0))
def example_2_with_terms_filter():
    """Example 2: Query with terms filter for specific workflows.

    Sends one scroll request for the last six hours across all domains
    (``"*"``), with a ``terms`` filter on ``workflow.name`` restricted to two
    workflow names, then logs the HTTP status and the number of workloads
    returned.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    logger.info("="*60)
    logger.info("Example 2: Terms Filter - Specific workflow names")
    logger.info("="*60)

    access_token = get_access_token()

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and naive.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=6)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 50,
        "filters": [
            {
                "terms": {
                    "workflow.name": ["ATA-Hook_INCIDENT", "ATR_Scheduled_Workflow"]
                }
            }
        ],
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/scroll",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID,
        },
        # requests has no default timeout; avoid hanging on a stalled server.
        timeout=30,
    )

    logger.info("Status: %s", response.status_code)
    # Stop on an error status instead of reporting it as "0 workloads".
    response.raise_for_status()

    data = response.json()
    logger.info("Filtered workloads found: %s", len(data.get('data', {}).get('workloads', [])))
def example_3_complex_bool_query():
    """Example 3: Complex bool query with must and must_not clauses.

    Sends one scroll request for the last twelve hours with a ``bool`` filter:
    ``ticket.type`` must be ``SR_TASK`` and ``workload.trigger`` must be
    ``TICKET``, while ``workload.state`` must not be ``FAILED``. Only the
    fields listed under ``fields`` are requested. Logs the HTTP status, the
    number of workloads returned and, if any, the selected fields of the first
    workload's ``_source``.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    logger.info("="*60)
    logger.info("Example 3: Complex Bool Query - SR_TASK tickets that didn't fail")
    logger.info("="*60)

    access_token = get_access_token()

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and naive.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=12)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 20,
        "fields": [
            "@timestamp",
            "workload_id",
            "status",
            "workflow.name",
            "ticket.type",
            "workload.trigger",
            "workload.state",
        ],
        "filters": [
            {
                "bool": {
                    "must": [
                        {"term": {"ticket.type": "SR_TASK"}},
                        {"term": {"workload.trigger": "TICKET"}},
                    ],
                    "must_not": [
                        {"term": {"workload.state": "FAILED"}},
                    ],
                }
            }
        ],
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/scroll",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID,
        },
        # requests has no default timeout; avoid hanging on a stalled server.
        timeout=30,
    )

    # Fail before parsing: an error response may not carry a JSON body.
    response.raise_for_status()

    data = response.json()
    workloads = data.get('data', {}).get('workloads', [])

    logger.info("Status: %s", response.status_code)
    logger.info("Workloads retrieved: %s", len(workloads))

    if workloads:
        logger.info("First workload sample (only selected fields returned):")
        source = workloads[0].get('_source', {})
        # Dotted field names come back as nested objects, hence the chained gets.
        logger.info(" Timestamp: %s", source.get('@timestamp', 'N/A'))
        logger.info(" Workload ID: %s", source.get('workload_id', 'N/A'))
        logger.info(" Status: %s", source.get('status', 'N/A'))
        logger.info(" Workflow: %s", source.get('workflow', {}).get('name', 'N/A'))
        logger.info(" Ticket Type: %s", source.get('ticket', {}).get('type', 'N/A'))
        logger.info(" Workload Trigger: %s", source.get('workload', {}).get('trigger', 'N/A'))
        logger.info(" Workload State: %s", source.get('workload', {}).get('state', 'N/A'))
def example_4_field_selection():
    """Example 4: Query with specific field selection to reduce response size.

    Sends one scroll request for the last 30 minutes asking only for
    ``@timestamp``, ``workload_id``, ``status`` and ``workflow.name``. Logs the
    HTTP status, the number of workloads returned and, if any, the keys
    present in the first workload's ``_source``.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    logger.info("="*60)
    logger.info("Example 4: Field Selection - Only essential fields")
    logger.info("="*60)

    access_token = get_access_token()

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and naive.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(minutes=30)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 20,
        "fields": [
            "@timestamp",
            "workload_id",
            "status",
            "workflow.name",
        ],
    }

    response = requests.post(
        f"{BASE_URL}/v2/workloads/scroll",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID,
        },
        # requests has no default timeout; avoid hanging on a stalled server.
        timeout=30,
    )

    # Fail before parsing: an error response may not carry a JSON body.
    response.raise_for_status()

    data = response.json()
    workloads = data.get('data', {}).get('workloads', [])

    logger.info("Status: %s", response.status_code)
    logger.info("Workloads retrieved: %s", len(workloads))

    if workloads:
        logger.info("First workload (only selected fields):")
        logger.info(" Fields returned: %s", list(workloads[0].get('_source', {}).keys()))
def example_5_pagination():
    """Example 5: Using scroll_id for pagination.

    Sends an initial scroll request for the last hour (size 50) and reads
    ``meta.scroll_id`` from the response. If a scroll id is present, sends a
    follow-up request whose payload is just that ``scroll_id`` and logs the
    per-page and combined workload counts. Only the first two pages are
    fetched; this example does not loop until the scroll is exhausted.

    Raises:
        requests.HTTPError: If either request answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    logger.info("="*60)
    logger.info("Example 5: Pagination - Using scroll_id")
    logger.info("="*60)

    access_token = get_access_token()

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and naive.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=1)

    scroll_url = f"{BASE_URL}/v2/workloads/scroll"

    # Initial request
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 50,
    }

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID,
    }

    # requests has no default timeout; avoid hanging on a stalled server.
    response = requests.post(scroll_url, json=payload, headers=headers, timeout=30)
    # Fail before parsing: an error response may not carry a JSON body.
    response.raise_for_status()
    data = response.json()

    workloads_page_1 = data.get('data', {}).get('workloads', [])
    scroll_id = data.get('meta', {}).get('scroll_id')

    logger.info("Page 1: Retrieved %s workloads", len(workloads_page_1))

    if scroll_id:
        # Follow-up request with scroll_id
        logger.info("Fetching page 2 using scroll_id...")

        payload = {"scroll_id": scroll_id}
        response = requests.post(scroll_url, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()

        workloads_page_2 = data.get('data', {}).get('workloads', [])
        logger.info("Page 2: Retrieved %s workloads", len(workloads_page_2))
        logger.info("Total: %s workloads", len(workloads_page_1) + len(workloads_page_2))
    else:
        logger.info("No more pages available")
def main():
    """Run all examples.

    Calls the five example functions in order, logging an empty line between
    them. Any exception raised by an example stops the remaining examples; it
    is logged (with traceback) rather than propagated.
    """
    logger.info("="*60)
    logger.info("v2/workloads/scroll API - Advanced Examples")
    logger.info("="*60)

    try:
        example_1_basic_query()
        logger.info("")

        example_2_with_terms_filter()
        logger.info("")

        example_3_complex_bool_query()
        logger.info("")

        example_4_field_selection()
        logger.info("")

        example_5_pagination()

        logger.info("="*60)
        logger.info("All examples completed!")
        logger.info("="*60)

    except Exception as e:
        # Top-level boundary: keep the best-effort behavior (no re-raise) but
        # log at ERROR level *with* the traceback so failures can be diagnosed.
        logger.exception("Error: %s", e)


if __name__ == "__main__":
    main()