Skip to main content

Moderate Example

Moderate example for the v2/logs/batch API: a complete batch workflow, including downloading the results.

This example shows the full workflow:

  1. Submit batch request
  2. Poll for completion
  3. Download results from presigned URLs
  4. Process the downloaded logs

Code Example

import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone

import requests

# Configure logging
# Root logging setup: INFO and above, each record prefixed with its
# timestamp and severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every function in this example.
logger = logging.getLogger(__name__)

# Configuration
# Every value can be overridden through an environment variable so that real
# credentials never have to be written into the source file. The literals
# below are placeholders that are used only when the variable is not set.
BASE_URL = os.environ.get(
    "LOGS_API_BASE_URL",
    "https://central-monitoring-data-api.mywizard-aiops.com",
)
TOKEN_URL = os.environ.get(
    "LOGS_API_TOKEN_URL",
    "https://your-auth-endpoint.com/oauth2/token",
)
CLIENT_ID = os.environ.get("LOGS_API_CLIENT_ID", "your-client-id")
CLIENT_SECRET = os.environ.get("LOGS_API_CLIENT_SECRET", "your-client-secret")

def get_access_token(timeout=30):
    """Authenticate against the token endpoint and return an access token.

    Sends an OAuth2 client-credentials grant to ``TOKEN_URL`` using
    ``CLIENT_ID`` and ``CLIENT_SECRET``.

    Args:
        timeout: Seconds to wait for the token endpoint. ``requests`` has no
            default timeout, so omitting one can block the script forever.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status.
        KeyError: If the response JSON has no ``access_token`` field.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()["access_token"]

def submit_batch_request(access_token, lookback_hours=6, timeout=30):
    """Submit a batch logs request and return the batch token.

    The request covers the window from ``lookback_hours`` ago until now (UTC).

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        lookback_hours: Length of the queried time window, in hours.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The token found under ``data.token`` in the JSON response, or ``None``
        when the response carries no token (an error is logged in that case).

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status.
    """
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=lookback_hours)
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"

    payload = {
        "application": "eventops",
        "app_type": "serverless",
        "start_time": start_time.strftime(timestamp_format),
        "end_time": end_time.strftime(timestamp_format),
        "log_group": ["*"],
        "tenant": ["tenant1"],
        "stage": ["prod"],
        "fields": [
            "@timestamp",
            "logLevel",
            "message",
            "request.user_id",
            "request.endpoint",
            "response.status_code",
        ],
        "filters": [
            {
                "exists": {
                    "field": "user.id"
                }
            }
        ],
    }

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID,
    }

    logger.info(f"Submitting batch request for time range: {start_time} to {end_time}")
    response = requests.post(
        f"{BASE_URL}/v2/logs/batch",
        json=payload,
        headers=headers,
        timeout=timeout,
    )
    response.raise_for_status()

    data = response.json()
    # "data" may be absent or null; treat both as "no token".
    token = (data.get("data") or {}).get("token")
    if not token:
        logger.error("Batch request succeeded but the response contained no token")
        return None

    logger.info(f"Batch request submitted. Token: {token}")
    return token

def poll_for_completion(access_token, token, max_wait_seconds=300,
                        poll_interval_seconds=5, timeout=30):
    """Poll the batch/status endpoint until all partitions are complete.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        token: Batch token identifying the request to check.
        max_wait_seconds: Overall polling budget; once it is used up the
            function gives up and returns ``None``.
        poll_interval_seconds: Pause before each status request.
        timeout: Seconds to wait for each individual HTTP response.

    Returns:
        The list read from ``presigned_urls`` in the status response once
        every entry has a truthy ``completed`` value, or ``None`` when the
        endpoint answers 404 or the polling budget runs out.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-success HTTP status other than 404.
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID,
    }

    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + max_wait_seconds
    attempt = 0

    logger.info("Polling for completion...")

    while time.monotonic() < deadline:
        attempt += 1
        time.sleep(poll_interval_seconds)  # Wait between polls

        response = requests.post(
            f"{BASE_URL}/v2/logs/batch/status",
            json={"token": token},
            headers=headers,
            timeout=timeout,
        )

        if response.status_code == 404:
            logger.error("Token not found or expired")
            return None

        response.raise_for_status()
        data = response.json()
        # A missing or null "presigned_urls" field is treated as an empty list.
        partitions = data.get("presigned_urls") or []

        completed = sum(1 for p in partitions if p.get("completed"))
        total = len(partitions)

        logger.info(f"Attempt {attempt}: {completed}/{total} partitions completed")

        # NOTE(review): an empty partition list also satisfies this check and
        # is returned as-is; callers must treat [] as "nothing to download".
        if completed == total:
            logger.info("All partitions completed!")
            return partitions

    logger.warning("Polling timeout reached")
    return None

