# Keybard-Vagabond-Demo/manifests/infrastructure/celery-monitoring/celery-metrics-exporter.yaml
---
# Configuration for the Celery Metrics Exporter
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-exporter-config
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: config
data:
  config.yaml: |
    # Auto-discovery settings
    auto_discovery:
      enabled: true
      scan_databases: false    # Only scan known databases, not all 0-15
      scan_queues: true        # Auto-discover queues in each database
      monitor_all_lists: false # If true, monitor ALL Redis lists, not just queue-like ones
      use_known_queues: true   # Monitor known queues even if they don't exist as lists yet

    # Queue patterns to look for (Redis list keys that are likely Celery queues)
    queue_patterns:
      - "celery"
      - "*_priority" # high_priority, medium_priority, low_priority
      - "default"
      - "mailers"
      - "push"
      - "scheduler"
      - "broadcast"
      - "federation"
      - "media"
      - "user_dir"
      # BookWyrm-specific queues
      - "streams"
      - "images"
      - "suggested_users"
      - "email"
      - "connectors"
      - "lists"
      - "inbox"
      - "imports"
      - "import_triggered"
      - "misc"
      # PieFed-specific queues
      - "background"
      - "send"
      # Pixelfed/Laravel-specific queues
      - "high"
      - "mmo"
      # Common queue patterns
      - "*_queue"
      - "queue_*"

    # Known application configurations (monitored even when their queues are empty)
    known_applications:
      - name: "piefed"
        db: 0
        queues: ["celery", "background", "send"]
      - name: "bookwyrm"
        db: 3
        queues: ["high_priority", "medium_priority", "low_priority", "streams", "images", "suggested_users", "email", "connectors", "lists", "inbox", "imports", "import_triggered", "broadcast", "misc"]
      - name: "mastodon"
        db: 1
        queues: ["default", "mailers", "push", "scheduler"]

    # Optional: friendly names for database numbers.
    # If not specified, databases are named "db_0", "db_1", etc.
    database_names:
      0: "piefed"
      1: "mastodon"
      2: "matrix"
      3: "bookwyrm"

    # Minimum queue length to report (avoids noise from empty queues)
    min_queue_length: 0

    # Maximum number of databases to scan (safety limit)
    max_databases: 4
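
    # Example (hypothetical app and db number): to monitor another application,
    # add an entry under known_applications, e.g.
    #   - name: "pixelfed"
    #     db: 4
    #     queues: ["high", "mmo"]
    # Known applications are connected to directly by the exporter, so db
    # numbers outside the max_databases scan range still work.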
---
# Custom Celery Metrics Exporter Script
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-metrics-script
  namespace: celery-monitoring
data:
  celery_metrics.py: |
    #!/usr/bin/env python3
    import fnmatch
    import logging
    import os
    import time

    import redis
    import yaml
    from prometheus_client import start_http_server, Gauge, Info

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Prometheus metrics
    celery_queue_length = Gauge('celery_queue_length', 'Length of Celery queue', ['queue_name', 'database', 'db_number'])
    celery_queue_info = Info('celery_queue_info', 'Information about Celery queues')
    redis_connection_status = Gauge('redis_connection_status', 'Redis connection status (1=connected, 0=disconnected)', ['database', 'db_number'])
    databases_discovered = Gauge('celery_databases_discovered', 'Number of databases with queues discovered')
    queues_discovered = Gauge('celery_queues_discovered', 'Total number of queues discovered', ['database'])

    # Redis connection settings
    REDIS_HOST = os.getenv('REDIS_HOST', 'redis-ha-haproxy.redis-system.svc.cluster.local')
    REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
    REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')

    def get_redis_client(db=0):
        return redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=db,
            decode_responses=True
        )

    def load_config():
        """Load configuration from the mounted YAML file, falling back to defaults."""
        config_path = '/config/config.yaml'
        default_config = {
            'auto_discovery': {
                'enabled': True,
                'scan_databases': True,
                'scan_queues': True
            },
            'queue_patterns': [
                'celery',
                '*_priority',
                'default',
                'mailers',
                'push',
                'scheduler',
                'broadcast',
                'federation',
                'media',
                'user_dir'
            ],
            'database_names': {},
            'min_queue_length': 0,
            'max_databases': 16
        }
        try:
            if os.path.exists(config_path):
                with open(config_path, 'r') as f:
                    config = yaml.safe_load(f) or {}
                logger.info("Loaded configuration from file")
                return {**default_config, **config}
            else:
                logger.info("No config file found, using defaults")
                return default_config
        except Exception as e:
            logger.error(f"Error loading config: {e}, using defaults")
            return default_config

    def discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists=False):
        """Discover all potential Celery queues in a Redis database."""
        try:
            # Get all keys in the database
            all_keys = redis_client.keys('*')
            discovered_queues = []
            for key in all_keys:
                # Check whether the key is a list (a potential queue)
                try:
                    key_type = redis_client.type(key)
                    if key_type == 'list':
                        if monitor_all_lists:
                            # Monitor ALL Redis lists
                            discovered_queues.append(key)
                        else:
                            # Smart filtering: check whether the key matches any queue pattern
                            for pattern in queue_patterns:
                                if fnmatch.fnmatch(key, pattern):
                                    discovered_queues.append(key)
                                    break
                            else:
                                # No pattern matched; still include keys that look like
                                # queues (contain common queue words)
                                queue_indicators = ['queue', 'celery', 'task', 'job', 'work']
                                if any(indicator in key.lower() for indicator in queue_indicators):
                                    discovered_queues.append(key)
                except Exception as e:
                    logger.debug(f"Error checking key {key} in DB {db_number}: {e}")
                    continue
            # Remove duplicates and sort
            discovered_queues = sorted(set(discovered_queues))
            if discovered_queues:
                mode = "all lists" if monitor_all_lists else "filtered queues"
                logger.info(f"DB {db_number}: Discovered {len(discovered_queues)} {mode}: {discovered_queues}")
            return discovered_queues
        except Exception as e:
            logger.error(f"Error discovering queues in DB {db_number}: {e}")
            return []

    def get_known_applications(config):
        """Return the known application configurations."""
        return config.get('known_applications', [])

    def discover_databases_and_queues(config):
        """Hybrid approach: use known applications plus auto-discovery."""
        max_databases = config.get('max_databases', 16)
        queue_patterns = config.get('queue_patterns', ['celery', '*_priority'])
        database_names = config.get('database_names', {})
        monitor_all_lists = config.get('auto_discovery', {}).get('monitor_all_lists', False)
        use_known_queues = config.get('auto_discovery', {}).get('use_known_queues', True)
        discovered_databases = []
        known_apps = get_known_applications(config) if use_known_queues else []
        # Track which databases have already been covered by known apps
        processed_dbs = set()

        # First, add known applications (these are always monitored)
        for app_config in known_apps:
            db_number = app_config['db']
            app_name = app_config['name']
            known_queues = app_config['queues']
            try:
                redis_client = get_redis_client(db_number)
                redis_client.ping()  # Test connection
                # For known apps, monitor the queues even if they don't exist yet
                discovered_databases.append({
                    'name': app_name,
                    'db_number': db_number,
                    'queues': known_queues,
                    'total_keys': redis_client.dbsize(),
                    'source': 'known_application'
                })
                processed_dbs.add(db_number)
                logger.info(f"Known app {app_name} (DB {db_number}): {len(known_queues)} configured queues")
            except Exception as e:
                logger.error(f"Error connecting to known app {app_name} (DB {db_number}): {e}")
                continue

        # Then auto-discover the remaining databases
        for db_number in range(max_databases):
            if db_number in processed_dbs:
                continue  # Skip databases already covered by a known app
            try:
                redis_client = get_redis_client(db_number)
                # Test the connection and check whether the database has any keys
                redis_client.ping()
                db_size = redis_client.dbsize()
                if db_size > 0:
                    # Discover queues in this database
                    queues = discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists)
                    if queues:  # Only include databases that have queues/lists
                        db_name = database_names.get(db_number, f"db_{db_number}")
                        discovered_databases.append({
                            'name': db_name,
                            'db_number': db_number,
                            'queues': queues,
                            'total_keys': db_size,
                            'source': 'auto_discovery'
                        })
                        mode = "lists" if monitor_all_lists else "queues"
                        logger.info(f"Auto-discovered DB {db_number} ({db_name}): {len(queues)} {mode}, {db_size} total keys")
            except redis.ConnectionError:
                logger.debug(f"Cannot connect to database {db_number}")
                continue
            except Exception as e:
                logger.debug(f"Error checking database {db_number}: {e}")
                continue

        known_count = len([db for db in discovered_databases if db.get('source') == 'known_application'])
        discovered_count = len([db for db in discovered_databases if db.get('source') == 'auto_discovery'])
        logger.info(f"Hybrid discovery complete: {known_count} known applications, {discovered_count} auto-discovered databases")
        return discovered_databases

    def collect_metrics():
        config = load_config()
        if not config.get('auto_discovery', {}).get('enabled', True):
            logger.error("Auto-discovery is disabled in configuration")
            return

        # Discover databases and queues
        databases = discover_databases_and_queues(config)
        if not databases:
            logger.warning("No databases with queues discovered")
            databases_discovered.set(0)
            return
        databases_discovered.set(len(databases))

        queue_info = {}
        total_queues = 0
        min_queue_length = config.get('min_queue_length', 0)
        for db_config in databases:
            db_name = db_config['name']
            db_number = db_config['db_number']
            queues = db_config['queues']
            try:
                redis_client = get_redis_client(db_number)
                # Test connection
                redis_client.ping()
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(1)
                total_queue_length = 0
                active_queues = 0
                for queue_name in queues:
                    try:
                        queue_length = redis_client.llen(queue_name)
                        # Only report queues that meet the minimum length threshold
                        if queue_length >= min_queue_length:
                            celery_queue_length.labels(
                                queue_name=queue_name,
                                database=db_name,
                                db_number=str(db_number)
                            ).set(queue_length)
                            total_queue_length += queue_length
                            if queue_length > 0:
                                active_queues += 1
                                logger.info(f"{db_name} (DB {db_number}) {queue_name}: {queue_length} tasks")
                    except Exception as e:
                        logger.warning(f"Error checking {db_name} queue {queue_name}: {e}")
                # Report the total queue length for this database under '_total'
                celery_queue_length.labels(
                    queue_name='_total',
                    database=db_name,
                    db_number=str(db_number)
                ).set(total_queue_length)
                # Track the number of queues discovered per database
                queues_discovered.labels(database=db_name).set(len(queues))
                queue_info[f'{db_name}_total_length'] = str(total_queue_length)
                queue_info[f'{db_name}_active_queues'] = str(active_queues)
                queue_info[f'{db_name}_total_queues'] = str(len(queues))
                queue_info[f'{db_name}_source'] = db_config.get('source', 'unknown')
                total_queues += len(queues)
                source_info = f" ({db_config['source']})" if 'source' in db_config else ""
                if total_queue_length > 0:
                    logger.info(f"{db_name} (DB {db_number}){source_info}: {total_queue_length} total tasks in {active_queues}/{len(queues)} queues")
            except Exception as e:
                logger.error(f"Error collecting metrics for {db_name} (DB {db_number}): {e}")
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(0)

        # Update the global queue-info metric
        queue_info.update({
            'redis_host': REDIS_HOST,
            'last_update': str(int(time.time())),
            'databases_monitored': str(len(databases)),
            'total_queues_discovered': str(total_queues),
            'auto_discovery_enabled': 'true'
        })
        celery_queue_info.info(queue_info)

    if __name__ == '__main__':
        # Start the Prometheus metrics server
        start_http_server(8000)
        logger.info("Celery metrics exporter started on port 8000")
        # Collect metrics every 60 seconds
        while True:
            collect_metrics()
            time.sleep(60)
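# Example PromQL queries against the metrics exported above. The metric and
# label names come from the script; the selections are only illustrations:
#
#   sum by (database) (celery_queue_length{queue_name!="_total"})    # backlog per application
#   celery_queue_length{database="bookwyrm", queue_name="imports"}   # a single queue
#   redis_connection_status == 0                                     # databases the exporter cannot reach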
---
# Celery Metrics Exporter Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  template:
    metadata:
      labels:
        app.kubernetes.io/name: celery-metrics-exporter
        app.kubernetes.io/component: metrics
    spec:
      containers:
        - name: celery-metrics-exporter
          image: python:3.11-slim
          # Install dependencies at container start, then run the exporter script
          command:
            - /bin/sh
            - -c
            - |
              pip install redis prometheus_client pyyaml
              python /scripts/celery_metrics.py
          ports:
            - containerPort: 8000
              name: metrics
          env:
            - name: REDIS_HOST
              value: "redis-ha-haproxy.redis-system.svc.cluster.local"
            - name: REDIS_PORT
              value: "6379"
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-credentials
                  key: redis-password
          volumeMounts:
            - name: script
              mountPath: /scripts
            - name: config
              mountPath: /config
          resources:
            requests:
              cpu: 50m
              memory: 128Mi
            limits:
              cpu: 200m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 60
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
      volumes:
        - name: script
          configMap:
            name: celery-metrics-script
            defaultMode: 0755
        - name: config
          configMap:
            name: celery-exporter-config
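# To spot-check the exporter (assuming kubectl access to the cluster):
#   kubectl -n celery-monitoring port-forward deploy/celery-metrics-exporter 8000:8000
#   curl -s localhost:8000/metrics | grep celery_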
---
# Service for the Celery Metrics Exporter
apiVersion: v1
kind: Service
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
  ports:
    - port: 8000
      targetPort: 8000
      name: metrics
---
# ServiceMonitor for OpenTelemetry collection
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  endpoints:
    - port: metrics
      interval: 60s
      path: /metrics
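# A sketch of an alert on queue backlog, assuming the Prometheus Operator's
# PrometheusRule CRD is available in this cluster (the threshold and duration
# are placeholders, not tuned values):
#
#   apiVersion: monitoring.coreos.com/v1
#   kind: PrometheusRule
#   metadata:
#     name: celery-queue-backlog
#     namespace: celery-monitoring
#   spec:
#     groups:
#       - name: celery
#         rules:
#           - alert: CeleryQueueBacklog
#             expr: celery_queue_length{queue_name!="_total"} > 100
#             for: 15m
#             labels:
#               severity: warning
#             annotations:
#               summary: "Celery queue {{ $labels.queue_name }} in {{ $labels.database }} is backed up"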