Commit 3b59a756 authored by Holger Knust, committed by Ppchelko

SIG* and event handler changes (#210)

Added removal of signal and event handlers on stop() to avoid EventEmitter memory leak warnings, and bumped the version number to 2.7.0
parent 9b3cfc5c
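
The recurring pattern throughout this diff: a listener that used to be an anonymous closure is now stored on the instance, so shutdown code can later pass the identical function reference to removeListener(). A minimal sketch of the recipe (the class and handler names here are illustrative, not taken from the diff):

'use strict';

// Anonymous listeners can never be detached, so repeated start/stop
// cycles accumulate them until Node prints
// "MaxListenersExceededWarning: Possible EventEmitter memory leak".
// Keeping a reference lets stop() detach exactly what start() attached.
class Service {
    constructor() {
        // Illustrative handler slot, mirroring the diff's pattern.
        this._sigtermHandler = () => this.stop();
    }
    start() {
        process.on('SIGTERM', this._sigtermHandler);
    }
    stop() {
        // removeListener() matches by function identity, which is why
        // the handler must be stored rather than re-created here.
        process.removeListener('SIGTERM', this._sigtermHandler);
    }
}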
......@@ -11,6 +11,7 @@ const Logger = require('./logger');
const docker = require('./docker');
const NUM_CPU_REGEX = /^(?:(?:ncpu[\s)*+-/])|[\s()*+-/.\d])+(?:ncpu)?$/;
/**
* Abstract base class for Master and Worker classes.
*
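
As context for the NUM_CPU_REGEX line above: it appears to whitelist num_workers expressions written as arithmetic over ncpu before they are evaluated; that purpose is an assumption, but the regex itself behaves as follows:

'use strict';

const NUM_CPU_REGEX = /^(?:(?:ncpu[\s)*+-/])|[\s()*+-/.\d])+(?:ncpu)?$/;

// Simple arithmetic over ncpu passes the whitelist...
console.log(NUM_CPU_REGEX.test('ncpu * 2'));      // true
console.log(NUM_CPU_REGEX.test('2 * ncpu'));      // true
// ...while arbitrary code is rejected.
console.log(NUM_CPU_REGEX.test('require("fs")')); // false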
......@@ -32,13 +33,38 @@ class BaseService {
this._logger = null;
this._metrics = null;
this._ratelimiter = null;
this.serviceReturns = null;
this._serviceReturns = null;
// Avoid recursion if there are bugs in the logging code.
this._inLogger = false;
this._uncaughtExceptionHandler = (err) => {
this._unhandledRejectionHandler(err);
// Don't exit right away, allow logger to process message
setTimeout(() => {
process.exit(1);
}, 100);
};
this._unhandledRejectionHandler = (err) => {
if (!this._inLogger) {
this._inLogger = true;
if (!err.stack) {
Error.captureStackTrace(err);
}
this._logger.log('fatal/service-runner/unhandled', err);
this._inLogger = false;
}
};
}
stop() {
if (this._ratelimiter) {
this._ratelimiter.stop();
}
// Remove handlers
process.removeListener('unhandledRejection', this._unhandledRejectionHandler);
process.removeListener('uncaughtException', this._uncaughtExceptionHandler);
}
_setAppBasePath(config) {
......@@ -92,6 +118,12 @@ class BaseService {
this._logger = new Logger(config.logging);
this._setUpDNSCaching();
// Catch unhandled rejections & log them. This relies on bluebird.
process.on('unhandledRejection', this._unhandledRejectionHandler);
// Similarly, log uncaught exceptions. Also, exit.
process.on('uncaughtException', this._uncaughtExceptionHandler);
return this._start();
});
}
......
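
Two details in the relocated handlers are easy to miss. The _inLogger flag guards against re-entry: if logging the error itself raises another unhandled error, the handler returns instead of recursing. And the uncaughtException path delays process.exit(1) by 100 ms so the fatal entry can flush through asynchronous logger streams. A standalone sketch of both ideas, with console.error standing in for the real logger:

'use strict';

let inLogger = false;
const unhandledRejectionHandler = (err) => {
    if (inLogger) { return; } // re-entry guard
    inLogger = true;
    if (!err.stack) {
        // Give stack-less values a stack so the log entry is traceable.
        Error.captureStackTrace(err);
    }
    console.error('fatal/service-runner/unhandled', err);
    inLogger = false;
};
const uncaughtExceptionHandler = (err) => {
    unhandledRejectionHandler(err);
    // Don't exit right away; give the logger time to process the message.
    setTimeout(() => process.exit(1), 100);
};
process.on('unhandledRejection', unhandledRejectionHandler);
process.on('uncaughtException', uncaughtExceptionHandler);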
......@@ -21,28 +21,31 @@ class HeapWatch {
this.timeoutHandle = undefined;
this.gcReportInterval = undefined;
this.cumulativeGCTimes = Object.assign({}, ZERO_CUMULATIVE_GC_INTERVAL);
this.reportStatsHandler = (stats) => {
// Report GC timings to statsd (in nanoseconds).
const type = this._gcTypeName(stats.gctype);
if (type !== 'unknown') {
this.cumulativeGCTimes[this._gcTypeName(stats.gctype)] += stats.pause;
}
};
this._gcStats = null;
}
_gcTypeName(typeID) {
switch (typeID) {
case 1: return 'minor';
case 2: return 'major';
case 4: return 'incremental';
case 8: return 'weak';
case 15: return 'all';
default: return 'unknown';
}
}
setGCMonitor() {
try {
const gcStats = require('gc-stats')();
const gcTypeName = (typeID) => {
switch (typeID) {
case 1: return 'minor';
case 2: return 'major';
case 4: return 'incremental';
case 8: return 'weak';
case 15: return 'all';
default: return 'unknown';
}
};
gcStats.on('stats', (stats) => {
// Report GC timings to statsd (in nanoseconds).
const type = gcTypeName(stats.gctype);
if (type !== 'unknown') {
this.cumulativeGCTimes[gcTypeName(stats.gctype)] += stats.pause;
}
});
this._gcStats = require('gc-stats')();
this._gcStats.on('stats', this.reportStatsHandler);
this.gcReportInterval = setInterval(() => {
const timings = {};
Object.keys(this.cumulativeGCTimes).forEach((gcType) => {
......@@ -107,6 +110,9 @@ class HeapWatch {
}
close() {
if (this._gcStats) {
this._gcStats.removeListener('stats', this.reportStatsHandler);
}
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);
this.timeoutHandle = undefined;
......
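
In HeapWatch, the inline gcTypeName helper is promoted to the _gcTypeName method and the 'stats' listener becomes this.reportStatsHandler, so close() can detach it. A reduced sketch of the resulting shape; gc-stats and the stats object's gctype/pause fields are as used above, but the class is a stand-in:

'use strict';

class GCWatch {
    constructor() {
        this._gcStats = null;
        // Named handler so close() can remove it from the emitter.
        this._onStats = (stats) => {
            // stats.pause is the GC pause length in nanoseconds.
            console.log(`gc type ${stats.gctype}: ${stats.pause} ns`);
        };
    }
    start() {
        try {
            this._gcStats = require('gc-stats')();
            this._gcStats.on('stats', this._onStats);
        } catch (e) {
            // gc-stats is an optional native dependency; run without it.
        }
    }
    close() {
        if (this._gcStats) {
            this._gcStats.removeListener('stats', this._onStats);
        }
    }
}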
......@@ -97,7 +97,6 @@ class Logger {
this._sampled_levels = conf.sampled_levels || {};
delete conf.sampled_levels;
this._logger = bunyan.createLogger(conf);
this._addErrorHandler();
this._levelMatcher = this._levelToMatcher(conf.level);
// For each specially logged component we need to create
// a child logger that accepts everything regardless of the level
......@@ -112,8 +111,27 @@ class Logger {
this._traceLogger = this._logger.child({
level: bunyan.TRACE
});
// Set up handlers for uncaught exceptions
this._setupRootHandlers();
this._errorHandler = (err, failedStream) => {
// If some fatal error occurred in one of the logger streams,
// we can't do much, so ignore.
if (failedStream.type === 'file') {
// However, if it's a `file` stream, it's likely that
// we're in some catastrophic state. Remove the stream,
// log the error and hope other streams would deliver it.
// Attempting to continue logging through a failed `file`
// stream could leak memory.
this._logger.streams = this._logger.streams.filter((s) => s !== failedStream);
failedStream.stream.destroy();
// Hope that we have other streams to report the problem
this.log('fatal/service-runner/logger', {
message: 'Failed to write logs to file',
error: err
});
}
};
this._logger.on('error', this._errorHandler);
} else {
this._sampled_levels = confOrLogger._sampled_levels;
this._logger = confOrLogger._logger;
......@@ -121,29 +139,8 @@ class Logger {
this._componentLoggers = confOrLogger._componentLoggers;
this._traceLogger = confOrLogger._traceLogger;
}
this.args = args;
}
_addErrorHandler() {
this._logger.on('error', (err, failedStream) => {
// If some fatal error occurred in one of the logger streams,
// we can't do much, so ignore.
if (failedStream.type === 'file') {
// However, if it's a `file` stream, it's likely that
// we're in some catastrophic state. Remove the stream,
// log the error and hope other streams would deliver it.
// Attempting to continue logging through a failed `file`
// stream could leak memory.
this._logger.streams = this._logger.streams.filter((s) => s !== failedStream);
failedStream.stream.destroy();
// Hope that we have other streams to report the problem
this.log('fatal/service-runner/logger', {
message: 'Failed to write logs to file',
error: err
});
}
});
this.args = args;
}
_processConf(conf) {
......@@ -199,34 +196,6 @@ class Logger {
return idx !== -1 ? idx : DEF_LEVEL_INDEX;
}
_setupRootHandlers() {
// Avoid recursion if there are bugs in the logging code.
let inLogger = false;
const logUnhandledException = (err) => {
if (!inLogger) {
inLogger = true;
if (!err.stack) {
Error.captureStackTrace(err);
}
this.log('fatal/service-runner/unhandled', err);
inLogger = false;
}
};
// Catch unhandled rejections & log them. This relies on bluebird.
process.on('unhandledRejection', logUnhandledException);
// Similarly, log uncaught exceptions. Also, exit.
process.on('uncaughtException', (err) => {
logUnhandledException(err);
// Don't exit right away, allow logger to process message
setTimeout(() => {
process.exit(1);
}, 100);
});
}
_levelToMatcher(level) {
const pos = LEVELS.indexOf(level);
if (pos !== -1) {
......@@ -376,6 +345,10 @@ class Logger {
}
close() {
if (this._logger && this._errorHandler && this._logger.removeListener) {
this._logger.removeListener('error', this._errorHandler);
}
this._logger.streams.filter((stream) => stream.type === 'file').forEach((stream) => {
stream.stream.end();
});
......
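
In Logger, the handler formerly installed by _addErrorHandler is kept as this._errorHandler so close() can remove it before ending the file streams, while the process-level handlers from _setupRootHandlers move to BaseService as shown earlier. A compact sketch of the bunyan side; bunyan really does pass the failing stream as the second argument of its 'error' event:

'use strict';

const bunyan = require('bunyan');
const logger = bunyan.createLogger({ name: 'demo' });

// Stored reference so close() can detach it again.
const errorHandler = (err, failedStream) => {
    if (failedStream.type === 'file') {
        // Drop the broken file stream instead of logging through it.
        logger.streams = logger.streams.filter((s) => s !== failedStream);
        failedStream.stream.destroy();
    }
};
logger.on('error', errorHandler);

// Later, in close():
logger.removeListener('error', errorHandler);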
......@@ -35,7 +35,26 @@ class Master extends BaseService {
this._inRollingRestart = false;
this._firstWorkerStarted = false;
this._firstWorkerStartupAttempts = 0;
this.workerStatusMap = {};
this._workerStatusMap = {};
this._shutdownMasterHandler = () => {
this.stop()
.then(() => {
this._logger.log('info/service-runner/master', 'Exiting master');
return this._exitProcess(0);
});
};
this._rollingRestartHandler = () => {
this._firstWorkerStarted = false;
this._firstWorkerStartupAttempts = 0;
return this._updateConfig()
.then(() => {
// Recreate loggers
this._logger.close();
this._logger = new Logger(this.config.logging);
})
.then(this._rollingRestart.bind(this));
};
}
_getConfigUpdateAction(conf) {
......@@ -47,6 +66,12 @@ class Master extends BaseService {
// No workers needed, run worker code directly.
return Worker.prototype.stop.call(this);
}
// Remove signal handlers
process.removeListener('SIGINT', this._shutdownMasterHandler);
process.removeListener('SIGTERM', this._shutdownMasterHandler);
process.removeListener('SIGHUP', this._rollingRestartHandler);
super.stop();
if (this.interval) {
clearInterval(this.interval);
......@@ -70,29 +95,11 @@ class Master extends BaseService {
this._logger.log('info/service-runner',
`master(${process.pid}) initializing ${this.config.num_workers} workers`);
const shutdownMaster = () => {
this.stop()
.then(() => {
this._logger.log('info/service-runner/master', 'Exiting master');
return this._exitProcess(0);
});
};
process.on('SIGINT', shutdownMaster);
process.on('SIGTERM', shutdownMaster);
process.on('SIGINT', this._shutdownMasterHandler);
process.on('SIGTERM', this._shutdownMasterHandler);
// Set up rolling restarts
process.on('SIGHUP', () => {
this._firstWorkerStarted = false;
this._firstWorkerStartupAttempts = 0;
return this._updateConfig()
.then(() => {
// Recreate loggers
this._logger.close();
this._logger = new Logger(this.config.logging);
})
.then(this._rollingRestart.bind(this));
});
process.on('SIGHUP', this._rollingRestartHandler);
let ratelimiterSetup = P.resolve();
if (this.config.ratelimiter) {
......@@ -124,29 +131,31 @@ class Master extends BaseService {
_stopWorker(workerPid) {
const worker = cluster.workers[workerPid];
if (!worker || worker.state === 'disconnected') {
if (worker) {
delete this.workerStatusMap[worker.process.pid];
delete this._workerStatusMap[worker.process.pid];
}
return;
}
this.workerStatusMap[worker.process.pid] = {
this._workerStatusMap[worker.process.pid] = {
time: null,
killed: true
};
const res = new P((resolve) => {
const timeout = setTimeout(() => {
// worker.kill() doesn't send a signal immediately; it waits until the
// worker closes all connections with the master. If that hasn't
// happened after a minute, assume it never will.
worker.process.kill('SIGKILL');
delete this.workerStatusMap[worker.process.pid];
delete this._workerStatusMap[worker.process.pid];
resolve();
}, 60000);
worker.once('disconnect', () => {
clearTimeout(timeout);
worker.process.kill('SIGKILL');
delete this.workerStatusMap[worker.process.pid];
delete this._workerStatusMap[worker.process.pid];
resolve();
});
});
......@@ -155,9 +164,9 @@ class Master extends BaseService {
}
_onStatusReceived(worker, status) {
const val = this.workerStatusMap[worker.process.pid] || {};
const val = this._workerStatusMap[worker.process.pid] || {};
val.status = status;
this.workerStatusMap[worker.process.pid] = val;
this._workerStatusMap[worker.process.pid] = val;
}
_startWorker(workerId) {
......@@ -172,11 +181,17 @@ class Master extends BaseService {
type: 'config',
body: yaml.dump(config)
});
const startupWorkerExit = (code) => {
let workerMessageHandler = null;
const startupWorkerExitHandler = (code) => {
if (this._shuttingDown || this._inRollingRestart) {
return;
}
worker.removeListener('exit', startupWorkerExitHandler);
worker.removeListener('message', workerMessageHandler);
if (!this._firstWorkerStarted &&
this._firstWorkerStartupAttempts++ >= STARTUP_ATTEMPTS_LIMIT) {
// We tried to start the first worker 3 times, but it never succeeded.
......@@ -186,6 +201,7 @@ class Master extends BaseService {
worker_pid: worker.process.pid,
exit_code: code
});
return this._exitProcess(1);
}
......@@ -207,8 +223,9 @@ class Master extends BaseService {
resolve(this._startWorker(workerId));
});
};
worker.on('exit', startupWorkerExitHandler);
const workerExit = (worker) => {
const workerExitHandler = (worker) => {
if (this._shuttingDown || this._inRollingRestart) {
return;
}
......@@ -218,24 +235,26 @@ class Master extends BaseService {
worker_pid: worker.process.pid,
exit_code: worker.process.exitCode
};
if (this.workerStatusMap[worker.process.pid] &&
this.workerStatusMap[worker.process.pid].status) {
info.status = this.workerStatusMap[worker.process.pid].status;
if (this._workerStatusMap[worker.process.pid] &&
this._workerStatusMap[worker.process.pid].status) {
info.status = this._workerStatusMap[worker.process.pid].status;
}
this._logger.log('error/service-runner/master', info);
delete this.workerStatusMap[worker.process.pid];
delete this._workerStatusMap[worker.process.pid];
worker.removeListener('exit', workerExitHandler);
worker.removeListener('message', workerMessageHandler);
P.delay(Math.random() * 2000).then(() => {
resolve(this._startWorker(worker.worker_id));
});
};
worker.on('exit', startupWorkerExit);
worker.on('message', (msg) => {
workerMessageHandler = (msg) => {
switch (msg.type) {
case 'startup_finished':
worker.removeListener('exit', startupWorkerExit);
worker.removeListener('exit', startupWorkerExitHandler);
worker.on('exit', () => {
workerExit(worker);
workerExitHandler(worker);
});
this._firstWorkerStarted = true;
resolve(msg.serviceReturns);
......@@ -253,7 +272,9 @@ class Master extends BaseService {
this._logger.log('error/service-runner/master',
`unknown message type received from worker ${msg.type}`);
}
});
};
worker.on('message', workerMessageHandler);
});
}
// Fork a single worker, wait for it to start executing and set everything up,
......@@ -278,7 +299,7 @@ class Master extends BaseService {
const now = new Date();
Object.keys(cluster.workers).forEach((workerPid) => {
const worker = cluster.workers[workerPid];
const lastBeat = this.workerStatusMap[worker.process.pid];
const lastBeat = this._workerStatusMap[worker.process.pid];
if (!lastBeat || (!lastBeat.killed && now - lastBeat.time >
this.config.worker_heartbeat_timeout)) {
const info = {
......@@ -303,13 +324,13 @@ class Master extends BaseService {
* @private
*/
_saveBeat(worker) {
const currentVal = this.workerStatusMap[worker.process.pid];
const currentVal = this._workerStatusMap[worker.process.pid];
if (currentVal && currentVal.killed) {
return;
}
this.workerStatusMap[worker.process.pid] = currentVal || {};
this.workerStatusMap[worker.process.pid].time = new Date();
this.workerStatusMap[worker.process.pid].killed = false;
this._workerStatusMap[worker.process.pid] = currentVal || {};
this._workerStatusMap[worker.process.pid].time = new Date();
this._workerStatusMap[worker.process.pid].killed = false;
}
}
......
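
Beyond the signal handlers, _startWorker now names its listeners (startupWorkerExitHandler, workerExitHandler, workerMessageHandler) so they can remove one another as a worker moves from startup to steady state, rather than piling up across respawns. A cluster-free reduction of that handoff, using a plain EventEmitter as the stand-in worker:

'use strict';

const EventEmitter = require('events');

function supervise(worker, respawn) {
    let messageHandler = null;
    const startupExitHandler = (code) => {
        // Died during startup: detach both listeners before respawning,
        // so nothing is left dangling on the worker object.
        worker.removeListener('exit', startupExitHandler);
        worker.removeListener('message', messageHandler);
        respawn(code);
    };
    messageHandler = (msg) => {
        if (msg.type === 'startup_finished') {
            // Swap the startup exit listener for the steady-state one.
            worker.removeListener('exit', startupExitHandler);
            worker.once('exit', (code) => {
                worker.removeListener('message', messageHandler);
                respawn(code);
            });
        }
    };
    worker.on('exit', startupExitHandler);
    worker.on('message', messageHandler);
}

// Stand-in usage:
const w = new EventEmitter();
supervise(w, (code) => console.log('respawning after exit code', code));
w.emit('message', { type: 'startup_finished' });
w.emit('exit', 0);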
......@@ -20,12 +20,21 @@ const RateLimiterNoCluster = require('./ratelimiter').nocluster;
* @constructor
*/
class Worker extends BaseService {
constructor() {
super();
this._shutdownHandler = null;
this._dumpHeapHandler = null;
this._messageHandler = null;
this._serviceStatusHandler = null;
}
_getConfigUpdateAction() {
return new P((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Timeout waiting for config in worker ${process.pid}`));
}, 3000);
process.on('message', (message) => {
process.on('message', this._messageHandler = (message) => {
if (message.type === 'config') {
clearTimeout(timeout);
this._updateConfig(message.body)
......@@ -41,7 +50,7 @@ class Worker extends BaseService {
if (cluster.isWorker) {
// If a status update arrives in a worker, forward it to the master
process.on('service_status', (message) => {
process.on('service_status', this._serviceStatusHandler = (message) => {
try {
process.send({
type: 'service_status',
......@@ -69,8 +78,27 @@ class Worker extends BaseService {
this._metrics = undefined;
}
this._heapwatchHandle.close();
if (Array.isArray(this.serviceReturns)) {
return P.each(this.serviceReturns, (serviceRet) => {
// Remove signal handlers
if (this._shutdownHandler) {
process.removeListener('SIGTERM', this._shutdownHandler);
this._shutdownHandler = null;
}
if (this._dumpHeapHandler) {
process.removeListener('SIGUSR2', this._dumpHeapHandler);
this._dumpHeapHandler = null;
}
if (this._serviceStatusHandler) {
process.removeListener('service_status', this._serviceStatusHandler);
this._serviceStatusHandler = null;
}
if (this._messageHandler) {
process.removeListener('message', this._messageHandler);
this._messageHandler = null;
}
if (Array.isArray(this._serviceReturns)) {
return P.each(this._serviceReturns, (serviceRet) => {
if (serviceRet && typeof serviceRet.close === 'function') {
return serviceRet.close();
}
......@@ -82,20 +110,22 @@ class Worker extends BaseService {
_start() {
// Worker.
process.on('SIGTERM', () => this.stop()
.then(() => {
this._logger.log('info/service-runner/worker', {
message: 'worker shutting down',
worker_pid: process.pid
});
this._exitProcess(0);
}));
process.on('SIGTERM', this._shutdownHandler = () =>
this.stop()
.then(() => {
this._logger.log('info/service-runner/worker', {
message: 'worker shutting down',
worker_pid: process.pid
});
this._exitProcess(0);
})
);
// Enable heap dumps in /tmp on kill -USR2.
// See https://github.com/bnoordhuis/node-heapdump/
// For node 0.6/0.8: npm install heapdump@0.1.0
// For 0.10: npm install heapdump
process.on('SIGUSR2', () => {
process.on('SIGUSR2', this._dumpHeapHandler = () => {
try {
const heapdump = require('heapdump');
const cwd = process.cwd();
......@@ -176,7 +206,7 @@ class Worker extends BaseService {
})
.then((res) => {
let ret = res;
this.serviceReturns = res;
this._serviceReturns = res;
// Signal that this worker finished startup
if (cluster.isWorker) {
// Make sure that only JSON-serializable values are returned.
......
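
Worker gains a constructor whose only job is to declare the handler slots, then registers listeners with the assignment-inside-on() idiom: process.on('message', this._messageHandler = (message) => { ... }). The assignment expression evaluates to the function itself, so the identical reference lands both on the instance and in the listener list. In isolation:

'use strict';

class Worker {
    constructor() {
        // Declared up front so stop() can test-and-remove safely even
        // if start() never ran.
        this._messageHandler = null;
    }
    start() {
        // The assignment's value is the arrow function, so on() and
        // this._messageHandler receive the same function object.
        process.on('message', this._messageHandler = (message) => {
            console.log('worker received', message && message.type);
        });
    }
    stop() {
        if (this._messageHandler) {
            process.removeListener('message', this._messageHandler);
            this._messageHandler = null;
        }
    }
}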
{
"name": "service-runner",
"version": "2.6.19",
"version": "2.7.0",
"description": "Generic nodejs service supervisor / cluster runner",
"main": "service-runner.js",
"bin": {
......
......@@ -72,10 +72,9 @@ describe('service-runner tests', () => {
.finally(() => server.stop());
});
// TODO: unskip after https://github.com/wikimedia/service-runner/pull/210 is done.
it.skip('Must remove all listeners on stop', (done) => {
it('Must remove all listeners on stop', (done) => {
const DEFAULT_MAX_LISTENERS = require('events').EventEmitter.defaultMaxListeners;
const server = new TestServer(`${__dirname}/../utils/simple_config.yaml`);
const server = new TestServer(`${__dirname}/../utils/simple_config_two_workers.yaml`);
const warningListener = (warning) => {
if (!done.called) {
done.called = true;
......
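
The regression test is finally unskipped and pointed at a two-worker config, so leaked handlers would actually multiply. Its body is truncated above; judging from the visible warningListener fragment, the completion looks roughly like the following, though the assertion details here are a guess:

'use strict';

// Hypothetical completion of the truncated test body (mocha style).
it('Must remove all listeners on stop', (done) => {
    const warningListener = (warning) => {
        if (!done.called) {
            done.called = true;
            process.removeListener('warning', warningListener);
            done(new Error(`Unexpected warning: ${warning.message}`));
        }
    };
    // Node reports MaxListenersExceededWarning via the process-level
    // 'warning' event, which is exactly what a handler leak produces.
    process.on('warning', warningListener);
    // ...start and stop the server enough times that leaked handlers
    // would exceed the default max listener count, then call done().
});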