Skip to main content

Moderate Example

Moderate example for v2/logs/scroll API - Retrieve logs with pagination support using scroll_id.

Useful for retrieving large datasets that exceed single-request limits but contain fewer than 100,000 records in total.

This example demonstrates using OpenSearch filters and field selection to refine your query.


Code Example

import requests
import json
import logging
from datetime import datetime, timedelta

# Configure logging
# Root-logger setup: INFO level, timestamped and levelled message format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)

# Configuration
# API base URL plus OAuth2 client credentials.
# NOTE(review): the credential values are placeholders — replace before running.
BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
    """Authenticate against the OAuth2 token endpoint and return a JWT access token.

    Performs a client-credentials grant using the module-level CLIENT_ID /
    CLIENT_SECRET against TOKEN_URL.

    Returns:
        str: The bearer access token from the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the endpoint returns a non-2xx status.
        requests.Timeout: If the endpoint does not respond within 30 seconds.
    """
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        # requests has no default timeout; without one a dead endpoint
        # would hang this call forever.
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]

def fetch_logs_with_scroll(access_token):
    """Fetch logs using the scroll API with pagination.

    Issues an initial query covering the last hour, then follows the
    scroll_id returned in each response's ``meta`` block until the server
    stops returning a scroll_id or an empty page arrives.

    Args:
        access_token: Bearer JWT, e.g. from get_access_token().

    Returns:
        list: All raw log documents accumulated across pages.

    Raises:
        requests.HTTPError: If any request returns a non-2xx status.
        requests.Timeout: If a request does not respond within 60 seconds.
    """
    # Function-local import so the module-level import line stays untouched.
    from datetime import timezone

    url = f"{BASE_URL}/v2/logs/scroll"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "client_id": CLIENT_ID
    }

    # Calculate time range (last 1 hour), timezone-aware UTC.
    # datetime.utcnow() is deprecated since Python 3.12; now(timezone.utc)
    # yields the same UTC wall-clock fields, so the strftime output below
    # is unchanged.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=1)

    # Initial request payload
    payload = {
        "application": "eventops",
        "app_type": "serverless",
        "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "size": 100,
        "log_group": ["*"],
        "tenant": ["tenant1"],
        "stage": ["prod"],
        "fields": [
            "@timestamp",
            "logLevel",
            "message",
            "request.user_id",
            "request.endpoint",
            "response.status_code"
        ],
        "filters": [
            {
                "exists": {
                    # NOTE(review): filter checks "user.id" while fields select
                    # "request.user_id" — confirm which path the index uses.
                    "field": "user.id"
                }
            }
        ]
    }

    all_logs = []
    scroll_id = None
    page = 1

    logger.info(f"Fetching logs from {start_time} to {end_time}...")

    while True:
        # After the first page, the payload is only the scroll cursor.
        if scroll_id:
            payload = {"scroll_id": scroll_id}
            logger.info(f"Fetching page {page}...")
        else:
            logger.info("Fetching initial page...")

        # Make request; timeout prevents an unresponsive server from
        # hanging the pagination loop indefinitely.
        response = requests.post(url, json=payload, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()

        # Extract logs from response
        logs = data.get("data", {}).get("logs", [])
        all_logs.extend(logs)
        logger.info(f"Retrieved {len(logs)} logs (total: {len(all_logs)})")

        # Stop when the server stops handing back a cursor or the page is empty.
        scroll_id = data.get("meta", {}).get("scroll_id")
        if not scroll_id or len(logs) == 0:
            logger.info("No more pages available.")
            break

        page += 1

    return all_logs

def main():
    """Drive the example end to end: authenticate, page through logs, print a sample."""
    banner = "=" * 60
    logger.info(banner)
    logger.info("v2/logs/scroll API Example")
    logger.info(banner)

    # Step 1: Get access token
    logger.info("1. Authenticating...")
    try:
        token = get_access_token()
    except Exception as exc:
        logger.error(f"Authentication failed: {exc}")
        return
    logger.info("Authentication successful")

    # Step 2: Fetch logs with pagination
    logger.info("2. Fetching logs...")
    try:
        results = fetch_logs_with_scroll(token)
    except Exception as exc:
        logger.error(f"Failed to fetch logs: {exc}")
        return
    logger.info(f"Successfully retrieved {len(results)} total logs")

    # Step 3: Show the first few documents that came back
    logger.info("3. Sample results:")
    for idx, entry in enumerate(results[:3], start=1):
        src = entry.get('_source', {})
        req = src.get('request', {})
        resp = src.get('response', {})
        logger.info(f"Log {idx}:")
        logger.info(f" - Timestamp: {src.get('@timestamp', 'N/A')}")
        logger.info(f" - Log Level: {src.get('logLevel', 'N/A')}")
        logger.info(f" - Message: {src.get('message', 'N/A')[:100]}")
        logger.info(f" - User ID: {req.get('user_id', 'N/A')}")
        logger.info(f" - Endpoint: {req.get('endpoint', 'N/A')}")
        logger.info(f" - Status Code: {resp.get('status_code', 'N/A')}")

    overflow = len(results) - 3
    if overflow > 0:
        logger.info(f"... and {overflow} more logs")

    logger.info(banner)
    logger.info("Complete!")
    logger.info(banner)

# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()