def download_partition_logs(presigned_url, timeout=60):
    """Download logs from a presigned URL and return the decoded JSON body.

    Args:
        presigned_url: URL to fetch with a plain GET (no auth headers are sent).
        timeout: Seconds to wait for the response. ``requests`` has no default
            timeout, so omitting one can block the script forever.

    Returns:
        Whatever the response body decodes to as JSON.

    Raises:
        requests.RequestException: On connection failure, timeout, a
            non-success HTTP status, or a body that is not valid JSON.
    """
    response = requests.get(presigned_url, timeout=timeout)
    response.raise_for_status()
    return response.json()

def _log_sample(log):
    """Log the timestamp, level and truncated message of one downloaded log."""
    # Tolerate entries that are not dicts or that carry a null "_source".
    source = (log.get('_source') or {}) if isinstance(log, dict) else {}
    # str() keeps the slice safe when the message is null or not a string.
    message = str(source.get('message', 'N/A'))
    logger.info(" Sample log:")
    logger.info(f" - Timestamp: {source.get('@timestamp', 'N/A')}")
    logger.info(f" - Log Level: {source.get('logLevel', 'N/A')}")
    logger.info(f" - Message: {message[:80]}...")


def main():
    """Run the full workflow: authenticate, submit, poll, download.

    Each step logs its own failure and ends the run early; nothing is raised
    to the caller and nothing is returned.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("v2/logs/batch API Example")
    logger.info(banner)

    # Step 1: Authenticate
    logger.info("1. Authenticating...")
    try:
        access_token = get_access_token()
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        return
    logger.info("Authentication successful")

    # Step 2: Submit batch request
    logger.info("2. Submitting batch request...")
    try:
        token = submit_batch_request(access_token)
    except Exception as e:
        logger.error(f"Batch request failed: {e}")
        return
    if not token:
        # Without a token there is nothing to poll for.
        logger.error("Batch request failed: no token in response")
        return

    # Step 3: Poll for completion
    logger.info("3. Waiting for processing to complete...")
    try:
        partitions = poll_for_completion(access_token, token)
    except Exception as e:
        logger.error(f"Polling failed: {e}")
        return
    if not partitions:
        logger.error("Failed to get completed partitions")
        return

    # Step 4: Download and process results
    logger.info("4. Downloading results...")
    total_logs = 0

    for i, partition in enumerate(partitions, 1):
        log_count = partition.get("log_count", 0)
        presigned_url = partition.get("presigned_url")

        logger.info(f"Partition {i}: {log_count} logs")

        # Skip partitions with no URL, including the literal string "null".
        if not presigned_url or presigned_url == "null":
            continue

        # Only the download sits in the try block, so a problem while
        # displaying the sample is never reported as a download failure.
        try:
            logs = download_partition_logs(presigned_url)
        except Exception as e:
            logger.error(f" Failed to download: {e}")
            continue

        if not isinstance(logs, list):
            logger.error(
                f" Failed to download: unexpected payload type {type(logs).__name__}"
            )
            continue

        total_logs += len(logs)
        logger.info(f" Downloaded {len(logs)} logs")

        # Display first log from first partition
        if i == 1 and logs:
            _log_sample(logs[0])

    logger.info(banner)
    logger.info(f"Complete! Downloaded {total_logs} total logs from {len(partitions)} partitions")
    logger.info(banner)

# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()