Moderate Example
A moderately complex example for the v2/logs/batch API: the complete batch workflow, including downloading and processing the results.
This example shows the full workflow:
- Submit batch request
- Poll for completion
- Download results from presigned URLs
- Process the downloaded logs
Code Example
import requests
import logging
import time
import json
from datetime import datetime, timedelta
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
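# NOTE: the values above are placeholders; replace TOKEN_URL, CLIENT_ID and
# CLIENT_SECRET with the auth endpoint and credentials issued for your environment.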
def get_access_token():
    """Authenticate and get JWT access token."""
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )
    response.raise_for_status()
    return response.json()["access_token"]
def submit_batch_request(access_token):
    """Submit a batch logs request and return the token."""
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=6)
    payload = {
        "application": "eventops",
        "app_type": "serverless",
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "log_group": ["*"],
        "tenant": ["tenant1"],
        "stage": ["prod"],
        "fields": [
            "@timestamp",
            "logLevel",
            "message",
            "request.user_id",
            "request.endpoint",
            "response.status_code"
        ],
        "filters": [
            {
                "exists": {
                    "field": "user.id"
                }
            }
        ]
    }
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }
    logger.info(f"Submitting batch request for time range: {start_time} to {end_time}")
    response = requests.post(
        f"{BASE_URL}/v2/logs/batch",
        json=payload,
        headers=headers
    )
    response.raise_for_status()
    data = response.json()
    token = data.get("data", {}).get("token")
    logger.info(f"Batch request submitted. Token: {token}")
    return token
def poll_for_completion(access_token, token, max_wait_seconds=300):
    """Poll the batch/status endpoint until all partitions are complete."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }
    start_time = time.time()
    attempt = 0
    logger.info("Polling for completion...")
    while time.time() - start_time < max_wait_seconds:
        attempt += 1
        time.sleep(5)  # Wait 5 seconds between polls
        response = requests.post(
            f"{BASE_URL}/v2/logs/batch/status",
            json={"token": token},
            headers=headers
        )
        if response.status_code == 404:
            logger.error("Token not found or expired")
            return None
        response.raise_for_status()
        data = response.json()
        partitions = data.get("presigned_urls", [])
        completed = sum(1 for p in partitions if p.get("completed"))
        total = len(partitions)
        logger.info(f"Attempt {attempt}: {completed}/{total} partitions completed")
        # Guard against an empty partition list being treated as "complete"
        if total and completed == total:
            logger.info("All partitions completed!")
            return partitions
    logger.warning("Polling timeout reached")
    return None
def download_partition_logs(presigned_url):
    """Download logs from a presigned URL."""
    response = requests.get(presigned_url)
    response.raise_for_status()
    return response.json()
def main():
    """Main execution."""
    logger.info("=" * 60)
    logger.info("v2/logs/batch API Example")
    logger.info("=" * 60)
    # Step 1: Authenticate
    logger.info("1. Authenticating...")
    try:
        access_token = get_access_token()
        logger.info("Authentication successful")
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        return
    # Step 2: Submit batch request
    logger.info("2. Submitting batch request...")
    try:
        token = submit_batch_request(access_token)
    except Exception as e:
        logger.error(f"Batch request failed: {e}")
        return
    # Step 3: Poll for completion
    logger.info("3. Waiting for processing to complete...")
    try:
        partitions = poll_for_completion(access_token, token)
        if not partitions:
            logger.error("Failed to get completed partitions")
            return
    except Exception as e:
        logger.error(f"Polling failed: {e}")
        return
    # Step 4: Download and process results
    logger.info("4. Downloading results...")
    total_logs = 0
    for i, partition in enumerate(partitions, 1):
        log_count = partition.get("log_count", 0)
        presigned_url = partition.get("presigned_url")
        logger.info(f"Partition {i}: {log_count} logs")
        if presigned_url and presigned_url != "null":
            try:
                logs = download_partition_logs(presigned_url)
                total_logs += len(logs)
                logger.info(f" Downloaded {len(logs)} logs")
                # Display first log from first partition
                if i == 1 and logs:
                    logger.info(" Sample log:")
                    source = logs[0].get('_source', {})
                    logger.info(f" - Timestamp: {source.get('@timestamp', 'N/A')}")
                    logger.info(f" - Log Level: {source.get('logLevel', 'N/A')}")
                    logger.info(f" - Message: {source.get('message', 'N/A')[:80]}...")
            except Exception as e:
                logger.error(f" Failed to download: {e}")
    logger.info("=" * 60)
    logger.info(f"Complete! Downloaded {total_logs} total logs from {len(partitions)} partitions")
    logger.info("=" * 60)
if __name__ == "__main__":
    main()
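Response Shape (Illustrative)
The script assumes response payloads shaped roughly like the sketch below. The field names are taken from the accesses in the code above (the submit call reads data["data"]["token"]; the status call is read through its presigned_urls list), so treat this as an assumption rather than an authoritative schema; your deployment's responses may carry additional fields.
# Illustrative shapes only, inferred from the fields the script reads (not from the API specification).
example_submit_response = {
    "data": {
        "token": "example-batch-token"  # returned by POST /v2/logs/batch
    }
}
example_status_response = {
    "presigned_urls": [
        {"completed": True, "log_count": 1520, "presigned_url": "https://storage.example.com/partition-0?signature=..."},
        {"completed": True, "log_count": 0, "presigned_url": "null"}  # assumed: empty partitions carry the string "null", hence the check in main()
    ]
}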