这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# mqttpc

[![npm version](https://badge.fury.io/js/mqttpc.svg)](https://badge.fury.io/js/mqttpc)
[![License][mit-badge]][mit-url]

> Advanced process control via MQTT :satellite:
Expand Down Expand Up @@ -29,9 +28,9 @@ Options:
https://github.com/mqttjs/MQTT.js#connect-using-a-url
[default: "mqtt://127.0.0.1"]
-f, --config config file [default: "./procs.json"]
-h, --help Show help
-h, --help Show help
--version Show version number

```

### Config file
Expand All @@ -55,16 +54,23 @@ The config file contains a JSON definition of all processes you want to control

The only mandatory attribute for each process is "path", all others are optional.

* path - (string) path to the process
* args - (array[string]) arguments
* cwd - (string) the working directory (default: the cwd of mqttpc)
* env - (object) key-value paired environment (default: the env of mqttpc)
* uid - (number) user id
* gid - (number) group id
* shell - (boolean|string) run command in a shell (default: false). See https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
* disableStdin - (boolean) Disable the possibility to send data through MQTT to the process stdin (default: false).
* disableStdout - (boolean) Disable MQTT publish of the process stdout (default: false).
* disableStderr - (boolean) Disable MQTT publish of the process stderr (default: false).
* `path` - (string) path to the process
* `args` - (array[string]) arguments
* `cwd` - (string) the working directory (default: the cwd of mqttpc)
* `env` - (object) key-value paired environment (default: the env of mqttpc)
* `uid` - (number) user id
* `gid` - (number) group id
* `shell` - (boolean|string) run command in a shell (default: false). See https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
* `disableStdin` - (boolean) Disable the possibility to send data through MQTT to the process stdin (default: false).
* `stdinFromSpawnPayload` - (boolean) On spawn, send payload to stdin then close it (default: false).
* `enqueueSpawns` - (boolean) If spawn is called when it's already running, enqueue and run after process exits (default: false).
* `bufferMax` - (number) Maximum byte size for each buffered output (default: 128 KiB).
* `stdout`, `stderr` and `output`: (`output` is the combination of the other two, preserving order as flushed)
* `drop`: **Default** - Output ignored
* `buffer` : Output is buffered, keeping roughly the last `bufferMax` bytes, and is published when the process exits
* `buffer_retain` : Same as above but publishes have retain set
* `stream` : Each block of output is posted immediately (as per the decisions of the gods of buffers and node and os and so on)
* `stream_retain` : Same as above but publishes have retain set

### Usage example

Expand Down Expand Up @@ -97,25 +103,29 @@ Errors on process spawn will be published retained on this topic. On next succes

#### pc/status/<process_name>/stdout

The processes stdout will be published on this topic (not retained).
The process's `stdout` will be published on this topic.

#### pc/status/<process_name>/stderr

The processes stderr will be published on this topic (not retained).
The process's `stderr` will be published on this topic.

#### pc/status/<process_name>/output

The process's combined `stdout` + `stderr` will be published on this topic.

#### pc/connected

Mqttpc will publish ```1``` on start. Will be reset to ```0``` via last will if broker connection or mqttpc process dies.

### Topics mqttpc subscribes
### Topics mqttpc subscribes

#### pc/set/<process_name>/spawn

Start the process

#### pc/set/<process_name>/pipe

Pipe payload into stdin of the process
Pipe payload into stdin of the process. Send an empty payload to close stdin.

#### pc/set/<process_name>/signal

Expand Down
209 changes: 156 additions & 53 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ var spawn = require('child_process').spawn;

var procs = require(config.config);

// Fallback limit (128 * 1024 bytes) for a buffered output channel; used by
// appendBuffer when a process definition does not set `bufferMax`.
const DEFAULT_BUF_SIZE = 128 * 1024;
// Status topic patterns (appended to config.name) that are subscribed to on
// connect, so that retained messages already present on those topics are
// delivered to the message handler.
const TOPICS_TO_WATCH_FOR_RETAINED_FROM_PREVIOUS_CONF = '/status/+/stdout /status/+/stderr /status/+/output'.split(' ');
// How long, in milliseconds, those subscriptions are kept before the connect
// handler unsubscribes from them again.
const TOPICS_TO_WATCH_FOR_RETAINED_TIMEOUT_MS = 20 * 1000;

var mqttConnected;

log.setLevel(config.verbosity);
Expand All @@ -23,40 +27,180 @@ mqtt.on('connect', function () {
log.info('mqtt connected', config.url);
mqtt.publish(config.name + '/connected', '1', {retain: true});

log.info('mqtt subscribe', config.name + '/set/#');
mqtt.subscribe(config.name + '/set/#');
const subAndLog = (topic) => {
log.info('mqtt subscribe', config.name + topic);
mqtt.subscribe(config.name + topic);
}

subAndLog('/set/#');

TOPICS_TO_WATCH_FOR_RETAINED_FROM_PREVIOUS_CONF.forEach(x => subAndLog(x));
setTimeout(() => {
const topics = TOPICS_TO_WATCH_FOR_RETAINED_FROM_PREVIOUS_CONF.map(x => config.name + x);
log.info('mqtt unsubscribe', topics.join(" and "));
mqtt.unsubscribe(topics);
}, TOPICS_TO_WATCH_FOR_RETAINED_TIMEOUT_MS);
});

mqtt.on('close', function () {
if (mqttConnected) {
mqttConnected = false;
log.info('mqtt closed ' + config.url);
}

});

mqtt.on('error', function (err) {
log.error('mqtt', err);

});

mqtt.on('message', function (topic, payload) {
/**
 * Records one chunk of process output in the in-memory buffer kept for an
 * output channel, discarding the oldest chunks once the buffer has grown
 * past its limit.
 *
 * Buffers live on `proc._fdBuffers`, keyed by channel name. Each entry tracks
 * the retained chunks (`data`), their combined length (`len`) and the number
 * of bytes discarded so far (`clipped`). Chunks are only ever dropped whole,
 * and only while what remains would still exceed the limit, so the buffer can
 * hold more than the limit itself.
 *
 * @param {object} proc - process definition; reads `bufferMax`, creates/updates `_fdBuffers`
 * @param {string} fdName - channel name used as the buffer key
 * @param {Buffer|string} data - chunk to append (anything exposing `length`)
 */
function appendBuffer(proc, fdName, data) {
    if (!proc._fdBuffers) {
        proc._fdBuffers = {};
    }
    const buffers = proc._fdBuffers;
    if (!buffers[fdName]) {
        buffers[fdName] = { len: 0, data: [], clipped: 0 };
    }
    const entry = buffers[fdName];

    entry.len += data.length;
    entry.data.push(data);

    const limit = proc.bufferMax || DEFAULT_BUF_SIZE;
    for (;;) {
        // Always keep at least the newest chunk.
        if (entry.data.length <= 1) {
            break;
        }
        // Stop as soon as dropping the oldest chunk would leave the limit or less.
        if (!(entry.len > (limit + entry.data[0].length))) {
            break;
        }
        const oldest = entry.data.shift();
        entry.len -= oldest.length;
        entry.clipped += oldest.length;
    }
}

/**
 * Routes one chunk of output for a single channel according to the mode the
 * process definition configures for that channel (`proc[fdName]`):
 *
 *  - unset / 'drop'             : the chunk is only debug-logged
 *  - 'buffer' / 'buffer_retain' : the chunk is handed to appendBuffer
 *  - 'stream' / 'stream_retain' : the chunk is published right away on
 *    `<name>/status/<procName>/<fdName>`, retained only for 'stream_retain'
 *
 * @param {string} procName - name of the process, used in logs and the topic
 * @param {object} proc - process definition
 * @param {string} fdName - channel name; also the config key holding the mode
 * @param {Buffer} data - the output chunk
 * @throws {Error} when the configured mode is none of the above
 */
function handleProcessOutputEach(procName, proc, fdName, data) {
    log.debug(procName, fdName, data.toString().replace(/\n$/, ''));

    const mode = proc[fdName] || 'drop';

    if (mode === 'drop') {
        return;
    }

    if (mode === 'buffer' || mode === 'buffer_retain') {
        appendBuffer(proc, fdName, data);
        return;
    }

    if (mode === 'stream' || mode === 'stream_retain') {
        const topic = config.name + '/status/' + procName + '/' + fdName;
        mqtt.publish(topic, data, {retain: mode === 'stream_retain'});
        return;
    }

    throw new Error("Unknown handler " + JSON.stringify(proc[fdName]) + " in proc definition for " + procName);
}

/**
 * Dispatches a chunk of output twice: once for the channel it arrived on and
 * once for the combined 'output' channel, so each can apply its own mode.
 *
 * @param {string} procName - name of the process
 * @param {object} proc - process definition
 * @param {string} fdName - channel the chunk arrived on
 * @param {Buffer} data - the output chunk
 */
function handleProcessOutput(procName, proc, fdName, data) {
    for (const channel of [fdName, 'output']) {
        handleProcessOutputEach(procName, proc, channel, data);
    }
}

/**
 * Publishes whatever has been buffered for one output channel once the
 * process has exited, then discards that buffer.
 *
 * Only channels configured as 'buffer' or 'buffer_retain' are handled; any
 * other mode, or an empty/missing buffer, results in no publish. The message
 * goes to `<name>/status/<procName>/<fdName>` and is retained only for
 * 'buffer_retain'. If older output was discarded while buffering, the payload
 * is prefixed with a `...(clipped N)...` line giving the discarded byte count.
 *
 * @param {string} procName - name of the process, used in the topic
 * @param {object} proc - process definition carrying `_fdBuffers`
 * @param {string} fdName - channel name; also the config key holding the mode
 */
function handleProcessOutputAtExit(procName, proc, fdName) {
    const mode = proc[fdName] || 'drop';
    if (mode !== 'buffer' && mode !== 'buffer_retain') {
        return;
    }

    const fds = proc._fdBuffers;
    const fdBuf = fds && fds[fdName];
    if (!fdBuf || !fdBuf.len) {
        return;
    }

    let payload = Buffer.concat(fdBuf.data, fdBuf.len);
    if (fdBuf.clipped) {
        // Prepend the marker as bytes instead of going through toString(), so
        // output clipped in the middle of a multi-byte character is passed on
        // unchanged rather than being rewritten during string conversion.
        payload = Buffer.concat([Buffer.from(`...(clipped ${fdBuf.clipped})...\n`), payload]);
    }

    const retain = mode === 'buffer_retain';
    mqtt.publish(config.name + '/status/' + procName + '/' + fdName, payload, {retain});

    // Drop the record from the per-process table (not a key on the record
    // itself) so the same output cannot be published a second time.
    delete fds[fdName];
}

/**
 * Tells whether the mode configured for an output channel publishes retained
 * messages, i.e. whether `proc[fdName]` is 'buffer_retain' or 'stream_retain'.
 *
 * @param {object} proc - process definition
 * @param {string} fdName - channel name; also the config key holding the mode
 * @returns {boolean} true when the channel's configured mode is a retaining one
 */
function fdActionHasRetain(proc, fdName) {
    const mode = proc[fdName];
    return mode === 'buffer_retain' || mode === 'stream_retain';
}

/**
 * Starts the process described by `proc`, or - when it is already running -
 * either queues the request (`proc.enqueueSpawns`) or logs an error.
 *
 * While a child exists it is stored on `proc._`. Status is published retained
 * under `<name>/status/<procName>/`: `error` is cleared before spawning and
 * set on a child error, `pid` is set on start and cleared on exit, and `exit`
 * carries the exit code (or the signal when the code is null). Output chunks
 * are forwarded to handleProcessOutput and flushed through
 * handleProcessOutputAtExit when the child exits. After exit the next queued
 * payload, if any, is spawned.
 *
 * A child that never obtained a pid failed to spawn. Such a failure is
 * reported through the 'error' event and an 'exit' event is not guaranteed to
 * follow, so the slot is released from the error handler too; otherwise the
 * process would be reported as "already running" forever.
 *
 * @param {string} procName - name of the process, used in logs and topics
 * @param {object} proc - process definition (path, args, cwd, env, uid, gid, shell, ...)
 * @param {string} payload - spawn payload; written to stdin when `proc.stdinFromSpawnPayload` is set
 */
function processSpawn(procName, proc, payload) {
    if (proc._) {
        if (proc.enqueueSpawns) {
            log.warn(procName, 'already running', proc._.pid, ' enqueuing...');
            (proc.queue = proc.queue || []).push(payload);
        } else {
            log.error(procName, 'already running', proc._.pid);
        }
        return;
    }

    const statusTopic = config.name + '/status/' + procName;

    // Clear any error left retained by an earlier attempt.
    mqtt.publish(statusTopic + '/error', '', {retain: true});

    const child = spawn(proc.path, proc.args, {
        cwd: proc.cwd,
        env: proc.env,
        uid: proc.uid,
        gid: proc.gid,
        shell: proc.shell,
        stdio: 'pipe'
    });
    proc._ = child;

    // Frees the slot held by this child and starts the next queued spawn.
    // Does nothing if the slot has already been released or reused.
    const releaseSlot = () => {
        if (proc._ !== child) {
            return;
        }
        delete proc._;
        if (proc.queue && proc.queue.length) {
            log.info(procName, 'finished running, dequeuing...');
            processSpawn(procName, proc, proc.queue.shift());
        }
    };

    if (child.pid) {
        log.info(procName, 'started', proc.path, child.pid);
        mqtt.publish(statusTopic + '/pid', '' + child.pid, {retain: true});
    } else {
        log.error(procName, 'no pid, start failed');
    }

    // Buffers from a previous run must not leak into this one.
    delete proc._fdBuffers;
    child.stdout.on('data', data => handleProcessOutput(procName, proc, 'stdout', data));
    child.stderr.on('data', data => handleProcessOutput(procName, proc, 'stderr', data));

    child.on('exit', function (code, signal) {
        log.info(procName, 'exit', code, signal);
        handleProcessOutputAtExit(procName, proc, 'stdout');
        handleProcessOutputAtExit(procName, proc, 'stderr');
        handleProcessOutputAtExit(procName, proc, 'output');
        mqtt.publish(statusTopic + '/pid', '', {retain: true});
        mqtt.publish(statusTopic + '/exit', '' + (code === null ? signal : code), {retain: true});
        releaseSlot();
    });

    child.on('error', function (e) {
        log.error(procName, 'error', e);
        mqtt.publish(statusTopic + '/error', e.toString(), {retain: true});
        // No pid: the spawn itself failed and 'exit' may never arrive.
        if (!child.pid) {
            releaseSlot();
        }
    });

    if (proc.stdinFromSpawnPayload) {
        child.stdin.write(payload);
        child.stdin.end();
    }
}

mqtt.on('message', function (topic, payload, packet) {
payload = payload.toString();
log.debug('mqtt <', topic, payload);

var tmp = topic.split('/');

var tmp = topic.substr(config.name.length).split('/');
const dir = tmp[1];
var p = tmp[2];
var cmd = tmp[3];

if (!procs[p]) {
log.error('unknown process ' + p);
return;
}

var proc = procs[p];

if (dir == "status") {
if (!packet.retain) return;

const fd = cmd;
if (!fdActionHasRetain(proc)) {
log.warn(p, 'deleting retained mqtt but not retained in config', packet);
mqtt.publish(topic, "", {retain: true});
}
return;
}


switch (cmd) {
case 'pipe':
if (proc.disableStdin) {
Expand All @@ -67,56 +211,15 @@ mqtt.on('message', function (topic, payload) {
log.error(p, 'not running');
return;
}
if (payload.length)
proc._.stdin.write(payload);
else
proc._.stdin.end();
break;


case 'spawn':
if (proc._) {
log.error(p, 'already running', proc._.pid);
return;
}

mqtt.publish(config.name + '/status/' + p + '/error', '', {retain: true});

proc._ = spawn(proc.path, proc.args, {
cwd: proc.cwd,
env: proc.env,
uid: proc.uid,
gid: proc.gid,
shell: proc.shell,
stdio: 'pipe'
});

if (proc._.pid) {
log.info(p, 'started', proc.path, proc._.pid);
mqtt.publish(config.name + '/status/' + p + '/pid', '' + proc._.pid, {retain: true});

} else {
log.error(p, 'no pid, start failed');
}

proc._.stdout.on('data', function (data) {
log.debug(p, 'stdout', data.toString().replace(/\n$/, ''));
if (!proc.disableStdout) mqtt.publish(config.name + '/status/' + p + '/stdout', data.toString(), {retain: true});
});

proc._.stderr.on('data', function (data) {
log.debug(p, 'stderr', data.toString().replace(/\n$/, ''));
if (!proc.disableStderr) mqtt.publish(config.name + '/status/' + p + '/stderr', data.toString(), {retain: true});
});

proc._.on('exit', function (code, signal) {
log.info(p, 'exit', code, signal);
mqtt.publish(config.name + '/status/' + p + '/pid', '', {retain: true});
mqtt.publish(config.name + '/status/' + p + '/exit', '' + (typeof code === null ? signal : code), {retain: true});
delete(proc._);
});

proc._.on('error', function (e) {
log.error(p, 'error', e);
mqtt.publish(config.name + '/status/' + p + '/error', e.toString(), {retain: true});
});

processSpawn(p, proc, payload);
break;


Expand Down
Loading