---
# Configuration for Celery Metrics Exporter
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-exporter-config
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: config
data:
  config.yaml: |
    # Auto-discovery settings
    auto_discovery:
      enabled: true
      scan_databases: false    # Only scan known databases, not all 0-15
      scan_queues: true        # Auto-discover queues in each database
      monitor_all_lists: false # If true, monitor ALL Redis lists, not just queue-like ones
      use_known_queues: true   # Monitor known queues even if they don't exist as lists yet

    # Queue patterns to look for (Redis list keys that are likely Celery queues)
    queue_patterns:
      - "celery"
      - "*_priority"           # high_priority, medium_priority, low_priority
      - "default"
      - "mailers"
      - "push"
      - "scheduler"
      - "broadcast"
      - "federation"
      - "media"
      - "user_dir"
      # BookWyrm-specific queues
      - "streams"
      - "images"
      - "suggested_users"
      - "email"
      - "connectors"
      - "lists"
      - "inbox"
      - "imports"
      - "import_triggered"
      - "misc"
      # PieFed-specific queues
      - "background"
      - "send"
      # Pixelfed/Laravel-specific queues
      - "high"
      - "mmo"
      # Common queue patterns
      - "*_queue"
      - "queue_*"

    # Known application configurations (monitored even when queues are empty)
    known_applications:
      - name: "piefed"
        db: 0
        queues: ["celery", "background", "send"]
      - name: "bookwyrm"
        db: 3
        queues: ["high_priority", "medium_priority", "low_priority", "streams",
                 "images", "suggested_users", "email", "connectors", "lists",
                 "inbox", "imports", "import_triggered", "broadcast", "misc"]
      - name: "mastodon"
        db: 1
        queues: ["default", "mailers", "push", "scheduler"]

    # Optional: database name mapping (for friendly names).
    # If not specified, databases are named "db_0", "db_1", etc.
    database_names:
      0: "piefed"
      1: "mastodon"
      2: "matrix"
      3: "bookwyrm"

    # Minimum queue length to report (avoids noise from empty queues)
    min_queue_length: 0

    # Maximum number of databases to scan (safety limit)
    max_databases: 4
---
# Custom Celery Metrics Exporter Script
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-metrics-script
  namespace: celery-monitoring
data:
  celery_metrics.py: |
    #!/usr/bin/env python3
    import fnmatch
    import logging
    import os
    import time

    import redis
    import yaml
    from prometheus_client import start_http_server, Gauge, Info

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Prometheus metrics
    celery_queue_length = Gauge(
        'celery_queue_length', 'Length of Celery queue',
        ['queue_name', 'database', 'db_number'])
    celery_queue_info = Info(
        'celery_queue_info', 'Information about Celery queues')
    redis_connection_status = Gauge(
        'redis_connection_status',
        'Redis connection status (1=connected, 0=disconnected)',
        ['database', 'db_number'])
    databases_discovered = Gauge(
        'celery_databases_discovered',
        'Number of databases with queues discovered')
    queues_discovered = Gauge(
        'celery_queues_discovered',
        'Number of queues discovered per database', ['database'])

    # Redis connection settings
    REDIS_HOST = os.getenv('REDIS_HOST', 'redis-ha-haproxy.redis-system.svc.cluster.local')
    REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
    REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')

    def get_redis_client(db=0):
        return redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=db,
            decode_responses=True
        )

    def load_config():
        """Load configuration from the mounted YAML file, falling back to defaults."""
        config_path = '/config/config.yaml'
        default_config = {
            'auto_discovery': {
                'enabled': True,
                'scan_databases': True,
                'scan_queues': True
            },
            'queue_patterns': [
                'celery', '*_priority', 'default', 'mailers', 'push',
                'scheduler', 'broadcast', 'federation', 'media', 'user_dir'
            ],
            'database_names': {},
            'min_queue_length': 0,
            'max_databases': 16
        }
        try:
            if os.path.exists(config_path):
                with open(config_path, 'r') as f:
                    # safe_load returns None for an empty file; fall back to {}
                    config = yaml.safe_load(f) or {}
                logger.info("Loaded configuration from file")
                # Shallow merge: top-level keys from the file replace the defaults
                return {**default_config, **config}
            else:
                logger.info("No config file found, using defaults")
                return default_config
        except Exception as e:
            logger.error(f"Error loading config: {e}, using defaults")
            return default_config

    def discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists=False):
        """Discover all potential Celery queues in a Redis database."""
        try:
            discovered_queues = []
            # Iterate keys with SCAN rather than KEYS to avoid blocking Redis
            for key in redis_client.scan_iter('*'):
                try:
                    # Only Redis lists can be Celery queues
                    if redis_client.type(key) != 'list':
                        continue
                    if monitor_all_lists:
                        # Monitor ALL Redis lists
                        discovered_queues.append(key)
                        continue
                    # Smart filtering: check whether the key matches any queue pattern
                    for pattern in queue_patterns:
                        if fnmatch.fnmatch(key, pattern):
                            discovered_queues.append(key)
                            break
                    else:
                        # No pattern matched (for/else): also include keys that
                        # look like queues (contain common queue words)
                        queue_indicators = ['queue', 'celery', 'task', 'job', 'work']
                        if any(indicator in key.lower() for indicator in queue_indicators):
                            discovered_queues.append(key)
                except Exception as e:
                    logger.debug(f"Error checking key {key} in DB {db_number}: {e}")
                    continue

            # Remove duplicates and sort
            discovered_queues = sorted(set(discovered_queues))
            if discovered_queues:
                mode = "all lists" if monitor_all_lists else "filtered queues"
                logger.info(f"DB {db_number}: Discovered {len(discovered_queues)} {mode}: {discovered_queues}")
            return discovered_queues
        except Exception as e:
            logger.error(f"Error discovering queues in DB {db_number}: {e}")
            return []

    def get_known_applications(config):
        """Get known application configurations."""
        return config.get('known_applications', [])

    def discover_databases_and_queues(config):
        """Hybrid approach: use known applications plus auto-discovery."""
        max_databases = config.get('max_databases', 16)
        queue_patterns = config.get('queue_patterns', ['celery', '*_priority'])
        database_names = config.get('database_names', {})
        auto_discovery = config.get('auto_discovery', {})
        monitor_all_lists = auto_discovery.get('monitor_all_lists', False)
        use_known_queues = auto_discovery.get('use_known_queues', True)
        scan_databases = auto_discovery.get('scan_databases', True)

        discovered_databases = []
        known_apps = get_known_applications(config) if use_known_queues else []

        # Track which databases we've already processed from known apps
        processed_dbs = set()

        # First, add known applications (these are always monitored)
        for app_config in known_apps:
            db_number = app_config['db']
            app_name = app_config['name']
            known_queues = app_config['queues']
            try:
                redis_client = get_redis_client(db_number)
                redis_client.ping()  # Test connection
                # For known apps, monitor the queues even if they don't exist yet
                discovered_databases.append({
                    'name': app_name,
                    'db_number': db_number,
                    'queues': known_queues,
                    'total_keys': redis_client.dbsize(),
                    'source': 'known_application'
                })
                processed_dbs.add(db_number)
                logger.info(f"Known app {app_name} (DB {db_number}): {len(known_queues)} configured queues")
            except Exception as e:
                logger.error(f"Error connecting to known app {app_name} (DB {db_number}): {e}")
                continue

        # Then, auto-discover the remaining databases;
        # scan_databases=false limits monitoring to the known applications above
        db_range = range(max_databases) if scan_databases else []
        for db_number in db_range:
            if db_number in processed_dbs:
                continue  # Skip databases we already processed
            try:
                redis_client = get_redis_client(db_number)
                # Test connection and check whether the database has any keys
                redis_client.ping()
                db_size = redis_client.dbsize()
                if db_size > 0:
                    # Discover queues in this database
                    queues = discover_queues_in_database(
                        redis_client, db_number, queue_patterns, monitor_all_lists)
                    if queues:
                        # Only include databases that have queues/lists
                        db_name = database_names.get(db_number, f"db_{db_number}")
                        discovered_databases.append({
                            'name': db_name,
                            'db_number': db_number,
                            'queues': queues,
                            'total_keys': db_size,
                            'source': 'auto_discovery'
                        })
                        mode = "lists" if monitor_all_lists else "queues"
                        logger.info(f"Auto-discovered DB {db_number} ({db_name}): "
                                    f"{len(queues)} {mode}, {db_size} total keys")
            except redis.ConnectionError:
                logger.debug(f"Cannot connect to database {db_number}")
                continue
            except Exception as e:
                logger.debug(f"Error checking database {db_number}: {e}")
                continue

        known_count = len([db for db in discovered_databases if db.get('source') == 'known_application'])
        discovered_count = len([db for db in discovered_databases if db.get('source') == 'auto_discovery'])
        logger.info(f"Hybrid discovery complete: {known_count} known applications, "
                    f"{discovered_count} auto-discovered databases")
        return discovered_databases

    def collect_metrics():
        config = load_config()
        if not config['auto_discovery']['enabled']:
            logger.error("Auto-discovery is disabled in configuration")
            return

        # Discover databases and queues
        databases = discover_databases_and_queues(config)
        if not databases:
            logger.warning("No databases with queues discovered")
            databases_discovered.set(0)
            return

        databases_discovered.set(len(databases))
        queue_info = {}
        total_queues = 0
        min_queue_length = config.get('min_queue_length', 0)

        for db_config in databases:
            db_name = db_config['name']
            db_number = db_config['db_number']
            queues = db_config['queues']
            try:
                redis_client = get_redis_client(db_number)
                redis_client.ping()  # Test connection
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(1)

                total_queue_length = 0
                active_queues = 0
                for queue_name in queues:
                    try:
                        queue_length = redis_client.llen(queue_name)
                        # Only report queues that meet the minimum length threshold
                        if queue_length >= min_queue_length:
                            celery_queue_length.labels(
                                queue_name=queue_name,
                                database=db_name,
                                db_number=str(db_number)
                            ).set(queue_length)
                            total_queue_length += queue_length
                            if queue_length > 0:
                                active_queues += 1
                                logger.info(f"{db_name} (DB {db_number}) {queue_name}: {queue_length} tasks")
                    except Exception as e:
                        logger.warning(f"Error checking {db_name} queue {queue_name}: {e}")

                # Set the total queue length for this database
                celery_queue_length.labels(
                    queue_name='_total',
                    database=db_name,
                    db_number=str(db_number)
                ).set(total_queue_length)

                # Track queues discovered per database
                queues_discovered.labels(database=db_name).set(len(queues))

                queue_info[f'{db_name}_total_length'] = str(total_queue_length)
                queue_info[f'{db_name}_active_queues'] = str(active_queues)
                queue_info[f'{db_name}_total_queues'] = str(len(queues))
                queue_info[f'{db_name}_source'] = db_config.get('source', 'unknown')
                total_queues += len(queues)

                source_info = f" ({db_config['source']})" if 'source' in db_config else ""
                if total_queue_length > 0:
                    logger.info(f"{db_name} (DB {db_number}){source_info}: {total_queue_length} total tasks "
                                f"in {active_queues}/{len(queues)} queues")
            except Exception as e:
                logger.error(f"Error collecting metrics for {db_name} (DB {db_number}): {e}")
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(0)
        # Update global queue info
        queue_info.update({
            'redis_host': REDIS_HOST,
            'last_update': str(int(time.time())),
            'databases_monitored': str(len(databases)),
            'total_queues_discovered': str(total_queues),
            'auto_discovery_enabled': 'true'
        })
        celery_queue_info.info(queue_info)

    if __name__ == '__main__':
        # Start the Prometheus metrics server
        start_http_server(8000)
        logger.info("Celery metrics exporter started on port 8000")
        # Collect metrics every 60 seconds
        while True:
            collect_metrics()
            time.sleep(60)
---
# Celery Metrics Exporter Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  template:
    metadata:
      labels:
        app.kubernetes.io/name: celery-metrics-exporter
        app.kubernetes.io/component: metrics
    spec:
      containers:
        - name: celery-metrics-exporter
          image: python:3.11-slim
          command:
            - /bin/sh
            - -c
            - |
              pip install redis prometheus_client pyyaml
              python /scripts/celery_metrics.py
          ports:
            - containerPort: 8000
              name: metrics
          env:
            - name: REDIS_HOST
              value: "redis-ha-haproxy.redis-system.svc.cluster.local"
            - name: REDIS_PORT
              value: "6379"
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-credentials
                  key: redis-password
          volumeMounts:
            - name: script
              mountPath: /scripts
            - name: config
              mountPath: /config
          resources:
            requests:
              cpu: 50m
              memory: 128Mi
            limits:
              cpu: 200m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 60
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
      volumes:
        - name: script
          configMap:
            name: celery-metrics-script
            defaultMode: 0755
        - name: config
          configMap:
            name: celery-exporter-config
---
# Service for Celery Metrics Exporter
apiVersion: v1
kind: Service
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
  ports:
    - port: 8000
      targetPort: 8000
      name: metrics
---
# ServiceMonitor for OpenTelemetry Collection
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  endpoints:
    - port: metrics
      interval: 60s
      path: /metrics
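---
# Optional: example alerting rules for the metrics exported above.
# A minimal sketch, assuming the Prometheus Operator's PrometheusRule CRD is
# available and your Prometheus instance picks up rules in this namespace;
# the alert names, thresholds, and durations here are illustrative, not tuned.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: celery-metrics-exporter-alerts
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: alerts
spec:
  groups:
    - name: celery-queues
      rules:
        - alert: CeleryQueueBacklog
          # Per-queue depth; excludes the synthetic per-database "_total" series
          expr: celery_queue_length{queue_name!="_total"} > 1000
          for: 15m
          labels:
            severity: warning
          annotations:
            summary: 'Celery queue {{ $labels.queue_name }} in {{ $labels.database }} is backed up'
        - alert: CeleryRedisDown
          expr: redis_connection_status == 0
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: 'Exporter cannot reach Redis DB {{ $labels.db_number }} ({{ $labels.database }})'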