var fs = require('graceful-fs'); var Writable = require('readable-stream').Writable; var util = require('util'); var MurmurHash3 = require('imurmurhash'); var iferr = require('iferr'); var crypto = require('crypto'); var threadId = require('./thread-id'); function murmurhex() { var hash = MurmurHash3(''); for (var ii = 0; ii < arguments.length; ++ii) { hash.hash('' + arguments[ii]); } return hash.result(); } var invocations = 0; function getTmpname(filename) { return ( filename + '.' + murmurhex(__filename, process.pid, threadId, ++invocations) ); } var setImmediate = global.setImmediate || setTimeout; module.exports = WriteStreamAtomic; // Requirements: // 1. Write everything written to the stream to a temp file. // 2. If there are no errors: // a. moves the temp file into its final destination // b. emits `finish` & `closed` ONLY after the file is // fully flushed and renamed. // 3. If there's an error, removes the temp file. util.inherits(WriteStreamAtomic, Writable); function WriteStreamAtomic(path, options) { if (!(this instanceof WriteStreamAtomic)) { return new WriteStreamAtomic(path, options); } Writable.call(this, options); this.__isWin = options && options.hasOwnProperty('isWin') ? options.isWin : process.platform === 'win32'; this.__atomicTarget = path; this.__atomicTmp = getTmpname(path); this.__atomicChown = options && options.chown; this.__atomicClosed = false; this.__atomicStream = fs.WriteStream(this.__atomicTmp, options); this.__atomicStream.once('open', handleOpen(this)); this.__atomicStream.once('close', handleClose(this)); this.__atomicStream.once('error', handleError(this)); } // We have to suppress default finish emitting, because ordinarily it // would happen as soon as `end` is called on us and all of the // data has been written to our target stream. So we suppress // finish from being emitted here, and only emit it after our // target stream is closed and we've moved everything around. WriteStreamAtomic.prototype.emit = function(event) { if (event === 'finish') return this.__atomicStream.end(); return Writable.prototype.emit.apply(this, arguments); }; WriteStreamAtomic.prototype._write = function(buffer, encoding, cb) { var flushed = this.__atomicStream.write(buffer, encoding); if (flushed) return cb(); this.__atomicStream.once('drain', cb); }; function handleOpen(writeStream) { return function(fd) { writeStream.emit('open', fd); }; } function handleClose(writeStream) { return function() { if (writeStream.__atomicClosed) return; writeStream.__atomicClosed = true; if (writeStream.__atomicChown) { var uid = writeStream.__atomicChown.uid; var gid = writeStream.__atomicChown.gid; return fs.chown( writeStream.__atomicTmp, uid, gid, iferr(cleanup, moveIntoPlace), ); } else { moveIntoPlace(); } }; function moveIntoPlace() { fs.rename( writeStream.__atomicTmp, writeStream.__atomicTarget, iferr(trapWindowsEPERM, end), ); } function trapWindowsEPERM(err) { if ( writeStream.__isWin && err.syscall && err.syscall === 'rename' && err.code && err.code === 'EPERM' ) { checkFileHashes(err); } else { cleanup(err); } } function checkFileHashes(eperm) { var inprocess = 2; var tmpFileHash = crypto.createHash('sha512'); var targetFileHash = crypto.createHash('sha512'); fs.createReadStream(writeStream.__atomicTmp) .on('data', function(data, enc) { tmpFileHash.update(data, enc); }) .on('error', fileHashError) .on('end', fileHashComplete); fs.createReadStream(writeStream.__atomicTarget) .on('data', function(data, enc) { targetFileHash.update(data, enc); }) .on('error', fileHashError) .on('end', fileHashComplete); function fileHashError() { if (inprocess === 0) return; inprocess = 0; cleanup(eperm); } function fileHashComplete() { if (inprocess === 0) return; if (--inprocess) return; if (tmpFileHash.digest('hex') === targetFileHash.digest('hex')) { return cleanup(); } else { return cleanup(eperm); } } } function cleanup(err) { fs.unlink(writeStream.__atomicTmp, function() { if (err) { writeStream.emit('error', err); writeStream.emit('close'); } else { end(); } }); } function end() { // We have to use our parent class directly because we suppress `finish` // events fired via our own emit method. Writable.prototype.emit.call(writeStream, 'finish'); // Delay the close to provide the same temporal separation a physical // file operation would have– that is, the close event is emitted only // after the async close operation completes. setImmediate(function() { writeStream.emit('close'); }); } } function handleError(writeStream) { return function(er) { cleanupSync(); writeStream.emit('error', er); writeStream.__atomicClosed = true; writeStream.emit('close'); }; function cleanupSync() { try { fs.unlinkSync(writeStream.__atomicTmp); } finally { return; } } }