Skip to main content

Advanced Example

Advanced example for v2/logs/scroll API - Multiple query patterns and use cases.

This example demonstrates various query patterns including basic queries, complex filters with bool logic, field selection, serverless logs, and pagination handling.


Code Example

import requests
import logging
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

BASE_URL = "https://central-monitoring-data-api.mywizard-aiops.com"
TOKEN_URL = "https://your-auth-endpoint.com/oauth2/token"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

def get_access_token():
"""Get JWT access token."""
response = requests.post(
TOKEN_URL,
data={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
return response.json()["access_token"]

def example_1_basic_query():
"""Example 1: Basic query for all logs in a domain."""
logger.info("="*60)
logger.info("Example 1: Basic Query - All logs from specific domain")
logger.info("="*60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)

payload = {
"application": "atr",
"app_type": "kubernetes",
"domain": ["prod.example.com"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"size": 100
}

response = requests.post(
f"{BASE_URL}/v2/logs/scroll",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")
logger.info(f"Logs retrieved: {len(data.get('data', {}).get('logs', []))}")
logger.info(f"Total available: {data.get('meta', {}).get('log_count', 0)}")

def example_2_with_filters():
"""Example 2: Query with filters for specific log patterns."""
logger.info("="*60)
logger.info("Example 2: Query with Filters - Error logs only")
logger.info("="*60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=6)

payload = {
"application": "atr",
"app_type": "ec2",
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"size": 50,
"filters": [
{"term": {"container.name": "my_container-name"}},
{"term": {"atr.infra_data.region": "my_region"}}
]
}

response = requests.post(
f"{BASE_URL}/v2/logs/scroll",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")
logger.info(f"Error logs found: {len(data.get('data', {}).get('logs', []))}")

def example_3_field_selection():
"""Example 3: Complex bool query with must and must_not clauses."""
logger.info("="*60)
logger.info("Example 3: Complex Bool Query - SR_TASK tickets that didn't fail")
logger.info("="*60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=30)

payload = {
"application": "atr",
"app_type": "kubernetes",
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"size": 20,
"fields": [
"@timestamp",
"ticket.type",
"workload.trigger",
"workload.state",
"message"
],
"filters": [
{
"bool": {
"must": [
{"term": {"container.name": "my_container"}},
{"term": {"host.os.name": "host_name"}}
],
"must_not": [
{"term": {"regex.pattern": "token_error"}}
]
}
}
]
}

response = requests.post(
f"{BASE_URL}/v2/logs/scroll",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logs = data.get('data', {}).get('logs', [])
logger.info(f"Status: {response.status_code}")
logger.info(f"Logs retrieved: {len(logs)}")

if logs:
logger.info("First log sample (only selected fields returned):")
source = logs[0].get('_source', {})
logger.info(f" Timestamp: {source.get('@timestamp', 'N/A')}")
logger.info(f" Ticket Type: {source.get('ticket', {}).get('type', 'N/A')}")
logger.info(f" Workload Trigger: {source.get('workload', {}).get('trigger', 'N/A')}")
logger.info(f" Workload State: {source.get('workload', {}).get('state', 'N/A')}")
message = source.get('message', 'N/A')
message_preview = message[:80] + '...' if message != 'N/A' and len(message) > 80 else message
logger.info(f" Message: {message_preview}")

def example_4_serverless_logs():
"""Example 4: Query serverless (EventOps) logs."""
logger.info("="*60)
logger.info("Example 4: Serverless Logs - EventOps application")
logger.info("="*60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=2)

payload = {
"application": "eventops",
"app_type": "serverless",
"log_group": ["*"],
"tenant": ["*"],
"stage": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"size": 100
}

response = requests.post(
f"{BASE_URL}/v2/logs/scroll",
json=payload,
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}
)

data = response.json()
logger.info(f"Status: {response.status_code}")
logger.info(f"Serverless logs retrieved: {len(data.get('data', {}).get('logs', []))}")

def example_5_pagination():
"""Example 5: Using scroll_id for pagination."""
logger.info("="*60)
logger.info("Example 5: Pagination - Using scroll_id")
logger.info("="*60)

access_token = get_access_token()
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)

# Initial request
payload = {
"application": "atr",
"app_type": "kubernetes",
"domain": ["*"],
"start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"size": 50
}

headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"client_id": CLIENT_ID
}

response = requests.post(f"{BASE_URL}/v2/logs/scroll", json=payload, headers=headers)
data = response.json()
logs_page_1 = data.get('data', {}).get('logs', [])
scroll_id = data.get('meta', {}).get('scroll_id')

logger.info(f"Page 1: Retrieved {len(logs_page_1)} logs")

if scroll_id:
# Follow-up request with scroll_id
logger.info("Fetching page 2 using scroll_id...")
payload = {"scroll_id": scroll_id}
response = requests.post(f"{BASE_URL}/v2/logs/scroll", json=payload, headers=headers)
data = response.json()
logs_page_2 = data.get('data', {}).get('logs', [])
logger.info(f"Page 2: Retrieved {len(logs_page_2)} logs")
logger.info(f"Total: {len(logs_page_1) + len(logs_page_2)} logs")
else:
logger.info("No more pages available")

def main():
"""Run all examples."""
logger.info("="*60)
logger.info("v2/logs/scroll API - Advanced Examples")
logger.info("="*60)

try:
example_1_basic_query()
example_2_with_filters()
example_3_field_selection()
example_4_serverless_logs()
example_5_pagination()

logger.info("="*60)
logger.info("All examples completed!")
logger.info("="*60)
except Exception as e:
logger.error(f"Error: {e}")

if __name__ == "__main__":
main()