// index.js (2.0 KB)

  1. var once = require('once')
  2. var eos = require('end-of-stream')
  3. var fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes
  // Shared do-nothing function. It serves as the default callback and as a
  // stand-in constructor on the right-hand side of `instanceof`, so it has to
  // remain a regular function (arrow functions cannot be used with `instanceof`).
  function noop () {}
  /**
   * Tells whether a value is callable.
   *
   * @param {*} fn - value to inspect
   * @returns {boolean} true only when `typeof fn` is 'function'
   */
  function isFn (fn) {
    var kind = typeof fn
    return kind === 'function'
  }
  /**
   * Tells whether a stream is an instance of fs.ReadStream or fs.WriteStream
   * that also exposes a close() function.
   *
   * @param {*} stream - candidate stream
   * @returns {boolean} false when `fs` is unavailable or the stream is not an
   *   fs stream; otherwise whether `stream.close` is a function
   */
  function isFS (stream) {
    if (!fs) return false // browser

    // `noop` keeps the instanceof operand callable when a constructor is missing.
    var fromFs = stream instanceof (fs.ReadStream || noop)
    if (!fromFs) fromFs = stream instanceof (fs.WriteStream || noop)

    return fromFs && isFn(stream.close)
  }
  /**
   * Tells whether a stream looks like a request object: it has a truthy
   * `setHeader` property and an `abort` function.
   *
   * @param {Object} stream - candidate stream (must not be null/undefined)
   * @returns {*} the falsy `setHeader` value when that property is falsy,
   *   otherwise a boolean telling whether `stream.abort` is a function
   */
  function isRequest (stream) {
    var setHeader = stream.setHeader
    // Same result as `setHeader && isFn(abort)`: a falsy setHeader is returned as-is.
    if (!setHeader) return setHeader
    return isFn(stream.abort)
  }
  /**
   * Starts watching one stream and returns a function that tears it down.
   *
   * The supplied callback is wrapped with `once` before use. It is called with
   * the error reported by `eos`, with no arguments when `eos` reports no error,
   * or with a fallback error when the stream has to be destroyed but offers no
   * close/abort/destroy function.
   *
   * @param {Object} stream - stream to watch; must support `on('close', ...)`
   * @param {boolean} reading - forwarded to `eos` as its `readable` option
   * @param {boolean} writing - forwarded to `eos` as its `writable` option
   * @param {Function} callback - completion handler, `callback(err)`
   * @returns {Function} `destroy(err)`; does nothing once the stream is closed
   *   or after it has already been invoked
   */
  function destroyer (stream, reading, writing, callback) {
    var done = once(callback)
    var closed = false
    var destroyed = false

    function markClosed () {
      closed = true
    }

    function onFinished (err) {
      if (err) return done(err)
      closed = true
      done()
    }

    // The 'close' listener is registered before eos attaches its own listeners.
    stream.on('close', markClosed)
    eos(stream, {readable: reading, writable: writing}, onFinished)

    return function (err) {
      if (closed || destroyed) return
      destroyed = true

      if (isFS(stream)) return stream.close(noop) // use close for fs streams to avoid fd leaks
      if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want
      if (isFn(stream.destroy)) return stream.destroy()

      // No way to tear the stream down: report the failure to the callback directly.
      done(err || new Error('stream was destroyed'))
    }
  }
  /**
   * Invokes a function with no arguments and discards its result.
   *
   * Handy as an iteration callback: any extra arguments supplied by the
   * iterator (index, array) are not forwarded to `fn`.
   *
   * @param {Function} fn - function to invoke
   * @returns {undefined}
   */
  function call (fn) {
    fn()
  }
  /**
   * Pipes one stream into another.
   *
   * @param {Object} from - source; its `pipe` method is called with `to`
   * @param {Object} to - destination handed to `from.pipe`
   * @returns {*} whatever `from.pipe(to)` returns
   */
  function pipe (from, to) {
    var piped = from.pipe(to)
    return piped
  }
  /**
   * Pipes a series of streams together and destroys all of them once any
   * stream reports an error or the last stream finishes.
   *
   * Accepts the streams either as separate arguments or as a single array in
   * the first position. A trailing function argument is taken as the completion
   * callback; a trailing falsy argument is dropped and no callback is used.
   *
   * @param {...*} streams - two or more streams, optionally followed by
   *   `callback(err)`, which receives the first error seen (if any)
   * @returns {*} the value returned by the final `pipe` call in the chain
   * @throws {Error} when fewer than two streams are supplied
   */
  function pump () {
    var streams = Array.prototype.slice.call(arguments)

    // A falsy or function-typed last argument occupies the callback slot and is removed.
    var last = streams[streams.length - 1]
    var callback = noop
    if (!last || isFn(last)) callback = streams.pop() || noop

    if (Array.isArray(streams[0])) streams = streams[0]
    if (streams.length < 2) throw new Error('pump requires two streams per minimum')

    var error
    var lastIndex = streams.length - 1
    var destroys = streams.map(function (stream, i) {
      var reading = i < lastIndex // every stream but the last is read from
      var writing = i > 0 // every stream but the first is written to

      return destroyer(stream, reading, writing, function (err) {
        error = error || err // remember only the first error
        if (err) destroys.forEach(call)
        if (!reading) {
          // The last stream is done: tear everything down and report.
          destroys.forEach(call)
          callback(error)
        }
      })
    })

    return streams.reduce(pipe)
  }
  62. module.exports = pump