Skip to main content

Advanced Example

Advanced examples for v2/logs/batch API showing different use cases.

Demonstrates:

  • Basic batch request
  • Batch with filters
  • Batch with complex bool queries and field selection
  • Serverless logs batch
  • Handling large time ranges with multiple partitions

Code Example

import requests
import logging
import time
from datetime import datetime, timedelta

# Configure logging
# Root-logger setup: INFO level with timestamped, leveled messages.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# API base endpoint for the v2/logs/batch service.
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
# OAuth2 token endpoint and client credentials.
# NOTE(review): these are placeholders — substitute real values before running.
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
    """Fetch a JWT access token using the OAuth2 client-credentials grant.

    Returns:
        str: The bearer access token from the token endpoint response.

    Raises:
        requests.HTTPError: If the token endpoint returns a 4xx/5xx status.
        KeyError: If the response body has no "access_token" field.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30,  # never hang indefinitely on an unresponsive auth server
    )
    # Fail loudly on an auth error instead of raising an opaque KeyError below.
    response.raise_for_status()
    return response.json()["access_token"]

def poll_until_complete(access_token, token, max_wait=300):
    """Poll the batch/status endpoint until all partitions complete or timeout.

    Args:
        access_token: Bearer token for the Authorization header.
        token: Batch job token returned by the v2/logs/batch endpoint.
        max_wait: Maximum number of seconds to keep polling (default 300).

    Returns:
        list | None: The partition dicts once every partition reports
        ``completed``, or None on timeout (or if partitions never appear).
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }

    start = time.time()
    while time.time() - start < max_wait:
        time.sleep(5)  # back off between polls; status updates are not instant
        response = requests.post(
            f"{BASE_URL}/v2/logs/batch/status",
            json={"token": token},
            headers=headers,
            timeout=30,  # bound each poll request
        )

        # Non-200 responses are treated as transient and retried on the
        # next iteration (deliberate best-effort polling).
        if response.status_code == 200:
            data = response.json()
            partitions = data.get("presigned_urls", [])
            completed = sum(1 for p in partitions if p.get("completed"))
            # Guard against an empty partition list: before the backend has
            # registered any partitions, 0 == len([]) would falsely report
            # the job as complete and return an empty result.
            if partitions and completed == len(partitions):
                return partitions

    return None

def example_1_basic_batch():
    """Example 1: Basic batch request for all logs in a domain.

    Submits a v2/logs/batch job covering the last 12 hours for one domain
    and logs the job token returned by the API.
    """
    logger.info("="*60)
    logger.info("Example 1: Basic Batch - All logs from specific domain")
    logger.info("="*60)

    access_token = get_access_token()
    # 12-hour window ending now; API expects UTC ISO-8601 with trailing Z.
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=12)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["prod.example.com"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=30,  # don't block indefinitely on a slow API
    )

    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Note: Use v2/logs/batch/status with this token to check status and get download URLs")

def example_2_with_filters():
    """Example 2: Batch request with filters for specific log patterns.

    Submits a 24-hour batch job across all domains, filtered to KeyError
    messages from functions with 1024MB memory, then logs the job token.
    """
    logger.info("="*60)
    logger.info("Example 2: Batch with Filters - Specific error and memory size")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Elasticsearch-style term filters, ANDed together by the API.
        "filters": [
            {"term": {"message.keyword": "KeyError occurred"}},
            {"term": {"function_memory_size": "1024"}}
        ]
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=30,  # don't block indefinitely on a slow API
    )

    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Filters applied: KeyError messages with 1024MB memory")

def example_3_complex_bool_query():
    """Example 3: Batch with complex bool query and field selection.

    Submits a 48-hour batch job selecting only specific fields, filtered to
    SR_TASK tickets triggered by TICKET that did not end in FAILED state.
    """
    logger.info("="*60)
    logger.info("Example 3: Complex Bool Query - SR_TASK tickets that didn't fail")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=48)

    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Restrict the response payload to just these fields.
        "fields": [
            "@timestamp",
            "ticket.type",
            "workload.trigger",
            "workload.state",
            "message"
        ],
        # Elasticsearch bool query: both "must" terms match, "must_not" excludes.
        "filters": [
            {
                "bool": {
                    "must": [
                        {"term": {"ticket.type": "SR_TASK"}},
                        {"term": {"workload.trigger": "TICKET"}}
                    ],
                    "must_not": [
                        {"term": {"workload.state": "FAILED"}}
                    ]
                }
            }
        ]
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=30,  # don't block indefinitely on a slow API
    )

    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Complex bool query with field selection applied")

def example_4_serverless_batch():
    """Example 4: Batch request for serverless (EventOps) logs.

    Serverless requests key on log_group/tenant/stage instead of the
    kubernetes-style "domain" field; wildcards select everything.
    """
    logger.info("="*60)
    logger.info("Example 4: Serverless Batch - EventOps application")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=24)

    payload = {
        "application": "eventops",
        "app_type": "serverless",
        "log_group": ["*"],
        "tenant": ["*"],
        "stage": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    }

    response = requests.post(
        f"{BASE_URL}/v2/logs/batch",
        json=payload,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "client_id": CLIENT_ID
        },
        timeout=30,  # don't block indefinitely on a slow API
    )

    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Status: {response.status_code}")
    logger.info(f"Token: {token}")
    logger.info("Serverless logs batch request submitted")

def example_5_complete_workflow():
    """Example 5: Complete workflow with polling and download.

    Submits a 6-hour batch job, polls v2/logs/batch/status until all
    partitions complete, then logs each partition's time range, log count,
    and (truncated) presigned download URL.
    """
    logger.info("="*60)
    logger.info("Example 5: Complete Workflow - Submit, poll, and download")
    logger.info("="*60)

    access_token = get_access_token()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=6)

    # Submit request
    payload = {
        "application": "atr",
        "app_type": "kubernetes",
        "domain": ["*"],
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 5000
    }

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }

    logger.info("Submitting batch request...")
    response = requests.post(f"{BASE_URL}/v2/logs/batch", json=payload, headers=headers, timeout=30)
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Token received: {token}")

    logger.info("Polling for completion...")
    # Poll for completion
    partitions = poll_until_complete(access_token, token)

    if partitions:
        logger.info(f"Processing complete! {len(partitions)} partitions ready")
        total_logs = sum(p.get("log_count", 0) for p in partitions)
        logger.info(f"Total logs available: {total_logs}")

        for i, partition in enumerate(partitions, 1):
            logger.info(f"Partition {i}:")
            logger.info(f" - Time: {partition.get('start_time')} to {partition.get('end_time')}")
            logger.info(f" - Logs: {partition.get('log_count')}")
            # Fall back to '' so a missing presigned_url can't TypeError on the slice.
            logger.info(f" - URL: {(partition.get('presigned_url') or '')[:60]}...")
    else:
        logger.warning("Polling timeout or error")

def main():
    """Run all five examples in sequence, logging any failure with traceback."""
    logger.info("="*60)
    logger.info("v2/logs/batch API - Advanced Examples")
    logger.info("="*60)

    try:
        example_1_basic_batch()
        logger.info("")
        example_2_with_filters()
        logger.info("")
        example_3_complex_bool_query()
        logger.info("")
        example_4_serverless_batch()
        logger.info("")
        example_5_complete_workflow()

        logger.info("="*60)
        logger.info("All examples completed!")
        logger.info("="*60)
    except Exception as e:
        # logger.exception records the full traceback, not just the message.
        logger.exception("Error: %s", e)

# Script entry point: run all examples when executed directly.
if __name__ == "__main__":
    main()