redaction (#1)

Add the redacted source file for demo purposes

Reviewed-on: https://source.michaeldileo.org/michael_dileo/Keybard-Vagabond-Demo/pulls/1
Co-authored-by: Michael DiLeo <michael_dileo@proton.me>
Co-committed-by: Michael DiLeo <michael_dileo@proton.me>
Merged in pull request #1.
Committed by michael_dileo on 2025-12-24 13:40:47 +00:00
parent 612235d52b · commit 7327d77dcd
333 changed files with 39286 additions and 1 deletion


@@ -0,0 +1,298 @@
# Auto-Discovery Celery Metrics Exporter
The Celery metrics exporter now **automatically discovers** all Redis databases and their queues without requiring manual configuration. It scans all Redis databases (0-15) and identifies potential Celery queues based on patterns and naming conventions.
## How Auto-Discovery Works
### Automatic Database Scanning
- Scans Redis databases 0-15 by default
- Only monitors databases that contain keys
- Only includes databases that have identifiable queues
### Automatic Queue Discovery
The exporter supports two discovery modes:
#### Smart Filtering Mode (Default: `monitor_all_lists: false`)
Identifies queues using multiple strategies (a condensed sketch of the combined logic follows this list):
1. **Pattern Matching**: Matches known queue patterns from your applications:
- `celery`, `*_priority`, `default`, `mailers`, `push`, `scheduler`
- `streams`, `images`, `suggested_users`, `email`, `connectors`, `lists`, `inbox`, `imports`, `import_triggered`, `misc` (BookWyrm)
- `background`, `send` (PieFed)
- `high`, `mmo` (Pixelfed/Laravel)
2. **Heuristic Detection**: Identifies Redis lists containing queue-related keywords:
- Keys containing: `queue`, `celery`, `task`, `job`, `work`
3. **Type Checking**: Only considers Redis `list` type keys (Celery queues are Redis lists)
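The combined check can be summarized in a few lines of Python. This is a condensed sketch of the logic rather than the exporter's exact code, with the pattern and keyword lists shortened for readability:

```python
import fnmatch

# Shortened versions of the configured pattern and keyword lists
QUEUE_PATTERNS = ["celery", "*_priority", "default", "mailers", "push", "scheduler"]
QUEUE_KEYWORDS = ["queue", "celery", "task", "job", "work"]

def looks_like_queue(key: str, key_type: str) -> bool:
    if key_type != "list":  # strategy 3: Celery queues are Redis lists
        return False
    if any(fnmatch.fnmatch(key, pattern) for pattern in QUEUE_PATTERNS):  # strategy 1
        return True
    return any(word in key.lower() for word in QUEUE_KEYWORDS)  # strategy 2
```

A key such as `low_priority` passes via the `*_priority` pattern, while something like `bookwyrm_import_queue` would be caught by the keyword heuristic.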
#### Monitor Everything Mode (`monitor_all_lists: true`)
- Monitors **ALL** Redis list-type keys in all databases
- No filtering or pattern matching
- Maximum visibility but potentially more noise
- Useful for debugging or comprehensive monitoring
### Which Mode Should You Use?
**Use Smart Filtering (default)** when:
- ✅ You want clean, relevant metrics
- ✅ You care about Prometheus cardinality limits
- ✅ Your applications use standard queue naming
- ✅ You want to avoid monitoring non-queue Redis lists
**Use Monitor Everything** when:
- ✅ You're debugging queue discovery issues
- ✅ You have non-standard queue names not covered by patterns
- ✅ You want absolute certainty you're not missing anything
- ✅ You have sufficient Prometheus storage/performance headroom
- ⚠️ You accept the trade-off that non-queue Redis lists will add noise to your metrics
## Configuration (Optional)
While the exporter works completely automatically, you can customize its behavior via the `celery-exporter-config` ConfigMap:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: celery-exporter-config
namespace: celery-monitoring
data:
config.yaml: |
# Auto-discovery settings
auto_discovery:
enabled: true
scan_databases: true # Scan all Redis databases 0-15
scan_queues: true # Auto-discover queues in each database
monitor_all_lists: false # If true, monitor ALL Redis lists, not just queue-like ones
# Queue patterns to look for (Redis list keys that are likely Celery queues)
queue_patterns:
- "celery"
- "*_priority"
- "default"
- "mailers"
- "push"
- "scheduler"
- "broadcast"
- "federation"
- "media"
- "user_dir"
# Optional: Database name mapping (if you want friendly names)
# If not specified, databases will be named "db_0", "db_1", etc.
database_names:
0: "piefed"
1: "mastodon"
2: "matrix"
3: "bookwyrm"
# Minimum queue length to report (avoid noise from empty queues)
min_queue_length: 0
# Maximum number of databases to scan (safety limit)
max_databases: 16
```
## Adding New Applications
**No configuration needed!** New applications are automatically discovered (see the example after this list) when they:
1. **Use a Redis database** (any database 0-15)
2. **Create queues** that match common patterns or contain queue-related keywords
3. **Use Redis lists** for their queues (standard Celery behavior)
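For example, a hypothetical new Celery application configured like the sketch below would be picked up without touching the exporter: it lives in an otherwise unused Redis database and its default queue name contains the word `queue`. The application name, database number, and queue name here are illustrative, not part of this repository:

```python
from celery import Celery

app = Celery(
    "my_new_app",  # hypothetical application name
    broker="redis://:<REDIS_PASSWORD>@redis-ha-haproxy.redis-system.svc.cluster.local:6379/5",  # unused DB 5
)
# The default queue becomes a Redis list named "my_new_app_queue", which the
# keyword heuristic ("queue") recognises on the next discovery pass.
app.conf.task_default_queue = "my_new_app_queue"
```

Once the app enqueues its first task, the queue key exists as a Redis list and shows up in the next discovery pass.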
### Custom Queue Patterns
If your application uses non-standard queue names, add them to the `queue_patterns` list:
```bash
kubectl edit configmap celery-exporter-config -n celery-monitoring
```
Add your pattern:
```yaml
queue_patterns:
- "celery"
- "*_priority"
- "my_custom_queue_*" # Add your pattern here
```
### Friendly Database Names
To give databases friendly names instead of `db_0`, `db_1`, etc.:
```yaml
database_names:
0: "piefed"
1: "mastodon"
2: "matrix"
3: "bookwyrm"
4: "my_new_app" # Add your app here
```
## Metrics Produced
The exporter produces these metrics for each discovered database:
### `celery_queue_length`
- **Labels**: `queue_name`, `database`, `db_number`
- **Description**: Number of pending tasks in each queue
- **Example**: `celery_queue_length{queue_name="celery", database="piefed", db_number="0"} 1234`
- **Special**: `queue_name="_total"` shows total tasks across all queues in a database
### `redis_connection_status`
- **Labels**: `database`, `db_number`
- **Description**: Connection status per database (1=connected, 0=disconnected)
- **Example**: `redis_connection_status{database="piefed", db_number="0"} 1`
### `celery_databases_discovered`
- **Description**: Total number of databases with queues discovered
- **Example**: `celery_databases_discovered 4`
### `celery_queues_discovered`
- **Labels**: `database`
- **Description**: Number of queues discovered per database
- **Example**: `celery_queues_discovered{database="bookwyrm"} 5`
### `celery_queue_info`
- **Description**: General information about all monitored queues
- **Includes**: Total lengths, Redis host, last update timestamp, auto-discovery status
## PromQL Query Examples
### Discovery Overview
```promql
# How many databases were discovered
celery_databases_discovered
# How many queues per database
celery_queues_discovered
# Auto-discovery status
celery_queue_info
```
### All Applications Overview
```promql
# All queue lengths grouped by database
sum by (database) (celery_queue_length{queue_name!="_total"})
# Total tasks across all databases
sum(celery_queue_length{queue_name="_total"})
# Individual queues (excluding totals)
celery_queue_length{queue_name!="_total"}
# Only active queues (> 0 tasks)
celery_queue_length{queue_name!="_total"} > 0
```
### Specific Applications
```promql
# PieFed queues only
celery_queue_length{database="piefed", queue_name!="_total"}
# BookWyrm high priority queue (if it exists)
celery_queue_length{database="bookwyrm", queue_name="high_priority"}
# All applications' main celery queue
celery_queue_length{queue_name="celery"}
# Database totals only
celery_queue_length{queue_name="_total"}
```
### Processing Rates
```promql
# Net queue drain per minute (positive = tasks being worked off faster than they arrive)
rate(celery_queue_length{queue_name!="_total"}[5m]) * -60
# Processing rate by database (using totals)
rate(celery_queue_length{queue_name="_total"}[5m]) * -60
# Overall processing rate across all databases
sum(rate(celery_queue_length{queue_name="_total"}[5m]) * -60)
```
### Health Monitoring
```promql
# Databases with connection issues
redis_connection_status == 0
# Queues growing too fast
increase(celery_queue_length{queue_name!="_total"}[5m]) > 1000
# Stalled processing (no change in 15 minutes)
changes(celery_queue_length{queue_name="_total"}[15m]) == 0 and celery_queue_length{queue_name="_total"} > 100
# Drop in the number of discovered databases over the last 10 minutes
delta(celery_databases_discovered[10m]) < 0
```
## Troubleshooting
### Check Auto-Discovery Status
```bash
# View current configuration
kubectl get configmap celery-exporter-config -n celery-monitoring -o yaml
# Check exporter logs for discovery results
kubectl logs -n celery-monitoring deployment/celery-metrics-exporter
# Look for discovery messages like:
# "Database 0 (piefed): 1 queues, 245 total keys"
# "Auto-discovery complete: Found 3 databases with queues"
```
### Test Redis Connectivity
```bash
# Test connection to specific database
kubectl exec -n redis-system redis-master-0 -- redis-cli -a PASSWORD -n DB_NUMBER ping
# Check what keys exist in a database
kubectl exec -n redis-system redis-master-0 -- redis-cli -a PASSWORD -n DB_NUMBER keys '*'
# Check if a key is a list (queue)
kubectl exec -n redis-system redis-master-0 -- redis-cli -a PASSWORD -n DB_NUMBER type QUEUE_NAME
# Check queue length manually
kubectl exec -n redis-system redis-master-0 -- redis-cli -a PASSWORD -n DB_NUMBER llen QUEUE_NAME
```
### Validate Metrics
```bash
# Port forward and check metrics endpoint
kubectl port-forward -n celery-monitoring svc/celery-metrics-exporter 8000:8000
# Check discovery metrics
curl http://localhost:8000/metrics | grep celery_databases_discovered
curl http://localhost:8000/metrics | grep celery_queues_discovered
# Check queue metrics
curl http://localhost:8000/metrics | grep celery_queue_length
```
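If you prefer a programmatic check over `grep`, the short script below (an illustrative helper, assuming the port-forward above is still running) fetches the endpoint and prints every reported queue length using the `prometheus_client` parser:

```python
import urllib.request

from prometheus_client.parser import text_string_to_metric_families

raw = urllib.request.urlopen("http://localhost:8000/metrics").read().decode()
for family in text_string_to_metric_families(raw):
    if family.name == "celery_queue_length":
        for sample in family.samples:
            print(sample.labels.get("database"),
                  sample.labels.get("queue_name"),
                  int(sample.value))
```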
### Debug Discovery Issues
If queues aren't being discovered, work through this checklist (a small helper script follows it):
1. **Check queue patterns** - Add your queue names to `queue_patterns`
2. **Verify queue type** - Ensure queues are Redis lists: `redis-cli type queue_name`
3. **Check database numbers** - Verify your app uses the expected Redis database
4. **Review logs** - Look for discovery debug messages in exporter logs
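When the checklist doesn't explain the gap, a small standalone script can show exactly which list-type keys exist in a database so you can compare them against `queue_patterns`. This is an illustrative helper, not part of the deployment; it assumes the `redis` package is installed locally and that `REDIS_HOST`/`REDIS_PORT`/`REDIS_PASSWORD`/`REDIS_DB` are set (for example pointing at a port-forward to the Redis service):

```python
import os

import redis

client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    password=os.getenv("REDIS_PASSWORD", ""),
    db=int(os.getenv("REDIS_DB", "0")),
    decode_responses=True,
)
# Print every list-type key and its length so the names can be compared
# against the queue_patterns configuration.
for key in sorted(client.keys("*")):
    if client.type(key) == "list":
        print(f"{key}: {client.llen(key)} items")
```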
### Force Restart Discovery
```bash
# Restart the exporter to re-run discovery
kubectl rollout restart deployment/celery-metrics-exporter -n celery-monitoring
```
## Security Notes
- The exporter connects to Redis using the shared `redis-credentials` secret
- All database connections use the same Redis host and password
- Only queue length information is exposed, not queue contents
- The exporter scans all databases but only reports queue-like keys
- Metrics are scraped via ServiceMonitor for OpenTelemetry collection


@@ -0,0 +1,203 @@
# Celery Monitoring (Flower)
This directory contains the infrastructure for monitoring Celery tasks across all applications in the cluster using Flower.
## Overview
- **Flower**: Web-based tool for monitoring and administering Celery clusters
- **Multi-Application**: Monitors both PieFed and BookWyrm Celery tasks
- **Namespace**: `celery-monitoring`
- **Access**: `kubectl port-forward` only (no public URL)
## Components
- `namespace.yaml` - Dedicated namespace for monitoring
- `flower-deployment.yaml` - Flower application deployment
- `service.yaml` - Internal service for Flower
- `network-policies.yaml` - Cilium ingress/egress policies for Flower
- `redis-secret.yaml` - SOPS-encrypted Redis credentials
- `celery-metrics-exporter.yaml` - Prometheus metrics exporter for queue lengths
- `kustomization.yaml` - Kustomize configuration
## Redis Database Monitoring
Flower monitors multiple Redis databases:
- **Database 0**: PieFed Celery broker
- **Database 3**: BookWyrm Celery broker
## Access & Security
- **Access Method**: kubectl port-forward (local access only)
- **Command**: `kubectl port-forward -n celery-monitoring svc/celery-flower 8080:5555`
- **URL**: http://localhost:8080
- **Security**: No authentication required (local access only)
- **Network Policies**: Cilium policies allow cluster and health check access only
### Port-Forward Setup
1. **Prerequisites**:
- Valid kubeconfig with access to the cluster
- kubectl installed and configured
- RBAC permissions to create port-forwards in celery-monitoring namespace
2. **Network Policies**: Cilium policies ensure:
- Port 5555 access from cluster and host (for port-forward)
- Redis access for monitoring (DB 0 & 3)
- Cluster-internal health checks
3. **No Authentication Required**:
- Port-forward provides secure local access
- No additional credentials needed
## **🔒 Simplified Security Architecture**
**Current Status**: ✅ **Local access via kubectl port-forward**
### **Security Model**
**1. Local Access Only**
- **Port-Forward**: `kubectl port-forward` provides secure tunnel to the service
- **No External Exposure**: Service is not accessible from outside the cluster
- **Authentication**: Kubernetes RBAC controls who can create port-forwards
- **Encryption**: Traffic encrypted via Kubernetes API tunnel
**2. Network Layer (Cilium Network Policies)**
- **`celery-flower-ingress`**: Allows cluster and host access for port-forward and health checks
- **`celery-flower-egress`**: Limits outbound traffic to cluster-internal and host destinations
- **DNS Resolution**: Covered by the cluster-internal allowance, so service discovery keeps working
- **Redis Connectivity**: Reaches the Redis HAProxy service (DB 0 & 3) inside the cluster
**3. Pod-Level Security**
- Resource limits (CPU: 500m, Memory: 256Mi)
- Health checks (liveness/readiness probes)
- Non-root container execution
- Read-only root filesystem (where possible)
### **How It Works**
1. **Access Layer**: kubectl port-forward creates secure tunnel via Kubernetes API
2. **Network Layer**: Cilium policies ensure only cluster traffic reaches pods
3. **Application Layer**: Flower connects only to authorized Redis databases
4. **Monitoring Layer**: Health checks ensure service availability
5. **Local Security**: Access requires valid kubeconfig and RBAC permissions
## Features
- **Flower Web UI**: Real-time task monitoring and worker status
- **Prometheus Metrics**: Custom Celery queue metrics exported to OpenObserve
- **Automated Alerts**: Queue size and connection status monitoring
- **Dashboard**: Visual monitoring of queue trends and processing rates
## Monitoring & Alerts
### Metrics Exported
**From Celery Metrics Exporter** (celery-monitoring namespace):
1. **`celery_queue_length`**: Number of pending tasks in each queue
- Labels: `queue_name`, `database` (e.g. piefed/bookwyrm), `db_number`
2. **`redis_connection_status`**: Redis connectivity status (1=connected, 0=disconnected)
3. **`celery_queue_info`**: General information about queue status
**From Redis Exporter** (redis-system namespace):
4. **`redis_list_length`**: General Redis list lengths including Celery queues
5. **`redis_memory_used_bytes`**: Redis memory usage
6. **`redis_connected_clients`**: Number of connected Redis clients
7. **`redis_commands_total`**: Total Redis commands executed
### Alert Thresholds
- **PieFed Warning**: > 10,000 pending tasks
- **PieFed Critical**: > 50,000 pending tasks
- **BookWyrm Warning**: > 1,000 pending tasks
- **Redis Connection**: Connection lost alert
### OpenObserve Setup
1. **Deploy the monitoring infrastructure**:
```bash
kubectl apply -k manifests/infrastructure/celery-monitoring/
```
2. **Import alerts and dashboard**:
- Access OpenObserve dashboard
- Import alert configurations from the `openobserve-alert-configs` ConfigMap
- Import dashboard from the same ConfigMap
- Configure webhook URLs for notifications
3. **Verify metrics collection**:
```sql
SELECT * FROM metrics WHERE __name__ LIKE 'celery_%' ORDER BY _timestamp DESC LIMIT 10
```
### Useful Monitoring Queries
**Current queue sizes**:
```sql
SELECT queue_name, database, MAX(celery_queue_length) AS queue_length
FROM metrics
WHERE _timestamp >= now() - interval '5 minutes'
GROUP BY queue_name, database
ORDER BY queue_length DESC
```
**Queue processing rate**:
```sql
SELECT _timestamp,
celery_queue_length - LAG(celery_queue_length, 1) OVER (ORDER BY _timestamp) as processing_rate
FROM metrics
WHERE queue_name='celery' AND database='piefed'
AND _timestamp >= now() - interval '1 hour'
```
Flower itself additionally provides:
- Queue length monitoring
- Task history and details
- Performance metrics
- Multi-broker support
## Dependencies
- Redis (for Celery brokers)
- kubectl (for port-forward access)
- Valid kubeconfig with cluster access
## Testing & Validation
### Quick Access
```bash
# Start port-forward (runs in background)
kubectl port-forward -n celery-monitoring svc/celery-flower 8080:5555 &
# Access Flower UI
open http://localhost:8080
# or visit http://localhost:8080 in your browser
# Stop port-forward when done
pkill -f "kubectl port-forward.*celery-flower"
```
### Manual Testing Checklist
1. **Port-Forward Access**: ✅ Can access http://localhost:8080 after port-forward
2. **No External Access**: ❌ Service not accessible from outside cluster
3. **Redis Connectivity**: 📊 Shows tasks from both PieFed (DB 0) and BookWyrm (DB 3)
4. **Health Checks**: ✅ Pod shows Ready status
5. **Network Policies**: 🛡️ Egress limited to cluster-internal traffic (Redis, DNS)
### Troubleshooting Commands
```bash
# Check Flower pod status
kubectl get pods -n celery-monitoring -l app.kubernetes.io/name=celery-flower
# View Flower logs
kubectl logs -n celery-monitoring -l app.kubernetes.io/name=celery-flower
# Check that Flower's HTTP endpoint responds from inside the pod
kubectl exec -n celery-monitoring -it deployment/celery-flower -- wget -qO- http://localhost:5555
# Check network policies
kubectl get cnp -n celery-monitoring
# Confirm the service has ready endpoints before port-forwarding
kubectl get endpoints -n celery-monitoring celery-flower
```
## Deployment
Deployed automatically via Flux GitOps from `manifests/cluster/flux-system/celery-monitoring.yaml`.


@@ -0,0 +1,505 @@
---
# Configuration for Celery Metrics Exporter
apiVersion: v1
kind: ConfigMap
metadata:
name: celery-exporter-config
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: config
data:
config.yaml: |
# Auto-discovery settings
auto_discovery:
enabled: true
scan_databases: false # Only scan known databases, not all 0-15
scan_queues: true # Auto-discover queues in each database
monitor_all_lists: false # If true, monitor ALL Redis lists, not just queue-like ones
use_known_queues: true # Monitor known queues even if they don't exist as lists yet
# Queue patterns to look for (Redis list keys that are likely Celery queues)
queue_patterns:
- "celery"
- "*_priority" # high_priority, medium_priority, low_priority
- "default"
- "mailers"
- "push"
- "scheduler"
- "broadcast"
- "federation"
- "media"
- "user_dir"
# BookWyrm specific queues
- "streams"
- "images"
- "suggested_users"
- "email"
- "connectors"
- "lists"
- "inbox"
- "imports"
- "import_triggered"
- "misc"
# PieFed specific queues
- "background"
- "send"
# Pixelfed/Laravel specific queues
- "high"
- "mmo"
# Common queue patterns
- "*_queue"
- "queue_*"
# Known application configurations (monitored even when queues are empty)
known_applications:
- name: "piefed"
db: 0
queues: ["celery", "background", "send"]
- name: "bookwyrm"
db: 3
queues: ["high_priority", "medium_priority", "low_priority", "streams", "images", "suggested_users", "email", "connectors", "lists", "inbox", "imports", "import_triggered", "broadcast", "misc"]
- name: "mastodon"
db: 1
queues: ["default", "mailers", "push", "scheduler"]
# Optional: Database name mapping (if you want friendly names)
# If not specified, databases will be named "db_0", "db_1", etc.
database_names:
0: "piefed"
1: "mastodon"
2: "matrix"
3: "bookwyrm"
# Minimum queue length to report (avoid noise from empty queues)
min_queue_length: 0
# Maximum number of databases to scan (safety limit)
max_databases: 4
---
# Custom Celery Metrics Exporter Script
apiVersion: v1
kind: ConfigMap
metadata:
name: celery-metrics-script
namespace: celery-monitoring
data:
celery_metrics.py: |
#!/usr/bin/env python3
import redis
import time
import os
import yaml
import fnmatch
from prometheus_client import start_http_server, Gauge, Counter, Info
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Prometheus metrics
celery_queue_length = Gauge('celery_queue_length', 'Length of Celery queue', ['queue_name', 'database', 'db_number'])
celery_queue_info = Info('celery_queue_info', 'Information about Celery queues')
redis_connection_status = Gauge('redis_connection_status', 'Redis connection status (1=connected, 0=disconnected)', ['database', 'db_number'])
databases_discovered = Gauge('celery_databases_discovered', 'Number of databases with queues discovered')
queues_discovered = Gauge('celery_queues_discovered', 'Total number of queues discovered', ['database'])
# Redis connection
REDIS_HOST = os.getenv('REDIS_HOST', 'redis-ha-haproxy.redis-system.svc.cluster.local')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')
def get_redis_client(db=0):
return redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
db=db,
decode_responses=True
)
def load_config():
"""Load configuration from YAML file"""
config_path = '/config/config.yaml'
default_config = {
'auto_discovery': {
'enabled': True,
'scan_databases': True,
'scan_queues': True
},
'queue_patterns': [
'celery',
'*_priority',
'default',
'mailers',
'push',
'scheduler',
'broadcast',
'federation',
'media',
'user_dir'
],
'database_names': {},
'min_queue_length': 0,
'max_databases': 16
}
try:
if os.path.exists(config_path):
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
logger.info("Loaded configuration from file")
return {**default_config, **config}
else:
logger.info("No config file found, using defaults")
return default_config
except Exception as e:
logger.error(f"Error loading config: {e}, using defaults")
return default_config
def discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists=False):
"""Discover all potential Celery queues in a Redis database"""
try:
# Get all keys in the database
all_keys = redis_client.keys('*')
discovered_queues = []
for key in all_keys:
# Check if key is a list (potential queue)
try:
key_type = redis_client.type(key)
if key_type == 'list':
if monitor_all_lists:
# Monitor ALL Redis lists
discovered_queues.append(key)
else:
# Smart filtering: Check if key matches any of our queue patterns
for pattern in queue_patterns:
if fnmatch.fnmatch(key, pattern):
discovered_queues.append(key)
break
else:
# Also include keys that look like queues (contain common queue words)
queue_indicators = ['queue', 'celery', 'task', 'job', 'work']
if any(indicator in key.lower() for indicator in queue_indicators):
discovered_queues.append(key)
except Exception as e:
logger.debug(f"Error checking key {key} in DB {db_number}: {e}")
continue
# Remove duplicates and sort
discovered_queues = sorted(list(set(discovered_queues)))
if discovered_queues:
mode = "all lists" if monitor_all_lists else "filtered queues"
logger.info(f"DB {db_number}: Discovered {len(discovered_queues)} {mode}: {discovered_queues}")
return discovered_queues
except Exception as e:
logger.error(f"Error discovering queues in DB {db_number}: {e}")
return []
def get_known_applications(config):
"""Get known application configurations"""
return config.get('known_applications', [])
def discover_databases_and_queues(config):
"""Hybrid approach: Use known applications + auto-discovery"""
max_databases = config.get('max_databases', 16)
queue_patterns = config.get('queue_patterns', ['celery', '*_priority'])
database_names = config.get('database_names', {})
monitor_all_lists = config.get('auto_discovery', {}).get('monitor_all_lists', False)
use_known_queues = config.get('auto_discovery', {}).get('use_known_queues', True)
discovered_databases = []
known_apps = get_known_applications(config) if use_known_queues else []
# Track which databases we've already processed from known apps
processed_dbs = set()
# First, add known applications (these are always monitored)
for app_config in known_apps:
db_number = app_config['db']
app_name = app_config['name']
known_queues = app_config['queues']
try:
redis_client = get_redis_client(db_number)
redis_client.ping() # Test connection
# For known apps, we monitor the queues even if they don't exist yet
discovered_databases.append({
'name': app_name,
'db_number': db_number,
'queues': known_queues,
'total_keys': redis_client.dbsize(),
'source': 'known_application'
})
processed_dbs.add(db_number)
logger.info(f"Known app {app_name} (DB {db_number}): {len(known_queues)} configured queues")
except Exception as e:
logger.error(f"Error connecting to known app {app_name} (DB {db_number}): {e}")
continue
# Then, do auto-discovery for remaining databases
for db_number in range(max_databases):
if db_number in processed_dbs:
continue # Skip databases we already processed
try:
redis_client = get_redis_client(db_number)
# Test connection and check if database has any keys
redis_client.ping()
db_size = redis_client.dbsize()
if db_size > 0:
# Discover queues in this database
queues = discover_queues_in_database(redis_client, db_number, queue_patterns, monitor_all_lists)
if queues: # Only include databases that have queues/lists
db_name = database_names.get(db_number, f"db_{db_number}")
discovered_databases.append({
'name': db_name,
'db_number': db_number,
'queues': queues,
'total_keys': db_size,
'source': 'auto_discovery'
})
mode = "lists" if monitor_all_lists else "queues"
logger.info(f"Auto-discovered DB {db_number} ({db_name}): {len(queues)} {mode}, {db_size} total keys")
except redis.ConnectionError:
logger.debug(f"Cannot connect to database {db_number}")
continue
except Exception as e:
logger.debug(f"Error checking database {db_number}: {e}")
continue
known_count = len([db for db in discovered_databases if db.get('source') == 'known_application'])
discovered_count = len([db for db in discovered_databases if db.get('source') == 'auto_discovery'])
logger.info(f"Hybrid discovery complete: {known_count} known applications, {discovered_count} auto-discovered databases")
return discovered_databases
def collect_metrics():
config = load_config()
if not config['auto_discovery']['enabled']:
logger.error("Auto-discovery is disabled in configuration")
return
# Discover databases and queues
databases = discover_databases_and_queues(config)
if not databases:
logger.warning("No databases with queues discovered")
databases_discovered.set(0)
return
databases_discovered.set(len(databases))
queue_info = {}
total_queues = 0
min_queue_length = config.get('min_queue_length', 0)
for db_config in databases:
db_name = db_config['name']
db_number = db_config['db_number']
queues = db_config['queues']
try:
redis_client = get_redis_client(db_number)
# Test connection
redis_client.ping()
redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(1)
total_queue_length = 0
active_queues = 0
for queue_name in queues:
try:
queue_length = redis_client.llen(queue_name)
# Only report queues that meet minimum length threshold
if queue_length >= min_queue_length:
celery_queue_length.labels(
queue_name=queue_name,
database=db_name,
db_number=str(db_number)
).set(queue_length)
total_queue_length += queue_length
if queue_length > 0:
active_queues += 1
logger.info(f"{db_name} (DB {db_number}) {queue_name}: {queue_length} tasks")
except Exception as e:
logger.warning(f"Error checking {db_name} queue {queue_name}: {e}")
# Set total queue length for this database
celery_queue_length.labels(
queue_name='_total',
database=db_name,
db_number=str(db_number)
).set(total_queue_length)
# Track queues discovered per database
queues_discovered.labels(database=db_name).set(len(queues))
queue_info[f'{db_name}_total_length'] = str(total_queue_length)
queue_info[f'{db_name}_active_queues'] = str(active_queues)
queue_info[f'{db_name}_total_queues'] = str(len(queues))
queue_info[f'{db_name}_source'] = db_config.get('source', 'unknown')
total_queues += len(queues)
source_info = f" ({db_config.get('source', 'unknown')})" if 'source' in db_config else ""
if total_queue_length > 0:
logger.info(f"{db_name} (DB {db_number}){source_info}: {total_queue_length} total tasks in {active_queues}/{len(queues)} queues")
except Exception as e:
logger.error(f"Error collecting metrics for {db_name} (DB {db_number}): {e}")
redis_connection_status.labels(database=db_name, db_number=str(db_number)).set(0)
# Update global queue info
queue_info.update({
'redis_host': REDIS_HOST,
'last_update': str(int(time.time())),
'databases_monitored': str(len(databases)),
'total_queues_discovered': str(total_queues),
'auto_discovery_enabled': 'true'
})
celery_queue_info.info(queue_info)
if __name__ == '__main__':
# Start Prometheus metrics server
start_http_server(8000)
logger.info("Celery metrics exporter started on port 8000")
# Collect metrics every 60 seconds
while True:
collect_metrics()
time.sleep(60)
---
# Celery Metrics Exporter Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-metrics-exporter
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
template:
metadata:
labels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
spec:
containers:
- name: celery-metrics-exporter
image: python:3.11-slim
command:
- /bin/sh
- -c
- |
pip install redis prometheus_client pyyaml
python /scripts/celery_metrics.py
ports:
- containerPort: 8000
name: metrics
env:
- name: REDIS_HOST
value: "redis-ha-haproxy.redis-system.svc.cluster.local"
- name: REDIS_PORT
value: "6379"
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-credentials
key: redis-password
volumeMounts:
- name: script
mountPath: /scripts
- name: config
mountPath: /config
resources:
requests:
cpu: 50m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
livenessProbe:
httpGet:
path: /metrics
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /metrics
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
volumes:
- name: script
configMap:
name: celery-metrics-script
defaultMode: 0755
- name: config
configMap:
name: celery-exporter-config
---
# Service for Celery Metrics Exporter
apiVersion: v1
kind: Service
metadata:
name: celery-metrics-exporter
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
spec:
selector:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
ports:
- port: 8000
targetPort: 8000
name: metrics
---
# ServiceMonitor for OpenTelemetry Collection
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: celery-metrics-exporter
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
spec:
selector:
matchLabels:
app.kubernetes.io/name: celery-metrics-exporter
app.kubernetes.io/component: metrics
endpoints:
- port: metrics
interval: 60s
path: /metrics


@@ -0,0 +1,54 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-flower
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
template:
metadata:
labels:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
spec:
containers:
- name: flower
image: mher/flower:2.0.1
ports:
- containerPort: 5555
env:
- name: CELERY_BROKER_URL
value: "redis://:<REDIS_PASSWORD>@redis-ha-haproxy.redis-system.svc.cluster.local:6379/0"
- name: FLOWER_PORT
value: "5555"
# FLOWER_BASIC_AUTH intentionally unset - access is local-only via kubectl port-forward
# (this also lets Kubernetes health checks reach / without credentials)
- name: FLOWER_BROKER_API
value: "redis://:<REDIS_PASSWORD>@redis-ha-haproxy.redis-system.svc.cluster.local:6379/0,redis://:<REDIS_PASSWORD>@redis-ha-haproxy.redis-system.svc.cluster.local:6379/3"
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 256Mi
livenessProbe:
httpGet:
path: /
port: 5555
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /
port: 5555
initialDelaySeconds: 10
periodSeconds: 10


@@ -0,0 +1,11 @@
---
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- namespace.yaml
- flower-deployment.yaml
- service.yaml
- network-policies.yaml
- redis-secret.yaml
- celery-metrics-exporter.yaml
# - openobserve-alerts.yaml


@@ -0,0 +1,8 @@
---
apiVersion: v1
kind: Namespace
metadata:
name: celery-monitoring
labels:
app.kubernetes.io/name: celery-monitoring
app.kubernetes.io/component: infrastructure


@@ -0,0 +1,47 @@
---
# Celery Monitoring Network Policies
# Port-forward and health check access to Flower with proper DNS/Redis connectivity
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
name: celery-flower-ingress
namespace: celery-monitoring
spec:
description: "Allow ingress to Flower from kubectl port-forward and health checks"
endpointSelector:
matchLabels:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
ingress:
# Allow kubectl port-forward access (from cluster nodes)
- fromEntities:
- cluster
- host
toPorts:
- ports:
- port: "5555"
protocol: TCP
---
apiVersion: cilium.io/v2
kind: CiliumNetworkPolicy
metadata:
name: celery-flower-egress
namespace: celery-monitoring
spec:
description: "Allow Flower to connect to Redis, DNS, and monitoring services"
endpointSelector:
matchLabels:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
egress:
# Allow all cluster-internal communication (like PieFed approach)
# This is more permissive but still secure within the cluster
- toEntities:
- cluster
- host
# Service access policy removed - using kubectl port-forward for local access
# Port-forward provides secure access without exposing the service externally


@@ -0,0 +1,220 @@
# Keeping for reference
# ---
# # OpenObserve Alert Configuration for Celery Queue Monitoring
# # This file contains the alert configurations that should be imported into OpenObserve
# apiVersion: v1
# kind: ConfigMap
# metadata:
# name: openobserve-alert-configs
# namespace: celery-monitoring
# labels:
# app.kubernetes.io/name: openobserve-alerts
# app.kubernetes.io/component: monitoring
# data:
# celery-queue-alerts.json: |
# {
# "alerts": [
# {
# "name": "PieFed Celery Queue High",
# "description": "PieFed Celery queue has more than 10,000 pending tasks",
# "query": "SELECT avg(celery_queue_length) as avg_queue_length FROM metrics WHERE queue_name='celery' AND database='piefed' AND _timestamp >= now() - interval '5 minutes'",
# "condition": "avg_queue_length > 10000",
# "frequency": "5m",
# "severity": "warning",
# "enabled": true,
# "actions": [
# {
# "type": "webhook",
# "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
# "message": "🚨 PieFed Celery queue is high: {{avg_queue_length}} tasks pending"
# }
# ]
# },
# {
# "name": "PieFed Celery Queue Critical",
# "description": "PieFed Celery queue has more than 50,000 pending tasks",
# "query": "SELECT avg(celery_queue_length) as avg_queue_length FROM metrics WHERE queue_name='celery' AND database='piefed' AND _timestamp >= now() - interval '5 minutes'",
# "condition": "avg_queue_length > 50000",
# "frequency": "2m",
# "severity": "critical",
# "enabled": true,
# "actions": [
# {
# "type": "webhook",
# "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
# "message": "🔥 CRITICAL: PieFed Celery queue is critically high: {{avg_queue_length}} tasks pending. Consider scaling workers!"
# }
# ]
# },
# {
# "name": "BookWyrm Celery Queue High",
# "description": "BookWyrm Celery queue has more than 1,000 pending tasks",
# "query": "SELECT avg(celery_queue_length) as avg_queue_length FROM metrics WHERE queue_name='total' AND database='bookwyrm' AND _timestamp >= now() - interval '5 minutes'",
# "condition": "avg_queue_length > 1000",
# "frequency": "5m",
# "severity": "warning",
# "enabled": true,
# "actions": [
# {
# "type": "webhook",
# "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
# "message": "📚 BookWyrm Celery queue is high: {{avg_queue_length}} tasks pending"
# }
# ]
# },
# {
# "name": "Redis Connection Lost",
# "description": "Redis connection is down for Celery monitoring",
# "query": "SELECT avg(redis_connection_status) as connection_status FROM metrics WHERE _timestamp >= now() - interval '2 minutes'",
# "condition": "connection_status < 1",
# "frequency": "1m",
# "severity": "critical",
# "enabled": true,
# "actions": [
# {
# "type": "webhook",
# "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
# "message": "💥 CRITICAL: Redis connection lost for Celery monitoring!"
# }
# ]
# },
# {
# "name": "Celery Queue Processing Stalled",
# "description": "Celery queue size hasn't decreased in 15 minutes",
# "query": "SELECT celery_queue_length FROM metrics WHERE queue_name='celery' AND database='piefed' AND _timestamp >= now() - interval '15 minutes' ORDER BY _timestamp DESC LIMIT 1",
# "condition": "celery_queue_length > (SELECT celery_queue_length FROM metrics WHERE queue_name='celery' AND database='piefed' AND _timestamp >= now() - interval '20 minutes' AND _timestamp < now() - interval '15 minutes' ORDER BY _timestamp DESC LIMIT 1)",
# "frequency": "10m",
# "severity": "warning",
# "enabled": true,
# "actions": [
# {
# "type": "webhook",
# "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
# "message": "⚠️ Celery queue processing appears stalled. Queue size hasn't decreased in 15 minutes."
# }
# ]
# }
# ]
# }
# dashboard-config.json: |
# {
# "dashboard": {
# "title": "Celery Queue Monitoring",
# "description": "Monitor Celery queue sizes and processing rates for PieFed and BookWyrm",
# "panels": [
# {
# "title": "PieFed Queue Length",
# "type": "line",
# "query": "SELECT _timestamp, celery_queue_length FROM metrics WHERE queue_name='celery' AND database='piefed' AND _timestamp >= now() - interval '24 hours'",
# "x_axis": "_timestamp",
# "y_axis": "celery_queue_length"
# },
# {
# "title": "BookWyrm Total Queue Length",
# "type": "line",
# "query": "SELECT _timestamp, celery_queue_length FROM metrics WHERE queue_name='total' AND database='bookwyrm' AND _timestamp >= now() - interval '24 hours'",
# "x_axis": "_timestamp",
# "y_axis": "celery_queue_length"
# },
# {
# "title": "Queue Processing Rate (PieFed)",
# "type": "line",
# "query": "SELECT _timestamp, celery_queue_length - LAG(celery_queue_length, 1) OVER (ORDER BY _timestamp) as processing_rate FROM metrics WHERE queue_name='celery' AND database='piefed' AND _timestamp >= now() - interval '6 hours'",
# "x_axis": "_timestamp",
# "y_axis": "processing_rate"
# },
# {
# "title": "Redis Connection Status",
# "type": "stat",
# "query": "SELECT redis_connection_status FROM metrics WHERE _timestamp >= now() - interval '5 minutes' ORDER BY _timestamp DESC LIMIT 1"
# },
# {
# "title": "Current Queue Sizes",
# "type": "table",
# "query": "SELECT queue_name, database, celery_queue_length FROM metrics WHERE _timestamp >= now() - interval '5 minutes' GROUP BY queue_name, database ORDER BY celery_queue_length DESC"
# }
# ]
# }
# }
# ---
# # Instructions ConfigMap
# apiVersion: v1
# kind: ConfigMap
# metadata:
# name: openobserve-setup-instructions
# namespace: celery-monitoring
# data:
# README.md: |
# # OpenObserve Celery Queue Monitoring Setup
# ## 1. Import Alerts
# 1. Access your OpenObserve dashboard
# 2. Go to Alerts → Import
# 3. Copy the contents of `celery-queue-alerts.json` from the `openobserve-alert-configs` ConfigMap
# 4. Paste and import the alert configurations
# ## 2. Configure Webhooks
# Update the webhook URLs in the alert configurations:
# - Replace `https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK` with your actual Slack webhook URL
# - Or configure other notification methods (email, Discord, etc.)
# ## 3. Import Dashboard
# 1. Go to Dashboards → Import
# 2. Copy the contents of `dashboard-config.json` from the `openobserve-alert-configs` ConfigMap
# 3. Paste and import the dashboard configuration
# ## 4. Verify Metrics
# Check that metrics are being collected:
# ```sql
# SELECT * FROM metrics WHERE __name__ LIKE 'celery_%' ORDER BY _timestamp DESC LIMIT 10
# ```
# ## 5. Alert Thresholds
# Current alert thresholds:
# - **PieFed Warning**: > 10,000 tasks
# - **PieFed Critical**: > 50,000 tasks
# - **BookWyrm Warning**: > 1,000 tasks
# - **Redis Connection**: Connection lost
# Adjust these thresholds based on your normal queue sizes and processing capacity.
# ## 6. Monitoring Queries
# Useful queries for monitoring:
# ### Current queue sizes:
# ```sql
# SELECT queue_name, database, celery_queue_length
# FROM metrics
# WHERE _timestamp >= now() - interval '5 minutes'
# GROUP BY queue_name, database
# ORDER BY celery_queue_length DESC
# ```
# ### Queue processing rate (tasks/minute):
# ```sql
# SELECT _timestamp,
# celery_queue_length - LAG(celery_queue_length, 1) OVER (ORDER BY _timestamp) as processing_rate
# FROM metrics
# WHERE queue_name='celery' AND database='piefed'
# AND _timestamp >= now() - interval '1 hour'
# ```
# ### Average queue size over time:
# ```sql
# SELECT DATE_TRUNC('hour', _timestamp) as hour,
# AVG(celery_queue_length) as avg_queue_length
# FROM metrics
# WHERE queue_name='celery' AND database='piefed'
# AND _timestamp >= now() - interval '24 hours'
# GROUP BY hour
# ORDER BY hour
# ```


@@ -0,0 +1,42 @@
# Redis credentials for Celery monitoring
apiVersion: v1
kind: Secret
metadata:
name: redis-credentials
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-monitoring
app.kubernetes.io/component: credentials
type: Opaque
stringData:
redis-password: ENC[AES256_GCM,data:F0QBEefly6IeZzyAU32dTLTV17bFl6TVq1gM3kDfHb4=,iv:Uj47EB6a20YBM4FVKEWBTZv0u9kLrzm2U1YWlwprDkI=,tag:T0ge1nLu1ogUyXCJ9G6m0w==,type:str]
sops:
lastmodified: "2025-08-25T14:29:57Z"
mac: ENC[AES256_GCM,data:S64r234afUX/Lk9TuE7OSCtIlgwD43WXQ78gFJEirGasKY8g27mn1UI16GN79qkS4+i0vg947dVpOkU2jruf897KXK8+672P9ycm4OJQ4uhHaDtKMG3YNPowo8RXFfwQ4v86JzwoUtcmDiK+xjGCTwtrtrU1hal/uN2LXcDZfj0=,iv:hPm8IdI/rBSRCxRNMNCEA/URebgFqQ/ecgcVLX5aQDo=,tag:Otbqwm24GkqNmhpy/drtlA==,type:str]
pgp:
- created_at: "2025-08-23T22:34:52Z"
enc: |-
-----BEGIN PGP MESSAGE-----
hF4DZT3mpHTS/JgSAQdAh9TpU95PiIZoVOgnXqbLZH37oLi2u63YBZUDE5QpBlww
5YNOarjb8tQ03/5jQ4b51USd15rGZBI04JM/V2PXSGRFpF2O7X0WyTw9kELUw2TF
1GgBCQIQ4Df+AQ48lRzu3PoLEwG5sF7p83G4LWXkdfZr9vFz7bpdQ/YzOOUg3TEJ
qoUq93Kbvo98dLIz9MS3qkzuh+E3S56wisziExm95vKinnzgztgIkZ7g6jkLevrK
xf/xvJVj5BVXtw==
=vqkj
-----END PGP MESSAGE-----
fp: B120595CA9A643B051731B32E67FF350227BA4E8
- created_at: "2025-08-23T22:34:52Z"
enc: |-
-----BEGIN PGP MESSAGE-----
hF4DSXzd60P2RKISAQdA2Eq3F3t1myCJVgwXufY3Z0K+Q3Tdzeu47/VoQCrY8kkw
mdtyPKmFwgtqFg8E9VRiZXwBRq3qscOki7yiGozFfGdhFmO0ZK9R/dJGOeLSStfy
1GgBCQIQbfMuXVRt14SVoTMZiHIDGcu5ZBq2iea6HmdeJoLqmweGLF/Vsbrx5pFI
hKyBVDwXE3gf1V03ts4QnbZESCrjNRyg1NsTxIsHPIu64DX6EnW13DNPI6TWZW9i
ni6ecXRfY+gpOw==
=RS4p
-----END PGP MESSAGE-----
fp: 4A8AADB4EBAB9AF88EF7062373CECE06CC80D40C
encrypted_regex: ^(data|stringData)$
version: 3.10.2


@@ -0,0 +1,17 @@
---
apiVersion: v1
kind: Service
metadata:
name: celery-flower
namespace: celery-monitoring
labels:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
spec:
selector:
app.kubernetes.io/name: celery-flower
app.kubernetes.io/component: monitoring
ports:
- port: 5555
targetPort: 5555
name: http