---
# Configuration for Celery Metrics Exporter
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-exporter-config
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: config
data:
  config.yaml: |
    # Auto-discovery settings
    auto_discovery:
      enabled: true
      scan_databases: false     # Only scan known databases, not all 0-15
      scan_queues: true         # Auto-discover queues in each database
      monitor_all_lists: false  # If true, monitor ALL Redis lists, not just queue-like ones
      use_known_queues: true    # Monitor known queues even if they don't exist as lists yet

    # Queue patterns to look for (Redis list keys that are likely Celery queues)
    queue_patterns:
      - "celery"
      - "*_priority"  # high_priority, medium_priority, low_priority
      - "default"
      - "mailers"
      - "push"
      - "scheduler"
      - "broadcast"
      - "federation"
      - "media"
      - "user_dir"
      # BookWyrm-specific queues
      - "streams"
      - "images"
      - "suggested_users"
      - "email"
      - "connectors"
      - "lists"
      - "inbox"
      - "imports"
      - "import_triggered"
      - "misc"
      # PieFed-specific queues
      - "background"
      - "send"
      # Pixelfed/Laravel-specific queues
      - "high"
      - "mmo"
      # Common queue patterns
      - "*_queue"
      - "queue_*"
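    # Note: patterns are shell-style globs matched with Python's fnmatch, so
    # "*_priority" matches "high_priority" while a bare name like "celery"
    # matches only that exact key. Keys that merely contain words such as
    # "queue", "celery", or "task" are still picked up by the exporter's
    # heuristic fallback (see discover_queues_in_database below).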
    # Known application configurations (monitored even when queues are empty)
    known_applications:
      - name: "piefed"
        db: 0
        queues: ["celery", "background", "send"]
      - name: "bookwyrm"
        db: 3
        queues: ["high_priority", "medium_priority", "low_priority", "streams", "images", "suggested_users", "email", "connectors", "lists", "inbox", "imports", "import_triggered", "broadcast", "misc"]
      - name: "mastodon"
        db: 1
        queues: ["default", "mailers", "push", "scheduler"]

    # Optional: database name mapping (for friendly names).
    # If not specified, databases are named "db_0", "db_1", etc.
    database_names:
      0: "piefed"
      1: "mastodon"
      2: "matrix"
      3: "bookwyrm"

    # Minimum queue length to report (avoid noise from empty queues)
    min_queue_length: 0

    # Maximum number of databases to scan (safety limit)
    max_databases: 4
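    # Note: max_databases only caps the auto-discovery scan (DB 0-3 here);
    # entries in known_applications are connected to directly and are
    # monitored regardless of this limit.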
---
# Custom Celery Metrics Exporter Script
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-metrics-script
  namespace: celery-monitoring
data:
  celery_metrics.py: |
    #!/usr/bin/env python3
    import fnmatch
    import logging
    import os
    import time

    import redis
    import yaml
    from prometheus_client import start_http_server, Gauge, Info

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Prometheus metrics
    celery_queue_length = Gauge('celery_queue_length', 'Length of Celery queue', ['queue_name', 'database', 'db_number'])
    celery_queue_info = Info('celery_queue_info', 'Information about Celery queues')
    redis_connection_status = Gauge('redis_connection_status', 'Redis connection status (1=connected, 0=disconnected)', ['database', 'db_number'])
    databases_discovered = Gauge('celery_databases_discovered', 'Number of databases with queues discovered')
    queues_discovered = Gauge('celery_queues_discovered', 'Total number of queues discovered', ['database'])
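    # Note: prometheus_client appends an "_info" suffix to Info metrics, so
    # the metric named 'celery_queue_info' above is actually exposed on
    # /metrics as celery_queue_info_info.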
    # Redis connection
    REDIS_HOST = os.getenv('REDIS_HOST', 'redis-ha-haproxy.redis-system.svc.cluster.local')
    REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
    REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')

    def get_redis_client(db=0):
        return redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=db,
            decode_responses=True
        )

    def load_config():
        """Load configuration from YAML file"""
        config_path = '/config/config.yaml'
        default_config = {
            'auto_discovery': {
                'enabled': True,
                'scan_databases': True,
                'scan_queues': True
            },
            'queue_patterns': [
                'celery',
                '*_priority',
                'default',
                'mailers',
                'push',
                'scheduler',
                'broadcast',
                'federation',
                'media',
                'user_dir'
            ],
            'database_names': {},
            'min_queue_length': 0,
            'max_databases': 16
        }

        try:
            if os.path.exists(config_path):
                with open(config_path, 'r') as f:
                    # safe_load returns None for an empty file; fall back to {}
                    config = yaml.safe_load(f) or {}
                logger.info("Loaded configuration from file")
                return {**default_config, **config}
            else:
                logger.info("No config file found, using defaults")
                return default_config
        except Exception as e:
            logger.error(f"Error loading config: {e}, using defaults")
            return default_config
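    # Note: {**default_config, **config} is a shallow merge -- a file that
    # sets only part of a nested section replaces that whole section, e.g.
    #   {**{'a': {'x': 1, 'y': 2}}, **{'a': {'x': 9}}}  ->  {'a': {'x': 9}}
    # so a config file should always spell out complete sections.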
    def discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists=False):
        """Discover all potential Celery queues in a Redis database"""
        try:
            # Get all keys in the database
            all_keys = redis_client.keys('*')
            discovered_queues = []

            for key in all_keys:
                # Check if the key is a list (potential queue)
                try:
                    key_type = redis_client.type(key)
                    if key_type == 'list':
                        if monitor_all_lists:
                            # Monitor ALL Redis lists
                            discovered_queues.append(key)
                        else:
                            # Smart filtering: check if the key matches any of our queue patterns
                            for pattern in queue_patterns:
                                if fnmatch.fnmatch(key, pattern):
                                    discovered_queues.append(key)
                                    break
                            else:
                                # No pattern matched (for/else): also include keys that
                                # look like queues (contain common queue words)
                                queue_indicators = ['queue', 'celery', 'task', 'job', 'work']
                                if any(indicator in key.lower() for indicator in queue_indicators):
                                    discovered_queues.append(key)
                except Exception as e:
                    logger.debug(f"Error checking key {key} in DB {db_number}: {e}")
                    continue

            # Remove duplicates and sort
            discovered_queues = sorted(set(discovered_queues))

            if discovered_queues:
                mode = "all lists" if monitor_all_lists else "filtered queues"
                logger.info(f"DB {db_number}: Discovered {len(discovered_queues)} {mode}: {discovered_queues}")

            return discovered_queues

        except Exception as e:
            logger.error(f"Error discovering queues in DB {db_number}: {e}")
            return []
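    # Note: KEYS blocks Redis while it walks the whole keyspace. On large
    # databases an incremental, non-blocking sketch would be:
    #   all_keys = list(redis_client.scan_iter('*'))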
    def get_known_applications(config):
        """Get known application configurations"""
        return config.get('known_applications', [])

    def discover_databases_and_queues(config):
        """Hybrid approach: use known applications + auto-discovery"""
        max_databases = config.get('max_databases', 16)
        queue_patterns = config.get('queue_patterns', ['celery', '*_priority'])
        database_names = config.get('database_names', {})
        monitor_all_lists = config.get('auto_discovery', {}).get('monitor_all_lists', False)
        use_known_queues = config.get('auto_discovery', {}).get('use_known_queues', True)

        discovered_databases = []
        known_apps = get_known_applications(config) if use_known_queues else []

        # Track which databases we've already processed from known apps
        processed_dbs = set()

        # First, add known applications (these are always monitored)
        for app_config in known_apps:
            db_number = app_config['db']
            app_name = app_config['name']
            known_queues = app_config['queues']

            try:
                redis_client = get_redis_client(db_number)
                redis_client.ping()  # Test connection

                # For known apps, we monitor the queues even if they don't exist yet
                discovered_databases.append({
                    'name': app_name,
                    'db_number': db_number,
                    'queues': known_queues,
                    'total_keys': redis_client.dbsize(),
                    'source': 'known_application'
                })
                processed_dbs.add(db_number)
                logger.info(f"Known app {app_name} (DB {db_number}): {len(known_queues)} configured queues")

            except Exception as e:
                logger.error(f"Error connecting to known app {app_name} (DB {db_number}): {e}")
                continue

        # Then, do auto-discovery for the remaining databases
        for db_number in range(max_databases):
            if db_number in processed_dbs:
                continue  # Skip databases we already processed

            try:
                redis_client = get_redis_client(db_number)

                # Test connection and check whether the database has any keys
                redis_client.ping()
                db_size = redis_client.dbsize()

                if db_size > 0:
                    # Discover queues in this database
                    queues = discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists)

                    if queues:  # Only include databases that have queues/lists
                        db_name = database_names.get(db_number, f"db_{db_number}")
                        discovered_databases.append({
                            'name': db_name,
                            'db_number': db_number,
                            'queues': queues,
                            'total_keys': db_size,
                            'source': 'auto_discovery'
                        })
                        mode = "lists" if monitor_all_lists else "queues"
                        logger.info(f"Auto-discovered DB {db_number} ({db_name}): {len(queues)} {mode}, {db_size} total keys")

            except redis.ConnectionError:
                logger.debug(f"Cannot connect to database {db_number}")
                continue
            except Exception as e:
                logger.debug(f"Error checking database {db_number}: {e}")
                continue

        known_count = len([db for db in discovered_databases if db.get('source') == 'known_application'])
        discovered_count = len([db for db in discovered_databases if db.get('source') == 'auto_discovery'])

        logger.info(f"Hybrid discovery complete: {known_count} known applications, {discovered_count} auto-discovered databases")
        return discovered_databases
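    # Each entry returned above has this shape (illustrative values):
    #   {'name': 'bookwyrm', 'db_number': 3, 'queues': ['high_priority', ...],
    #    'total_keys': 1234, 'source': 'known_application'}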
    def collect_metrics():
        config = load_config()

        if not config['auto_discovery']['enabled']:
            logger.error("Auto-discovery is disabled in configuration")
            return

        # Discover databases and queues
        databases = discover_databases_and_queues(config)

        if not databases:
            logger.warning("No databases with queues discovered")
            databases_discovered.set(0)
            return

        databases_discovered.set(len(databases))
        queue_info = {}
        total_queues = 0
        min_queue_length = config.get('min_queue_length', 0)

        for db_config in databases:
            db_name = db_config['name']
            db_number = db_config['db_number']
            queues = db_config['queues']

            try:
                redis_client = get_redis_client(db_number)

                # Test connection
                redis_client.ping()
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(1)

                total_queue_length = 0
                active_queues = 0

                for queue_name in queues:
                    try:
                        queue_length = redis_client.llen(queue_name)

                        # Only report queues that meet the minimum length threshold
                        if queue_length >= min_queue_length:
                            celery_queue_length.labels(
                                queue_name=queue_name,
                                database=db_name,
                                db_number=str(db_number)
                            ).set(queue_length)

                            total_queue_length += queue_length
                            if queue_length > 0:
                                active_queues += 1
                                logger.info(f"{db_name} (DB {db_number}) {queue_name}: {queue_length} tasks")

                    except Exception as e:
                        logger.warning(f"Error checking {db_name} queue {queue_name}: {e}")

                # Report the total queue length for this database under the
                # pseudo-queue name '_total'
                celery_queue_length.labels(
                    queue_name='_total',
                    database=db_name,
                    db_number=str(db_number)
                ).set(total_queue_length)

                # Track queues discovered per database
                queues_discovered.labels(database=db_name).set(len(queues))

                queue_info[f'{db_name}_total_length'] = str(total_queue_length)
                queue_info[f'{db_name}_active_queues'] = str(active_queues)
                queue_info[f'{db_name}_total_queues'] = str(len(queues))
                queue_info[f'{db_name}_source'] = db_config.get('source', 'unknown')

                total_queues += len(queues)

                source_info = f" ({db_config.get('source', 'unknown')})" if 'source' in db_config else ""
                if total_queue_length > 0:
                    logger.info(f"{db_name} (DB {db_number}){source_info}: {total_queue_length} total tasks in {active_queues}/{len(queues)} queues")

            except Exception as e:
                logger.error(f"Error collecting metrics for {db_name} (DB {db_number}): {e}")
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(0)

        # Update global queue info
        queue_info.update({
            'redis_host': REDIS_HOST,
            'last_update': str(int(time.time())),
            'databases_monitored': str(len(databases)),
            'total_queues_discovered': str(total_queues),
            'auto_discovery_enabled': 'true'
        })

        celery_queue_info.info(queue_info)

    if __name__ == '__main__':
        # Start the Prometheus metrics server
        start_http_server(8000)
        logger.info("Celery metrics exporter started on port 8000")

        # Collect metrics every 60 seconds; keep the exporter alive even if a
        # collection cycle fails unexpectedly (e.g. a malformed config entry)
        while True:
            try:
                collect_metrics()
            except Exception as e:
                logger.error(f"Unexpected error during collection: {e}")
            time.sleep(60)
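    # Caveat: prometheus_client gauges keep every label combination they have
    # ever been set with, so a queue that later disappears keeps exporting its
    # last value until the process restarts (or the series is dropped via
    # celery_queue_length.remove(...) / .clear()).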
---
# Celery Metrics Exporter Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  template:
    metadata:
      labels:
        app.kubernetes.io/name: celery-metrics-exporter
        app.kubernetes.io/component: metrics
    spec:
      containers:
        - name: celery-metrics-exporter
          image: python:3.11-slim
          command:
            - /bin/sh
            - -c
            - |
              pip install redis prometheus_client pyyaml
              python /scripts/celery_metrics.py
          ports:
            - containerPort: 8000
              name: metrics
          env:
            - name: REDIS_HOST
              value: "redis-ha-haproxy.redis-system.svc.cluster.local"
            - name: REDIS_PORT
              value: "6379"
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-credentials
                  key: redis-password
          volumeMounts:
            - name: script
              mountPath: /scripts
            - name: config
              mountPath: /config
          resources:
            requests:
              cpu: 50m
              memory: 128Mi
            limits:
              cpu: 200m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 60
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
      volumes:
        - name: script
          configMap:
            name: celery-metrics-script
            defaultMode: 0755
        - name: config
          configMap:
            name: celery-exporter-config
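# Note: the Deployment expects a Secret named "redis-credentials" with a
# "redis-password" key in the same namespace. A minimal sketch (replace the
# placeholder with your actual password):
#
#   kubectl -n celery-monitoring create secret generic redis-credentials \
#     --from-literal=redis-password='<your-redis-password>'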
---
# Service for Celery Metrics Exporter
apiVersion: v1
kind: Service
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
  ports:
    - port: 8000
      targetPort: 8000
      name: metrics
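# Quick smoke test (a sketch): port-forward the Service and fetch the metrics.
#
#   kubectl -n celery-monitoring port-forward svc/celery-metrics-exporter 8000:8000
#   curl -s localhost:8000/metrics | grep celery_queue_length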
---
# ServiceMonitor for OpenTelemetry Collection
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  endpoints:
    - port: metrics
      interval: 60s
      path: /metrics
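# Example PromQL over the exported series (illustrative; adjust label values):
#
#   # backlog per queue, excluding the per-database '_total' rollup
#   celery_queue_length{queue_name!="_total"}
#
#   # total backlog per database
#   celery_queue_length{queue_name="_total"}
#
#   # databases the exporter currently cannot reach
#   redis_connection_status == 0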