#!/usr/bin/node // // // // // data_collector_process source // // // // // // // File: data_collector_process // // Author: Francesco.Prelz@mi.infn.it // // Description: Standing process collecting data from an UDP channel // and appending them into an XML file (via atomic file rotation). // // History: // 6-Oct-2014 Initial creation. var dgram = require('dgram'); var fs = require('fs'); if (process.argv.length < 4) { process.stderr.write("Usage: node " + process.argv[1] + " <pidfile> <xml_out>\n"); process.exit(1); } var myself = process.argv[1]; var pidfile = process.argv[2]; var xmlout = process.argv[3]; var xmlout_tmp = xmlout + ".tmp"; var server = dgram.createSocket("udp4"); var xml_head = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<!-- Data logged in " + xmlout + " -->\n<dataBlocks>\n"; var xml_tail = "</dataBlocks>\n"; var xml_tail_buf = new Buffer(xml_tail, "utf8"); var xml_tail_offset = xml_head.length; var rotate_period = 10; var serial_count = 0; fs.writeFileSync(xmlout, xml_head+xml_tail); fs.createReadStream(xmlout).pipe(fs.createWriteStream(xmlout_tmp)); var xfd = fs.openSync(xmlout_tmp,"r+"); // Another process running on this pidfile ? var my_pid; try { my_pid = fs.readFileSync(pidfile, "utf8"); my_pid = my_pid.replace(/[^0-9]*$/, ''); } catch (err) { my_pid = ""; } if (parseInt(my_pid, 10) > 0) { var pid_in_proc = "/proc/"+my_pid; if (fs.existsSync(pid_in_proc)) { process.exit(0); } } fs.writeFileSync(pidfile, process.pid+"\n"); server.on("error", function (err) { process.stderr.write(myself+ ": server error:\n" + err.stack); server.close(); }); server.on("message", function (msg, rinfo) { var robj = JSON.parse(msg); if ((robj.source.apiName && robj.source.apiName.match(/accelerometer/i)) || (robj.type == "update")) { var item = "<dataBlock>"; item += "<serial>"+serial_count+"</serial>\n"; item += "<timeStamp>"+robj.timestamp+"</timeStamp>\n"; item += "<x>"+robj.x+"</x>\n"; item += "<y>"+robj.y+"</y>\n"; item += "<z>"+robj.z+"</z>\n"; item += "</dataBlock>\n"; var old_offset = xml_tail_offset; var buf = new Buffer(item, "utf8"); xml_tail_offset += buf.length; var wrbuf = Buffer.concat([buf, xml_tail_buf]); fs.writeSync(xfd, wrbuf, 0, wrbuf.length, old_offset); serial_count++; if ((serial_count % rotate_period) == 0) { fs.closeSync(xfd); fs.renameSync(xmlout_tmp, xmlout); // Copy file synchronously... buf = new Buffer(131072); var fr = fs.openSync(xmlout, 'r'); var fw = fs.openSync(xmlout_tmp, 'w'); var bread = 1; var pos = 0; while (bread > 0) { bread = fs.readSync(fr, buf, 0, buf.length, pos); fs.writeSync(fw, buf, 0, bread, pos); pos += bread; } fs.closeSync(fr); fs.closeSync(fw); xfd = fs.openSync(xmlout_tmp,"r+"); } } }); server.bind(5555);