#!/usr/bin/node
//
//
//
//
// data_collector_process source
//
//
//
//
//
//
// File: data_collector_process
//
// Author: Francesco.Prelz@mi.infn.it
//
// Description: Standing process that collects data from a UDP channel
// and appends it to an XML file (via atomic file rotation).
//
// History:
// 6-Oct-2014 Initial creation.
var dgram = require('dgram');
var fs = require('fs');
// ---------------------------------------------------------------------
// Startup: validate the command line, make sure no other instance owns
// the pidfile, then create the output file and its working copy.
//
// Command line: node <this script> <pidfile> <xml_output_file>
// ---------------------------------------------------------------------
if (process.argv.length < 4)
{
  process.stderr.write("Usage: node " + process.argv[1] +
                       " <pidfile> <xml_output_file>\n");
  process.exit(1);
}
var myself = process.argv[1];
var pidfile = process.argv[2];
var xmlout = process.argv[3];
// Working copy that receives the in-place writes; it is renamed over
// xmlout at every rotation.
var xmlout_tmp = xmlout + ".tmp";
var xml_head = "\n\n\n";
var xml_tail = "\n";
var xml_tail_buf = Buffer.from(xml_tail, "utf8");
// File position (in bytes, hence byteLength rather than string length)
// at which the tail currently starts, i.e. where the next item goes.
var xml_tail_offset = Buffer.byteLength(xml_head, "utf8");
// Number of collected items between two rotations of the output file.
var rotate_period = 10;
var serial_count = 0;

// Another process running on this pidfile ?
// This check must come before any output file is touched: otherwise a
// redundant second instance would wipe the data of the running one
// before noticing that it has to exit.
var my_pid;
try
{
  my_pid = fs.readFileSync(pidfile, "utf8");
  my_pid = my_pid.replace(/[^0-9]*$/, '');
}
catch (err)
{
  // Best effort: a missing or unreadable pidfile means "nobody running".
  my_pid = "";
}
var running_pid = parseInt(my_pid, 10);
// A pidfile carrying our own pid can only be stale (pid reuse), so it
// must not make us exit.
if (running_pid > 0 && running_pid !== process.pid)
{
  // Liveness test via procfs (only meaningful where /proc exists).
  var pid_in_proc = "/proc/" + running_pid;
  if (fs.existsSync(pid_in_proc))
  {
    process.exit(0);
  }
}
fs.writeFileSync(pidfile, process.pid + "\n");

var server = dgram.createSocket("udp4");

// Create the published file and an identical working copy. Both writes
// are synchronous so that the working copy is guaranteed to exist, with
// its final content, before it is opened for in-place updates.
fs.writeFileSync(xmlout, xml_head + xml_tail);
fs.writeFileSync(xmlout_tmp, xml_head + xml_tail);
var xfd = fs.openSync(xmlout_tmp, "r+");
// Socket-level failure: print the stack trace on stderr, prefixed with
// the script name, and then close the UDP socket.
server.on("error",
  function (failure)
  {
    var report = myself + ": server error:\n" + failure.stack;
    process.stderr.write(report);
    server.close();
  });
// Copy the file at src_path over dst_path using synchronous calls only.
// Both descriptors are closed even when a read or write throws.
function copy_file_sync(src_path, dst_path)
{
  var chunk = Buffer.alloc(131072);
  var fr = fs.openSync(src_path, "r");
  try
  {
    var fw = fs.openSync(dst_path, "w");
    try
    {
      var pos = 0;
      var bread = fs.readSync(fr, chunk, 0, chunk.length, pos);
      while (bread > 0)
      {
        fs.writeSync(fw, chunk, 0, bread, pos);
        pos += bread;
        bread = fs.readSync(fr, chunk, 0, chunk.length, pos);
      }
    }
    finally
    {
      fs.closeSync(fw);
    }
  }
  finally
  {
    fs.closeSync(fr);
  }
}

// Datagram handler. Each datagram is expected to carry one JSON object.
// Objects whose source.apiName matches /accelerometer/i, or whose type
// is "update", are appended to the working copy of the output file;
// everything else is ignored. Every rotate_period items the working
// copy is renamed over the published file and re-created from it.
//
// msg   - Buffer with the datagram payload.
// rinfo - sender information (address, port) supplied by dgram.
server.on("message",
  function (msg, rinfo)
  {
    // Datagrams are untrusted input: a malformed one must be dropped,
    // not allowed to throw out of the handler and kill the collector.
    var robj;
    try
    {
      robj = JSON.parse(msg.toString("utf8"));
    }
    catch (err)
    {
      process.stderr.write(myself + ": discarding malformed datagram from " +
                           rinfo.address + ":" + rinfo.port + ": " +
                           err.message + "\n");
      return;
    }
    if (robj === null || typeof robj !== "object")
    {
      return;
    }

    var api_name = robj.source ? robj.source.apiName : undefined;
    var from_accelerometer = (typeof api_name === "string") &&
                             /accelerometer/i.test(api_name);
    if (!from_accelerometer && robj.type !== "update")
    {
      return;
    }

    // One line per field, followed by an empty separator line.
    // NOTE(review): no element markup is emitted around the values even
    // though the output is referred to as XML - confirm intended tags.
    var item = "";
    item += "" + serial_count + "\n";
    item += "" + robj.timestamp + "\n";
    item += "" + robj.x + "\n";
    item += "" + robj.y + "\n";
    item += "" + robj.z + "\n";
    item += "\n";

    // Overwrite the current tail with "item + tail" in a single write,
    // then remember where the tail now starts.
    var old_offset = xml_tail_offset;
    var buf = Buffer.from(item, "utf8");
    xml_tail_offset += buf.length;
    var wrbuf = Buffer.concat([buf, xml_tail_buf]);
    fs.writeSync(xfd, wrbuf, 0, wrbuf.length, old_offset);
    serial_count++;

    if ((serial_count % rotate_period) === 0)
    {
      // Publish the working copy via rename, then rebuild a fresh
      // working copy from the published file and reopen it.
      fs.closeSync(xfd);
      fs.renameSync(xmlout_tmp, xmlout);
      copy_file_sync(xmlout, xmlout_tmp);
      xfd = fs.openSync(xmlout_tmp, "r+");
    }
  });
server.bind(5555);