Advanced Examples
Advanced examples for the v2/workloads/batch API, covering several common use cases. All examples follow the same asynchronous flow: submit a request to v2/workloads/batch, receive a token, poll v2/workloads/batch/status with that token, then download the completed partitions from the returned presigned URLs.
Demonstrates:
- Submitting a basic batch request
- Filtering with a terms filter
- Combining complex bool queries with field selection
- Handling large time ranges that are split across multiple partitions
Code Example
import requests
import logging
import time
from datetime import datetime, timedelta
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
def get_access_token():
    """Get a JWT access token via the OAuth2 client-credentials grant."""
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )
    # Fail fast on auth errors instead of raising a confusing KeyError below
    response.raise_for_status()
    return response.json()["access_token"]
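# Optional: a minimal token-cache sketch so the examples below do not have to
# request a fresh token on every call. The 300-second lifetime is an assumption
# (get_access_token() does not expose the token's real "expires_in" value);
# the examples still call get_access_token() directly.
_token_cache = {"token": None, "expires_at": 0.0}
def get_cached_access_token():
    """Return a cached access token, requesting a new one when the cache is stale."""
    now = time.time()
    if _token_cache["token"] is None or now >= _token_cache["expires_at"]:
        _token_cache["token"] = get_access_token()
        _token_cache["expires_at"] = now + 300  # assumed lifetime, see note above
    return _token_cache["token"]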
def poll_until_complete(access_token, token, max_wait=300):
    """Poll the batch/status endpoint until all partitions complete or max_wait elapses."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }
    start = time.time()
    while time.time() - start < max_wait:
        time.sleep(5)
        response = requests.post(
            f"{BASE_URL}/v2/workloads/batch/status",
            json={"token": token},
            headers=headers
        )
        if response.status_code == 200:
            data = response.json()
            partitions = data.get("presigned_urls", [])
            completed = sum(1 for p in partitions if p.get("completed"))
            # Done only when at least one partition exists and all are completed
            if partitions and completed == len(partitions):
                return partitions
    return None
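# A minimal download sketch: presigned URLs can normally be fetched with a plain
# GET and no Authorization header, but the payload format (JSON, NDJSON, CSV,
# compressed, ...) is an assumption here, so the raw bytes are returned and the
# caller decides how to parse them.
def download_partition(presigned_url, timeout=60):
    """Download one completed partition from its presigned URL and return the raw bytes."""
    response = requests.get(presigned_url, timeout=timeout)
    response.raise_for_status()
    return response.content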
def example_1_basic_batch():
    """Example 1: Basic batch request for all workloads in a domain."""
    logger.info("="*60)
    logger.info("Example 1: Basic Batch - All workloads from specific domain")
    logger.info("="*60)
    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["prod.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    }
    response = requests.post(
        f"{BASE_URL}/v2/workloads/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Note: Use v2/workloads/batch/status with this token to check status and get download URLs")
def example_2_with_terms_filter():
    """Example 2: Batch request with terms filter for specific workflows."""
    logger.info("="*60)
    logger.info("Example 2: Batch with Terms Filter - Specific workflow names")
    logger.info("="*60)
    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=48)
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "filters": [
            {
                "terms": {
                    "workflow.name": ["ATA-Hook_INCIDENT", "ATR_Scheduled_Workflow"]
                }
            }
        ]
    }
    response = requests.post(
        f"{BASE_URL}/v2/workloads/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Filters applied: workflow.name in ['ATA-Hook_INCIDENT', 'ATR_Scheduled_Workflow']")
def example_3_complex_bool_query():
    """Example 3: Batch with complex bool query and field selection."""
    logger.info("="*60)
    logger.info("Example 3: Complex Bool Query - SR_TASK tickets that didn't fail")
    logger.info("="*60)
    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=3)
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "fields": [
            "@timestamp",
            "workload_id",
            "status",
            "workflow.name",
            "ticket.type",
            "workload.trigger",
            "workload.state"
        ],
        "filters": [
            {
                "bool": {
                    "must": [
                        {"term": {"ticket.type": "SR_TASK"}},
                        {"term": {"workload.trigger": "TICKET"}}
                    ],
                    "must_not": [
                        {"term": {"workload.state": "FAILED"}}
                    ]
                }
            }
        ]
    }
    response = requests.post(
        f"{BASE_URL}/v2/workloads/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Complex bool query with field selection applied")
def example_4_field_selection():
    """Example 4: Batch with field selection to reduce response size."""
    logger.info("="*60)
    logger.info("Example 4: Field Selection - Only essential fields")
    logger.info("="*60)
    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "fields": [
            "@timestamp",
            "workload_id",
            "status",
            "workflow.name"
        ]
    }
    response = requests.post(
        f"{BASE_URL}/v2/workloads/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        }
    )
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Field selection applied: Only timestamp, workload_id, status, and workflow.name")
def example_5_complete_workflow():
    """Example 5: Complete workflow with polling and download."""
    logger.info("="*60)
    logger.info("Example 5: Complete Workflow - Submit, poll, and download")
    logger.info("="*60)
    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=12)
    # Submit request
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 10000
    }
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }
    logger.info("Submitting batch request...")
    response = requests.post(f"{BASE_URL}/v2/workloads/batch", json=payload, headers=headers)
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Token received: {token}")
    logger.info("Polling for completion...")
    # Poll for completion
    partitions = poll_until_complete(access_token, token)
    if partitions:
        logger.info(f"Processing complete! {len(partitions)} partitions ready")
        total_workloads = sum(p.get("workload_count", 0) for p in partitions)
        logger.info(f"Total workloads available: {total_workloads}")
        for i, partition in enumerate(partitions, 1):
            logger.info(f"Partition {i}:")
            logger.info(f" - Time: {partition.get('start_time')} to {partition.get('end_time')}")
            logger.info(f" - Workloads: {partition.get('workload_count')}")
            logger.info(f" - URL: {(partition.get('presigned_url') or '')[:60]}...")
    else:
        logger.warning("Polling timed out or returned an error")
def main():
    """Run all examples."""
    logger.info("="*60)
    logger.info("v2/workloads/batch API - Advanced Examples")
    logger.info("="*60)
    try:
        example_1_basic_batch()
        logger.info("")
        example_2_with_terms_filter()
        logger.info("")
        example_3_complex_bool_query()
        logger.info("")
        example_4_field_selection()
        logger.info("")
        example_5_complete_workflow()
        logger.info("="*60)
        logger.info("All examples completed!")
        logger.info("="*60)
    except Exception as e:
        logger.error(f"Error: {e}")
if __name__ == "__main__":
    main()