Skip to content

Commit abcd712

Browse files
committed
feat(cluster): implement round-robin scheduling and worker management in clustered server mode
1 parent 5d3d4fb commit abcd712

File tree

8 files changed

+157
-30
lines changed

8 files changed

+157
-30
lines changed

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ RUN npm ci && npm run build && npm prune --omit=dev
1010

1111
FROM node:20-bookworm-slim AS runtime
1212
ENV NODE_ENV=production
13+
ENV WORKERS=0
1314
WORKDIR /app
1415

1516
COPY --from=build /app/package*.json ./
@@ -18,4 +19,4 @@ COPY --from=build /app/dist ./dist
1819
COPY --from=build /app/public ./public
1920

2021
EXPOSE 3000
21-
CMD ["node", "dist/index.js"]
22+
CMD ["node", "dist/server-cluster.js"]

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ For more details: [User Badges](docs/example/badge-user.md) · [Badge Collection
320320
- **API**: GitHub REST + GraphQL APIs with intelligent batching
321321
- **Caching**: Multi-tier (Memory → Redis → Source) with 2-hour default TTL
322322
- **Database**: SQLite with Drizzle ORM for badge counters and visitor logs
323-
- **Server**: Express.js with optional cluster mode for multi-core scaling
323+
- **Server**: Express.js with round-robin cluster load balancing in production
324324
- **Rendering**: Server-side SVG generation with optional WebP/PNG/GIF export
325325

326326
## Notes
@@ -329,6 +329,7 @@ For more details: [User Badges](docs/example/badge-user.md) · [Badge Collection
329329
- Without a GitHub token, API rate limits are very low (~60 requests/hour)
330330
- Set `GITHUB_TOKEN` to get 5,000 requests/hour
331331
- Redis is optional but recommended for production (enables distributed caching)
332+
- Docker and `npm start` boot the clustered entrypoint; set `WORKERS` to cap the worker count, or leave it at `0` to use all available CPU cores
332333
- User visitor badges (`/badges?username=...&name=visitors`) use IP hashing for privacy-preserving unique visitor counting
333334
- Project visitor badges (`/project/visitors`) increment once per same IP every 5 minutes
334335

compose.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ services:
66
container_name: ${CLOUDFLARED_TUNNEL_NAME}-app
77
restart: unless-stopped
88
ports:
9-
- "3000:${PORT:-3000}"
9+
- "3102:${PORT:-3000}"
1010
environment:
1111
NODE_ENV: production
1212
APP_ENV: production
1313
HOST: 0.0.0.0
1414
PORT: ${PORT:-3000}
15+
WORKERS: ${WORKERS:-0}
1516
DATABASE_PROVIDER: ${DATABASE_PROVIDER:-sqlite}
1617
DATABASE_URL: ${DATABASE_URL:-/app/data/stats.db}
1718
CLOUDFLARE_ACCOUNT_ID: ${CLOUDFLARE_ACCOUNT_ID:-}
@@ -36,6 +37,7 @@ services:
3637
timeout: 10s
3738
retries: 5
3839
start_period: 20s
40+
stop_grace_period: 20s
3941

4042
redis:
4143
image: redis:7-alpine

docs/structures/03_PRACTICES_AND_WORKFLOW.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
3. Database: run npm run db:migrate
2626
4. Testing: run npm test
2727
5. Build: run npm run build
28-
6. Deploy: run npm run start:cluster
28+
6. Deploy: run npm start (clustered by default); optionally set WORKERS to cap the worker count
2929

3030
## Related Documentation
3131

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"vercel-build": "echo 'Vercel compiles TypeScript at runtime — no pre-build needed'",
1010
"dev": "node --watch --no-warnings=ExperimentalWarning --loader ts-node/esm ./src/index.ts",
1111
"dev:modular": "node --watch --no-warnings=ExperimentalWarning --loader ts-node/esm ./src/server.ts",
12-
"start": "node dist/index.js",
12+
"start": "node dist/server-cluster.js",
13+
"start:single": "node dist/index.js",
1314
"start:cluster": "node dist/server-cluster.js",
1415
"start:production": "NODE_ENV=production node dist/server-cluster.js",
1516
"test": "jest",

src/cluster.ts

Lines changed: 93 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,72 +22,110 @@ export async function startCluster(
2222
workerFile: string,
2323
options: ClusterOptions = {}
2424
) {
25+
const availableWorkers = typeof os.availableParallelism === 'function'
26+
? os.availableParallelism()
27+
: os.cpus().length;
2528
const {
26-
workers = os.cpus().length,
29+
workers = availableWorkers,
2730
respawnDelay = 1000,
2831
maxRestarts = 5
2932
} = options;
33+
const workerCount = Math.max(1, Math.min(workers, availableWorkers));
3034

3135
if (cluster.isPrimary) {
36+
cluster.schedulingPolicy = cluster.SCHED_RR;
37+
3238
logger.info('Starting cluster mode', {
33-
workers,
34-
cpus: os.cpus().length,
39+
workers: workerCount,
40+
cpus: availableWorkers,
3541
platform: os.platform(),
36-
memory: `${Math.round(os.totalmem() / 1024 / 1024 / 1024)}GB`
42+
memory: `${Math.round(os.totalmem() / 1024 / 1024 / 1024)}GB`,
43+
schedulingPolicy: 'round-robin'
3744
});
3845

3946
const workerRestarts = new Map<number, number>();
47+
const workerSlots = new Map<number, number>();
48+
let isShuttingDown = false;
49+
let healthCheckInterval: NodeJS.Timeout | undefined;
4050

4151
// Spawn workers
42-
for (let i = 0; i < workers; i++) {
43-
spawnWorker(i + 1);
52+
for (let slot = 1; slot <= workerCount; slot++) {
53+
spawnWorker(slot, workerSlots);
4454
}
4555

4656
// Handle worker exit
4757
cluster.on('exit', (worker, code, signal) => {
4858
const workerId = worker.id;
49-
const restarts = workerRestarts.get(workerId) || 0;
59+
const workerSlot = workerSlots.get(workerId) || workerId;
60+
const restarts = workerRestarts.get(workerSlot) || 0;
61+
62+
workerSlots.delete(workerId);
5063

5164
logger.warn('Worker died', {
5265
workerId,
66+
workerSlot,
5367
pid: worker.process.pid,
5468
code,
5569
signal,
5670
restarts
5771
});
5872

73+
if (isShuttingDown) {
74+
return;
75+
}
76+
5977
// Check if we should respawn
6078
if (restarts < maxRestarts) {
61-
workerRestarts.set(workerId, restarts + 1);
79+
workerRestarts.set(workerSlot, restarts + 1);
6280

6381
setTimeout(() => {
64-
logger.info('Respawning worker', { workerId, attempt: restarts + 1 });
65-
spawnWorker(workerId);
82+
if (isShuttingDown) {
83+
return;
84+
}
85+
86+
logger.info('Respawning worker', { workerId, workerSlot, attempt: restarts + 1 });
87+
spawnWorker(workerSlot, workerSlots);
6688
}, respawnDelay);
6789
} else {
68-
logger.error('Worker exceeded max restarts', undefined, { workerId, maxRestarts });
90+
logger.error('Worker exceeded max restarts', undefined, {
91+
workerId,
92+
workerSlot,
93+
maxRestarts,
94+
});
6995
}
7096
});
7197

7298
// Handle worker online
7399
cluster.on('online', (worker) => {
100+
const workerSlot = workerSlots.get(worker.id) || worker.id;
74101
logger.info('Worker online', {
75102
workerId: worker.id,
103+
workerSlot,
76104
pid: worker.process.pid
77105
});
78106
});
79107

80108
// Handle worker listening
81109
cluster.on('listening', (worker, address) => {
110+
const workerSlot = workerSlots.get(worker.id) || worker.id;
82111
logger.info('Worker listening', {
83112
workerId: worker.id,
113+
workerSlot,
84114
pid: worker.process.pid,
85115
address: `${address.address}:${address.port}`
86116
});
87117
});
88118

89119
// Graceful shutdown
90120
const shutdown = async () => {
121+
if (isShuttingDown) {
122+
return;
123+
}
124+
125+
isShuttingDown = true;
126+
if (healthCheckInterval) {
127+
clearInterval(healthCheckInterval);
128+
}
91129
logger.info('Shutting down cluster...');
92130

93131
const workers = Object.values(cluster.workers || {});
@@ -124,7 +162,7 @@ export async function startCluster(
124162
process.on('SIGINT', shutdown);
125163

126164
// Performance monitoring
127-
setInterval(() => {
165+
healthCheckInterval = setInterval(() => {
128166
const workers = Object.values(cluster.workers || {});
129167
const activeWorkers = workers.filter(w => w && !w.isDead()).length;
130168

@@ -136,22 +174,53 @@ export async function startCluster(
136174
});
137175
}, 60000); // Every minute
138176

177+
healthCheckInterval.unref();
178+
139179
} else {
140180
// Worker process - import and run the application
141181
try {
142-
await import(workerFile);
182+
const workerModule = await import(workerFile) as {
183+
startServer?: () => Promise<unknown>;
184+
stopServer?: () => Promise<void>;
185+
};
186+
let isWorkerShuttingDown = false;
187+
188+
if (typeof workerModule.startServer === 'function') {
189+
await workerModule.startServer();
190+
}
191+
192+
const shutdownWorker = async () => {
193+
if (isWorkerShuttingDown) {
194+
return;
195+
}
196+
197+
isWorkerShuttingDown = true;
198+
logger.info('Worker received shutdown signal', {
199+
workerId: cluster.worker?.id,
200+
workerSlot: process.env.WORKER_SLOT,
201+
});
202+
203+
try {
204+
await workerModule.stopServer?.();
205+
} catch (error) {
206+
logger.error('Worker failed to shut down cleanly', error as Error, {
207+
workerId: cluster.worker?.id,
208+
workerSlot: process.env.WORKER_SLOT,
209+
});
210+
} finally {
211+
process.exit(0);
212+
}
213+
};
143214

144215
// Handle shutdown signal from the primary process
145216
process.on('message', (msg) => {
146217
if (msg === 'shutdown') {
147-
logger.info('Worker received shutdown signal', {
148-
workerId: cluster.worker?.id
149-
});
150-
151-
// Gracefully close connections
152-
process.exit(0);
218+
void shutdownWorker();
153219
}
154220
});
221+
222+
process.once('SIGTERM', () => void shutdownWorker());
223+
process.once('SIGINT', () => void shutdownWorker());
155224

156225
} catch (error) {
157226
logger.error('Worker failed to start', error as Error, {
@@ -165,9 +234,11 @@ export async function startCluster(
165234
/**
166235
* Spawn a new worker
167236
*/
168-
function spawnWorker(workerId: number) {
169-
const worker = cluster.fork();
170-
worker.id = workerId;
237+
function spawnWorker(workerSlot: number, workerSlots: Map<number, number>) {
238+
const worker = cluster.fork({
239+
WORKER_SLOT: String(workerSlot),
240+
});
241+
workerSlots.set(worker.id, workerSlot);
171242
return worker;
172243
}
173244

src/server-cluster.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ import path from 'path';
1414
const __filename = fileURLToPath(import.meta.url);
1515
const __dirname = path.dirname(__filename);
1616

17-
const workerFile = pathToFileURL(path.join(__dirname, 'index.js')).href;
17+
const workerFile = pathToFileURL(path.join(__dirname, 'server.js')).href;
1818
const workers = parseInt(process.env.WORKERS || '0') || undefined;
1919

2020
startCluster(workerFile, {
2121
workers,
2222
respawnDelay: 1000,
23-
maxRestarts: 1
23+
maxRestarts: 5
2424
});

src/server.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@
33
* Handles server initialization with modular structure
44
*/
55

6+
import { type Server as HttpServer } from 'http';
67
import { type Express } from 'express';
78
import { createApp, initializeRoutes, setupErrorHandlers } from './app.js';
89
import { getEnv } from './shared/config/env.js';
910
import { createLogger } from './shared/logs/logger.js';
1011
import { initializeDatabaseAsync } from './shared/config/db.js';
1112
import { GitHubClient } from './shared/utils/github-client.js';
12-
import { getRedisClient } from './shared/utils/redis-client.js';
13+
import { closeRedisClient, getRedisClient } from './shared/utils/redis-client.js';
1314
import type { ICacheService } from './services/base.service.js';
1415

1516
const logger = createLogger({ module: 'server' });
17+
let activeApp: Express | null = null;
18+
let activeServer: HttpServer | null = null;
19+
let shutdownPromise: Promise<void> | null = null;
1620

1721
// Shared cache for API responses
1822
const cache = new Map<string, { data: string; timestamp: number }>();
@@ -95,6 +99,10 @@ async function initializeServices(): Promise<{ cacheService?: ICacheService }> {
9599
* Start the server
96100
*/
97101
export async function startServer(): Promise<Express> {
102+
if (activeApp && activeServer?.listening) {
103+
return activeApp;
104+
}
105+
98106
const env = getEnv();
99107

100108
// Initialize services
@@ -124,6 +132,9 @@ export async function startServer(): Promise<Express> {
124132
});
125133
});
126134

135+
activeApp = app;
136+
activeServer = server;
137+
127138
server.on('error', (error: NodeJS.ErrnoException) => {
128139
logger.error('HTTP server failed to listen', error, {
129140
port,
@@ -136,6 +147,46 @@ export async function startServer(): Promise<Express> {
136147
return app;
137148
}
138149

150+
export async function stopServer(): Promise<void> {
151+
if (shutdownPromise) {
152+
return shutdownPromise;
153+
}
154+
155+
shutdownPromise = (async () => {
156+
if (activeServer) {
157+
await new Promise<void>((resolve, reject) => {
158+
activeServer?.close((error) => {
159+
if (error) {
160+
reject(error);
161+
return;
162+
}
163+
164+
resolve();
165+
});
166+
});
167+
168+
logger.info('HTTP server stopped');
169+
}
170+
171+
try {
172+
await closeRedisClient();
173+
} catch (error) {
174+
logger.warn('Failed to close Redis client cleanly', {
175+
error: error instanceof Error ? error.message : String(error),
176+
});
177+
}
178+
179+
activeServer = null;
180+
activeApp = null;
181+
})();
182+
183+
try {
184+
await shutdownPromise;
185+
} finally {
186+
shutdownPromise = null;
187+
}
188+
}
189+
139190
// Start server if this file is run directly
140191
if (import.meta.url === `file://${process.argv[1]}`) {
141192
startServer().catch((error) => {

0 commit comments

Comments
 (0)