add source code and readme
@@ -0,0 +1,505 @@
---
# Configuration for Celery Metrics Exporter
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-exporter-config
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: config
data:
  config.yaml: |
    # Auto-discovery settings
    auto_discovery:
      enabled: true
      scan_databases: false     # Only scan known databases, not all 0-15
      scan_queues: true         # Auto-discover queues in each database
      monitor_all_lists: false  # If true, monitor ALL Redis lists, not just queue-like ones
      use_known_queues: true    # Monitor known queues even if they don't exist as lists yet

    # Queue patterns to look for (Redis list keys that are likely Celery queues)
    queue_patterns:
      - "celery"
      - "*_priority"  # high_priority, medium_priority, low_priority
      - "default"
      - "mailers"
      - "push"
      - "scheduler"
      - "broadcast"
      - "federation"
      - "media"
      - "user_dir"
      # BookWyrm specific queues
      - "streams"
      - "images"
      - "suggested_users"
      - "email"
      - "connectors"
      - "lists"
      - "inbox"
      - "imports"
      - "import_triggered"
      - "misc"
      # PieFed specific queues
      - "background"
      - "send"
      # Pixelfed/Laravel specific queues
      - "high"
      - "mmo"
      # Common queue patterns
      - "*_queue"
      - "queue_*"

    # Known application configurations (monitored even when queues are empty)
    known_applications:
      - name: "piefed"
        db: 0
        queues: ["celery", "background", "send"]
      - name: "bookwyrm"
        db: 3
        queues: ["high_priority", "medium_priority", "low_priority", "streams", "images", "suggested_users", "email", "connectors", "lists", "inbox", "imports", "import_triggered", "broadcast", "misc"]
      - name: "mastodon"
        db: 1
        queues: ["default", "mailers", "push", "scheduler"]

    # Optional: database name mapping (if you want friendly names).
    # If not specified, databases will be named "db_0", "db_1", etc.
    database_names:
      0: "piefed"
      1: "mastodon"
      2: "matrix"
      3: "bookwyrm"

    # Minimum queue length to report (avoid noise from empty queues)
    min_queue_length: 0

    # Maximum number of databases to scan (safety limit)
    max_databases: 4

---
# Custom Celery Metrics Exporter Script
apiVersion: v1
kind: ConfigMap
metadata:
  name: celery-metrics-script
  namespace: celery-monitoring
data:
  celery_metrics.py: |
    #!/usr/bin/env python3
    import fnmatch
    import logging
    import os
    import time

    import redis
    import yaml
    from prometheus_client import start_http_server, Gauge, Info

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Prometheus metrics
    celery_queue_length = Gauge('celery_queue_length', 'Length of Celery queue', ['queue_name', 'database', 'db_number'])
    celery_queue_info = Info('celery_queue_info', 'Information about Celery queues')
    redis_connection_status = Gauge('redis_connection_status', 'Redis connection status (1=connected, 0=disconnected)', ['database', 'db_number'])
    databases_discovered = Gauge('celery_databases_discovered', 'Number of databases with queues discovered')
    queues_discovered = Gauge('celery_queues_discovered', 'Total number of queues discovered', ['database'])

    # Redis connection
    REDIS_HOST = os.getenv('REDIS_HOST', 'redis-ha-haproxy.redis-system.svc.cluster.local')
    REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
    REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')

    def get_redis_client(db=0):
        return redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=db,
            decode_responses=True
        )

    def load_config():
        """Load configuration from YAML file"""
        config_path = '/config/config.yaml'
        default_config = {
            'auto_discovery': {
                'enabled': True,
                'scan_databases': True,
                'scan_queues': True
            },
            'queue_patterns': [
                'celery',
                '*_priority',
                'default',
                'mailers',
                'push',
                'scheduler',
                'broadcast',
                'federation',
                'media',
                'user_dir'
            ],
            'database_names': {},
            'min_queue_length': 0,
            'max_databases': 16
        }

        try:
            if os.path.exists(config_path):
                with open(config_path, 'r') as f:
                    # safe_load returns None for an empty file; fall back to {}
                    config = yaml.safe_load(f) or {}
                    logger.info("Loaded configuration from file")
                    return {**default_config, **config}
            else:
                logger.info("No config file found, using defaults")
                return default_config
        except Exception as e:
            logger.error(f"Error loading config: {e}, using defaults")
            return default_config
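
    # Note: the merge above is shallow: {**default_config, **config} only
    # merges top-level keys, so {**{'a': {'x': 1}}, **{'a': {'y': 2}}} yields
    # {'a': {'y': 2}}. A config file that sets auto_discovery replaces the
    # whole default sub-dict; the shipped config.yaml sets every
    # auto_discovery key, so this only bites partial overrides.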

    def discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists=False):
        """Discover all potential Celery queues in a Redis database"""
        try:
            # Get all keys in the database
            all_keys = redis_client.keys('*')
            discovered_queues = []

            for key in all_keys:
                # Check if key is a list (potential queue)
                try:
                    key_type = redis_client.type(key)
                    if key_type == 'list':
                        if monitor_all_lists:
                            # Monitor ALL Redis lists
                            discovered_queues.append(key)
                        else:
                            # Smart filtering: check if the key matches any of our queue patterns
                            for pattern in queue_patterns:
                                if fnmatch.fnmatch(key, pattern):
                                    discovered_queues.append(key)
                                    break
                            else:
                                # for/else: no pattern matched (loop finished without break),
                                # so also include keys that look like queues (contain common queue words)
                                queue_indicators = ['queue', 'celery', 'task', 'job', 'work']
                                if any(indicator in key.lower() for indicator in queue_indicators):
                                    discovered_queues.append(key)
                except Exception as e:
                    logger.debug(f"Error checking key {key} in DB {db_number}: {e}")
                    continue

            # Remove duplicates and sort
            discovered_queues = sorted(set(discovered_queues))

            if discovered_queues:
                mode = "all lists" if monitor_all_lists else "filtered queues"
                logger.info(f"DB {db_number}: Discovered {len(discovered_queues)} {mode}: {discovered_queues}")

            return discovered_queues

        except Exception as e:
            logger.error(f"Error discovering queues in DB {db_number}: {e}")
            return []
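
    # Note: KEYS blocks Redis while it walks the whole keyspace, which can
    # hurt on large databases. An incremental alternative with the same
    # redis-py client (sketch) would be:
    #     for key in redis_client.scan_iter(match='*', count=500):
    #         ...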

    def get_known_applications(config):
        """Get known application configurations"""
        return config.get('known_applications', [])

    def discover_databases_and_queues(config):
        """Hybrid approach: use known applications + auto-discovery"""
        max_databases = config.get('max_databases', 16)
        queue_patterns = config.get('queue_patterns', ['celery', '*_priority'])
        database_names = config.get('database_names', {})
        monitor_all_lists = config.get('auto_discovery', {}).get('monitor_all_lists', False)
        use_known_queues = config.get('auto_discovery', {}).get('use_known_queues', True)

        discovered_databases = []
        known_apps = get_known_applications(config) if use_known_queues else []

        # Track which databases we've already processed from known apps
        processed_dbs = set()

        # First, add known applications (these are always monitored)
        for app_config in known_apps:
            db_number = app_config['db']
            app_name = app_config['name']
            known_queues = app_config['queues']

            try:
                redis_client = get_redis_client(db_number)
                redis_client.ping()  # Test connection

                # For known apps, we monitor the queues even if they don't exist yet
                discovered_databases.append({
                    'name': app_name,
                    'db_number': db_number,
                    'queues': known_queues,
                    'total_keys': redis_client.dbsize(),
                    'source': 'known_application'
                })
                processed_dbs.add(db_number)
                logger.info(f"Known app {app_name} (DB {db_number}): {len(known_queues)} configured queues")

            except Exception as e:
                logger.error(f"Error connecting to known app {app_name} (DB {db_number}): {e}")
                continue

        # Then, do auto-discovery for the remaining databases
        for db_number in range(max_databases):
            if db_number in processed_dbs:
                continue  # Skip databases we already processed

            try:
                redis_client = get_redis_client(db_number)

                # Test connection and check if the database has any keys
                redis_client.ping()
                db_size = redis_client.dbsize()

                if db_size > 0:
                    # Discover queues in this database
                    queues = discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists)

                    if queues:  # Only include databases that have queues/lists
                        db_name = database_names.get(db_number, f"db_{db_number}")
                        discovered_databases.append({
                            'name': db_name,
                            'db_number': db_number,
                            'queues': queues,
                            'total_keys': db_size,
                            'source': 'auto_discovery'
                        })
                        mode = "lists" if monitor_all_lists else "queues"
                        logger.info(f"Auto-discovered DB {db_number} ({db_name}): {len(queues)} {mode}, {db_size} total keys")

            except redis.ConnectionError:
                logger.debug(f"Cannot connect to database {db_number}")
                continue
            except Exception as e:
                logger.debug(f"Error checking database {db_number}: {e}")
                continue

        known_count = len([db for db in discovered_databases if db.get('source') == 'known_application'])
        discovered_count = len([db for db in discovered_databases if db.get('source') == 'auto_discovery'])

        logger.info(f"Hybrid discovery complete: {known_count} known applications, {discovered_count} auto-discovered databases")
        return discovered_databases

    def collect_metrics():
        config = load_config()

        if not config['auto_discovery']['enabled']:
            logger.error("Auto-discovery is disabled in configuration")
            return

        # Discover databases and queues
        databases = discover_databases_and_queues(config)

        if not databases:
            logger.warning("No databases with queues discovered")
            databases_discovered.set(0)
            return

        databases_discovered.set(len(databases))
        queue_info = {}
        total_queues = 0
        min_queue_length = config.get('min_queue_length', 0)

        for db_config in databases:
            db_name = db_config['name']
            db_number = db_config['db_number']
            queues = db_config['queues']

            try:
                redis_client = get_redis_client(db_number)

                # Test connection
                redis_client.ping()
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(1)

                total_queue_length = 0
                active_queues = 0

                for queue_name in queues:
                    try:
                        queue_length = redis_client.llen(queue_name)

                        # Only report queues that meet minimum length threshold
                        if queue_length >= min_queue_length:
                            celery_queue_length.labels(
                                queue_name=queue_name,
                                database=db_name,
                                db_number=str(db_number)
                            ).set(queue_length)

                            total_queue_length += queue_length
                            if queue_length > 0:
                                active_queues += 1
                                logger.info(f"{db_name} (DB {db_number}) {queue_name}: {queue_length} tasks")

                    except Exception as e:
                        logger.warning(f"Error checking {db_name} queue {queue_name}: {e}")

                # Set total queue length for this database
                celery_queue_length.labels(
                    queue_name='_total',
                    database=db_name,
                    db_number=str(db_number)
                ).set(total_queue_length)

                # Track queues discovered per database
                queues_discovered.labels(database=db_name).set(len(queues))

                queue_info[f'{db_name}_total_length'] = str(total_queue_length)
                queue_info[f'{db_name}_active_queues'] = str(active_queues)
                queue_info[f'{db_name}_total_queues'] = str(len(queues))
                queue_info[f'{db_name}_source'] = db_config.get('source', 'unknown')

                total_queues += len(queues)

                source_info = f" ({db_config.get('source', 'unknown')})" if 'source' in db_config else ""
                if total_queue_length > 0:
                    logger.info(f"{db_name} (DB {db_number}){source_info}: {total_queue_length} total tasks in {active_queues}/{len(queues)} queues")

            except Exception as e:
                logger.error(f"Error collecting metrics for {db_name} (DB {db_number}): {e}")
                redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(0)

        # Update global queue info
        queue_info.update({
            'redis_host': REDIS_HOST,
            'last_update': str(int(time.time())),
            'databases_monitored': str(len(databases)),
            'total_queues_discovered': str(total_queues),
            'auto_discovery_enabled': 'true'
        })

        celery_queue_info.info(queue_info)
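
    # Note: labelled series persist in prometheus_client once created, so a
    # queue that later disappears keeps exporting its last value. If needed,
    # celery_queue_length.remove(queue_name, db_name, str(db_number)) drops a
    # stale series (cleanup sketch, not wired in here).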

    if __name__ == '__main__':
        # Start Prometheus metrics server
        start_http_server(8000)
        logger.info("Celery metrics exporter started on port 8000")

        # Collect metrics every 60 seconds
        while True:
            collect_metrics()
            time.sleep(60)
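
    # Local smoke test outside the cluster (hypothetical host/port values):
    #   REDIS_HOST=localhost REDIS_PORT=6379 python celery_metrics.py
    #   curl -s http://localhost:8000/metrics | grep celery_queue_length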

---
# Celery Metrics Exporter Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  template:
    metadata:
      labels:
        app.kubernetes.io/name: celery-metrics-exporter
        app.kubernetes.io/component: metrics
    spec:
      containers:
        - name: celery-metrics-exporter
          image: python:3.11-slim
          command:
            - /bin/sh
            - -c
            - |
              pip install redis prometheus_client pyyaml
              python /scripts/celery_metrics.py
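          # Design note: installing dependencies at container start needs PyPI
          # egress and slows every restart; a prebuilt image with redis,
          # prometheus_client and pyyaml baked in would avoid both.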
          ports:
            - containerPort: 8000
              name: metrics
          env:
            - name: REDIS_HOST
              value: "redis-ha-haproxy.redis-system.svc.cluster.local"
            - name: REDIS_PORT
              value: "6379"
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-credentials
                  key: redis-password
          volumeMounts:
            - name: script
              mountPath: /scripts
            - name: config
              mountPath: /config
          resources:
            requests:
              cpu: 50m
              memory: 128Mi
            limits:
              cpu: 200m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 60
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /metrics
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
      volumes:
        - name: script
          configMap:
            name: celery-metrics-script
            defaultMode: 0755
        - name: config
          configMap:
            name: celery-exporter-config

---
# Service for Celery Metrics Exporter
apiVersion: v1
kind: Service
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
  ports:
    - port: 8000
      targetPort: 8000
      name: metrics

---
# ServiceMonitor for OpenTelemetry Collection
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: celery-metrics-exporter
  namespace: celery-monitoring
  labels:
    app.kubernetes.io/name: celery-metrics-exporter
    app.kubernetes.io/component: metrics
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: celery-metrics-exporter
      app.kubernetes.io/component: metrics
  endpoints:
    - port: metrics
      interval: 60s
      path: /metrics
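
# Example PromQL once these metrics are scraped (queries are illustrative;
# metric and label names come from the exporter script above):
#   sum by (database) (celery_queue_length{queue_name!="_total"})    # backlog per app
#   celery_queue_length{queue_name="_total", database="bookwyrm"}    # one app's total
#   redis_connection_status == 0                                     # broken connections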