Google Agent Development Kit (ADK) Introduction (7): Monitoring
Jonathan Huang

Jonathan Huang @jnth

About: Happy Coding

Location:
Taipei
Joined:
Mar 11, 2021

Google Agent Development Kit (ADK) Introduction (7): Monitoring

Publish Date: May 20
1 0

Application Monitoring & Visualization

Direct Prometheus integration with Cloud Run can be intricate due to its serverless architecture. Google Cloud offers Managed Service for Prometheus (GMP) using a sidecar for streamlined metric collection. However, for maximum simplicity and deep Google Cloud integration, native monitoring tools are the primary path.


Collecting Application Metrics (via Cloud Monitoring)

Option A: Structured Logging & Log-based Metrics (Most Direct)

  • Output structured (JSON) logs directly to stdout in your Streamlit app.
  • Cloud Run automatically forwards stdout/stderr to Cloud Logging; JSON strings printed to stdout are parsed as jsonPayload. This is the simplest mechanism.

Sample code snippet:

import json
import time # For realistic duration

# Example: In your Streamlit application code
def some_function_processing_a_task(task_type, success, duration_ms_value):
    log_data = {
        "message": f"Processed task: {task_type}",
        "task_type": task_type,
        "duration_ms": duration_ms_value,
        "success": success,
        # Severity is often inferred by Cloud Logging (stdout=INFO, stderr=ERROR).
        # Explicitly add "severity": "INFO" or "ERROR" if precise control is needed
        # for filtering or log-based metrics derived from severity.
    }
    # Print JSON to stdout; Cloud Run forwards this to Cloud Logging.
    # Cloud Logging will parse this into jsonPayload.
    print(json.dumps(log_data))

# Example usage:
# start_time = time.monotonic()
# # ... processing logic ...
# success_status = True # or False
# duration = (time.monotonic() - start_time) * 1000
# some_function_processing_a_task("schedule_meeting", success_status, int(duration))
Enter fullscreen mode Exit fullscreen mode
  • In Cloud Monitoring, create log-based metrics:

  • Navigate: Google Cloud Console → Logging → Log-based Metrics → Create Metric.

  • Choose Metric Type: Counter (for occurrences) or Distribution (for values like latency).

  • Define Filter to isolate relevant logs, e.g.:

resource.type="cloud_run_revision"
resource.labels.service_name="meeting-workflow-agent"
jsonPayload.task_type="schedule_meeting"        

Enter fullscreen mode Exit fullscreen mode
  • Specify Field Name (for Distribution, e.g., jsonPayload.duration_ms), units (e.g., ms), and labels for drill-down.

Option B: Using Cloud Monitoring API (Maximum Flexibility, More Code)

  • Add google-cloud-monitoring to your requirements.txt.
  • Utilize google.cloud.monitoring_v3 to programmatically write custom metrics. This offers granular control.

Sample code:

from google.cloud import monitoring_v3
import time
import os

project_id = "adk-learning-journey" # Ensure this is configured or dynamically fetched
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{project_id}"

def write_custom_metric(metric_type, value, labels=None):
    series = monitoring_v3.types.TimeSeries()
    series.metric.type = f"custom.googleapis.com/{metric_type}" # Standard prefix for custom metrics
    series.resource.type = "cloud_run_revision"
    # These labels are crucial for associating the metric with the correct Cloud Run resource
    series.resource.labels["service_name"] = os.environ.get("K_SERVICE", "unknown") # K_SERVICE is the Cloud Run service name
    series.resource.labels["revision_name"] = os.environ.get("K_REVISION", "unknown") # K_REVISION is the specific revision
    series.resource.labels["configuration_name"] = os.environ.get("K_CONFIGURATION", "unknown") # K_CONFIGURATION is the configuration name

    if labels:
        for k, v in labels.items():
            series.metric.labels[k] = str(v) # Metric labels must be strings

    point = monitoring_v3.types.Point()
    # Ensure value is appropriate type (int64_value, double_value, etc.)
    if isinstance(value, int):
        point.value.int64_value = value
    elif isinstance(value, float):
        point.value.double_value = value
    else:
        # Handle other types or raise error, e.g. for Distribution for complex types
        # For simplicity, this example assumes int or float.
        point.value.int64_value = int(value) 


    now_ts = time.time()
    point.interval.end_time.seconds = int(now_ts)
    point.interval.end_time.nanos = int((now_ts - point.interval.end_time.seconds) * 10**9)

    series.points.append(point)

    # Call the API to create the time series
    try:
        client.create_time_series(name=project_name, time_series=[series])
    except Exception as e:
        # Implement proper error handling/logging for production
        print(f"Error writing custom metric {metric_type}: {e}", file=sys.stderr)


# Example usage:
# write_custom_metric("streamlit/successful_meetings", 1, {"agent_type": "manager"})
# write_custom_metric("streamlit/processing_latency_ms", 150.5, {"task_category": "report_generation"})
Enter fullscreen mode Exit fullscreen mode

Set Up Grafana

  • Deploy Grafana:
    • Option 1 (Recommended for GKE/GCE): Deploy via Google Cloud Marketplace if operating within these environments.
    • Option 2: Install Grafana in your designated environment.
  • Connect Grafana to Google Cloud Monitoring:
    • Access Grafana.
    • Navigate: Configuration → Data Sources → Add data source.
    • Select "Google Cloud Monitoring".
    • Authentication:
      • If Grafana runs on GCE/GKE, leverage the attached service account (grant roles/monitoring.viewer).
      • Otherwise, create a service account, assign roles/monitoring.viewer, download its JSON key, and upload to Grafana.
    • Set Default Project: adk-learning-journey.
    • Save & Test. A successful test validates the connection.

Create Grafana Dashboards

  • Create → Dashboard → Add new panel.
  • Query Configuration:
    • Select the "Google Cloud Monitoring" data source.
    • Service: Choose "Cloud Run" for standard metrics or "Custom Metrics" for your defined metrics.
    • Metric: Select your log-based metric, standard Cloud Run metrics (e.g., Request Count, Latency, Instance Count), or your custom metric name (e.g., custom.googleapis.com/streamlit/request_count).
    • Utilize the query editor to filter and aggregate data (e.g., by resource.labels.service_name, resource.labels.revision_name, or custom metric labels).
  • Visualization: Choose the optimal chart type for the data.
  • Alerting: Configure alert rules in Grafana for critical metrics to ensure operational awareness.

Cost Control & Optimization

  • Monitor Costs: Regularly audit Google Cloud Billing reports. Filter by service (Cloud Run, Secret Manager, Logging, Monitoring) for granular insight.
  • Set Budget Alerts: Implement budget alerts in Billing for proactive cost management.
  • Optimize Cloud Run Settings:
    • Tune CPU, memory, and min/max instances based on observed performance data from Grafana/Cloud Monitoring.
    • Employ --min-instances 0 for services with intermittent traffic to minimize idle costs.
  • Logging & Monitoring Costs:
    • Be aware: Logging ingestion, storage, and Monitoring (especially custom metrics & API calls) have associated costs.
    • Reduce log ingestion costs by setting appropriate application log levels at the source. Log-based metric filters refine insights from ingested logs; they do not reduce the initial volume of logs stored.
    • For custom metrics, govern write frequency to balance granularity with cost.
  • Secret Manager Costs: Costs are driven by the number of secrets and access frequency. Reading secrets primarily at application startup minimizes these operational costs.

Comments 0 total

    Add comment