Added redis and worker services

This commit is contained in:
blaisadmin
2026-05-02 00:14:56 -04:00
parent 5a3ca9021a
commit 673df353b9
15 changed files with 833 additions and 15 deletions
+85 -13
View File
@@ -1,6 +1,7 @@
import crypto from 'crypto';
import { existsSync } from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import cors from 'cors';
import dotenv from 'dotenv';
import express, { type NextFunction, type Request, type Response } from 'express';
@@ -12,6 +13,7 @@ import Stripe from 'stripe';
import { z } from 'zod';
import { ensureSchema } from './db/schema.js';
import { enqueueBirdMilestoneReminderJob, getBirdMilestoneReminderQueueCounts } from './queues/birdMilestoneReminderQueue.js';
import {
consumeMagicLinkToken,
consumeOAuthState,
@@ -337,7 +339,8 @@ const smtpPass = process.env.SMTP_PASS?.trim() ?? '';
const smtpFromEmail = process.env.SMTP_FROM_EMAIL?.trim() ?? '';
const smtpFromName = process.env.SMTP_FROM_NAME?.trim() || 'FlockPal';
const rescueStatusNotificationEmail = process.env.RESCUE_STATUS_NOTIFICATION_EMAIL?.trim() || 'appadmin@flockpal.app';
const rescueOnboardingWebhookUrl = 'https://n8n.blaishome.online/webhook/395cd538-5e0d-4e89-8070-9e66f571b7ee';
const rescueOnboardingWebhookUrl =
process.env.RESCUE_ONBOARDING_WEBHOOK_URL?.trim() || 'https://n8n.blaishome.online/webhook/395cd538-5e0d-4e89-8070-9e66f571b7ee';
const stripeSecretKey = process.env.STRIPE_SECRET_KEY?.trim() ?? '';
const stripeWebhookSecret = process.env.STRIPE_WEBHOOK_SECRET?.trim() ?? '';
const withBillingRedirectState = (url: string, billingState: 'success' | 'cancelled' | 'portal') => {
@@ -690,6 +693,40 @@ app.use(express.json({ limit: '2mb' }));
app.use(express.urlencoded({ extended: false }));
app.use(morgan(process.env.NODE_ENV === 'production' ? 'combined' : 'dev'));
const requestMetrics = {
startedAt: new Date().toISOString(),
totalRequests: 0,
totalErrors: 0,
inFlightRequests: 0,
totalDurationMs: 0,
byStatus: {} as Record<string, number>,
byRoute: {} as Record<string, number>,
};
app.use((req: Request, res: Response, next: NextFunction) => {
const startedAt = process.hrtime.bigint();
requestMetrics.totalRequests += 1;
requestMetrics.inFlightRequests += 1;
res.on('finish', () => {
const durationMs = Number(process.hrtime.bigint() - startedAt) / 1_000_000;
requestMetrics.inFlightRequests -= 1;
requestMetrics.totalDurationMs += durationMs;
const statusBucket = `${Math.floor(res.statusCode / 100)}xx`;
requestMetrics.byStatus[statusBucket] = (requestMetrics.byStatus[statusBucket] ?? 0) + 1;
if (res.statusCode >= 500) {
requestMetrics.totalErrors += 1;
}
const routeKey = `${req.method} ${req.route?.path ?? req.path}`;
requestMetrics.byRoute[routeKey] = (requestMetrics.byRoute[routeKey] ?? 0) + 1;
});
next();
});
const normalizeWorkspaceMembershipList = async (userId: string) =>
(await listMembershipsForUser(userId)).map((row) => ({
membership: normalizeWorkspaceMember(row),
@@ -1412,7 +1449,7 @@ const sendBirdMilestoneReminderNotification = async ({
return { delivered: true };
};
const runBirdMilestoneReminders = async (runDate = getDateInTimeZone()) => {
export const runBirdMilestoneReminders = async (runDate = getDateInTimeZone()) => {
const reminders = await listDueBirdMilestoneReminders(runDate);
let sent = 0;
let skipped = 0;
@@ -1458,7 +1495,7 @@ const runBirdMilestoneReminders = async (runDate = getDateInTimeZone()) => {
let lastMilestoneReminderRunDate = '';
const startBirdMilestoneReminderScheduler = () => {
export const startBirdMilestoneReminderScheduler = () => {
if (!milestoneRemindersEnabled) {
console.log('Bird milestone reminders are disabled.');
return;
@@ -1471,10 +1508,8 @@ const startBirdMilestoneReminderScheduler = () => {
}
lastMilestoneReminderRunDate = runDate;
const result = await runBirdMilestoneReminders(runDate);
console.log(
`Bird milestone reminders completed for ${result.runDate}: checked=${result.checked}, sent=${result.sent}, skipped=${result.skipped}, failed=${result.failed}`,
);
const job = await enqueueBirdMilestoneReminderJob(runDate);
console.log(`Bird milestone reminder job queued for ${runDate}: id=${job.id ?? 'unknown'}`);
};
setTimeout(() => {
@@ -1614,6 +1649,40 @@ app.get('/api/health', (_req: Request, res: Response) => {
res.json({ ok: true });
});
app.get('/api/metrics', requireAuth, requireAdmin, async (_req: Request, res: Response, next: NextFunction) => {
const memoryUsage = process.memoryUsage();
const averageDurationMs = requestMetrics.totalRequests > 0 ? requestMetrics.totalDurationMs / requestMetrics.totalRequests : 0;
try {
const birdMilestoneReminderQueueCounts = await getBirdMilestoneReminderQueueCounts();
res.json({
startedAt: requestMetrics.startedAt,
uptimeSeconds: Math.round(process.uptime()),
requests: {
total: requestMetrics.totalRequests,
inFlight: requestMetrics.inFlightRequests,
errors: requestMetrics.totalErrors,
averageDurationMs: Number(averageDurationMs.toFixed(2)),
byStatus: requestMetrics.byStatus,
byRoute: requestMetrics.byRoute,
},
memory: {
rss: memoryUsage.rss,
heapTotal: memoryUsage.heapTotal,
heapUsed: memoryUsage.heapUsed,
external: memoryUsage.external,
arrayBuffers: memoryUsage.arrayBuffers,
},
queues: {
birdMilestoneReminders: birdMilestoneReminderQueueCounts,
},
});
} catch (error) {
next(error);
}
});
app.post('/api/lost-bird/report', lostBirdReportLimiter, async (req: Request, res: Response) => {
const parsed = lostBirdReportSchema.safeParse(req.body);
@@ -3073,15 +3142,18 @@ app.use((error: unknown, _req: Request, res: Response, _next: NextFunction) => {
res.status(500).json({ error: error instanceof Error ? error.message : 'Internal server error' });
});
const start = async () => {
export const startApiServer = async () => {
await ensureSchema();
app.listen(port, () => {
console.log(`FlockPal backend listening on port ${port}`);
});
startBirdMilestoneReminderScheduler();
};
start().catch((error) => {
console.error('Failed to start backend', error);
process.exit(1);
});
const currentModulePath = fileURLToPath(import.meta.url);
if (process.argv[1] === currentModulePath) {
startApiServer().catch((error) => {
console.error('Failed to start backend', error);
process.exit(1);
});
}
+33
View File
@@ -47,6 +47,9 @@ export const ensureSchema = async (database: DatabaseClient = db) => {
ON workspaces (stripe_customer_id)
WHERE stripe_customer_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_workspaces_rescue_status
ON workspaces (workspace_type, rescue_verification_status, created_at DESC);
UPDATE workspaces
SET subscription_status = 'none'
WHERE workspace_type = 'standard'
@@ -119,6 +122,15 @@ export const ensureSchema = async (database: DatabaseClient = db) => {
ON workspace_members (workspace_id, user_id)
WHERE user_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_workspace_members_user_accepted
ON workspace_members (user_id, accepted_at, workspace_id)
WHERE user_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_workspace_members_owner_email
ON workspace_members (LOWER(COALESCE(invite_email, email)), workspace_id)
WHERE role = 'owner'
AND accepted_at IS NOT NULL;
CREATE TABLE IF NOT EXISTS auth_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
@@ -128,6 +140,9 @@ export const ensureSchema = async (database: DatabaseClient = db) => {
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_auth_sessions_created_user
ON auth_sessions (created_at DESC, user_id);
CREATE TABLE IF NOT EXISTS integration_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
@@ -257,6 +272,21 @@ export const ensureSchema = async (database: DatabaseClient = db) => {
AND BTRIM(tag_id) <> ''
AND LOWER(BTRIM(tag_id)) NOT IN ('unknown', 'not recorded', 'n/a', 'na', 'none');
CREATE INDEX IF NOT EXISTS idx_birds_workspace_active_name
ON birds (workspace_id, name)
WHERE memorialized_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_birds_workspace_memorialized
ON birds (workspace_id, memorialized_on DESC, name)
WHERE memorialized_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_birds_tag_lookup_active
ON birds (LOWER(tag_id), created_at)
WHERE tag_id IS NOT NULL
AND BTRIM(tag_id) <> ''
AND LOWER(BTRIM(tag_id)) NOT IN ('unknown', 'not recorded', 'n/a', 'na', 'none')
AND memorialized_at IS NULL;
CREATE TABLE IF NOT EXISTS pending_bird_transfers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
bird_id UUID NOT NULL REFERENCES birds(id) ON DELETE CASCADE,
@@ -374,6 +404,9 @@ export const ensureSchema = async (database: DatabaseClient = db) => {
CREATE INDEX IF NOT EXISTS idx_medication_administrations_bird_administered_on
ON medication_administrations (bird_id, administered_on DESC);
CREATE INDEX IF NOT EXISTS idx_medication_administrations_medication_date
ON medication_administrations (medication_id, administered_on DESC, created_at DESC);
DO $$
BEGIN
IF EXISTS (
@@ -0,0 +1,53 @@
import { Queue, type Job } from 'bullmq';
import { redisConnection } from './redisConnection.js';
export type BirdMilestoneReminderJobData = {
runDate: string;
requestedBy: 'scheduler';
};
export type BirdMilestoneReminderJobResult = {
runDate: string;
checked: number;
sent: number;
skipped: number;
failed: number;
};
export const birdMilestoneReminderQueueName = 'bird-milestone-reminders';
export const birdMilestoneReminderQueue = new Queue<BirdMilestoneReminderJobData, BirdMilestoneReminderJobResult>(
birdMilestoneReminderQueueName,
{
connection: redisConnection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 60_000,
},
removeOnComplete: 100,
removeOnFail: 1_000,
},
},
);
export const enqueueBirdMilestoneReminderJob = (runDate: string): Promise<Job<BirdMilestoneReminderJobData, BirdMilestoneReminderJobResult>> =>
birdMilestoneReminderQueue.add(
'run-daily-reminders',
{
runDate,
requestedBy: 'scheduler',
},
{
jobId: `bird-milestone-reminders:${runDate}`,
},
);
export const closeBirdMilestoneReminderQueue = async () => {
await birdMilestoneReminderQueue.close();
};
export const getBirdMilestoneReminderQueueCounts = () =>
birdMilestoneReminderQueue.getJobCounts('waiting', 'active', 'delayed', 'completed', 'failed');
+19
View File
@@ -0,0 +1,19 @@
import type { RedisOptions } from 'bullmq';
const redisUrl = process.env.REDIS_URL?.trim() || 'redis://localhost:6379';
const parseRedisConnection = (): RedisOptions => {
const url = new URL(redisUrl);
const db = url.pathname && url.pathname !== '/' ? Number(url.pathname.slice(1)) : undefined;
return {
host: url.hostname,
port: url.port ? Number(url.port) : 6379,
username: url.username ? decodeURIComponent(url.username) : undefined,
password: url.password ? decodeURIComponent(url.password) : undefined,
db: Number.isFinite(db) ? db : undefined,
maxRetriesPerRequest: null,
};
};
export const redisConnection = parseRedisConnection();
+61
View File
@@ -0,0 +1,61 @@
import { Worker } from 'bullmq';
import { ensureSchema } from './db/schema.js';
import { db } from './db/client.js';
import { runBirdMilestoneReminders, startBirdMilestoneReminderScheduler } from './app.js';
import {
birdMilestoneReminderQueueName,
closeBirdMilestoneReminderQueue,
type BirdMilestoneReminderJobData,
type BirdMilestoneReminderJobResult,
} from './queues/birdMilestoneReminderQueue.js';
import { redisConnection } from './queues/redisConnection.js';
let birdMilestoneWorker: Worker<BirdMilestoneReminderJobData, BirdMilestoneReminderJobResult> | null = null;
const startWorker = async () => {
await ensureSchema();
birdMilestoneWorker = new Worker<BirdMilestoneReminderJobData, BirdMilestoneReminderJobResult>(
birdMilestoneReminderQueueName,
async (job) => {
const result = await runBirdMilestoneReminders(job.data.runDate);
console.log(
`Bird milestone reminder job completed for ${result.runDate}: checked=${result.checked}, sent=${result.sent}, skipped=${result.skipped}, failed=${result.failed}`,
);
return result;
},
{
connection: redisConnection,
concurrency: 1,
},
);
birdMilestoneWorker.on('failed', (job, error) => {
console.error(`Bird milestone reminder job failed: id=${job?.id ?? 'unknown'}`, error);
});
startBirdMilestoneReminderScheduler();
console.log('FlockPal worker started.');
};
const shutdown = async (signal: string) => {
console.log(`FlockPal worker received ${signal}; shutting down.`);
await birdMilestoneWorker?.close();
await closeBirdMilestoneReminderQueue();
await db.close();
process.exit(0);
};
process.on('SIGINT', () => {
void shutdown('SIGINT');
});
process.on('SIGTERM', () => {
void shutdown('SIGTERM');
});
startWorker().catch((error) => {
console.error('Failed to start FlockPal worker', error);
process.exit(1);
});