Commit 23da874

Anton Golub <antongolub@antongolub.com>
2025-08-01 14:33:24
feat: add `ProcessPromise.unpipe()` (#1302)
* feat: add `ProcessPromise.unpipe()` * fix: avoid multiple `streamPromisify()` the same target * test: check `pipe()` throws if wratable is ended * refactor: conceal `ProcessPromise` ref in `promisifiedStream.run()` * test: minor test impr
1 parent 9f195ee
build/core.cjs
@@ -619,66 +619,6 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
       if (this.sync) throw output;
     }
   }
-  // prettier-ignore
-  _pipe(source, dest, ...args) {
-    if ((0, import_util.isString)(dest))
-      return this._pipe(source, import_node_fs.default.createWriteStream(dest));
-    if ((0, import_util.isStringLiteral)(dest, ...args))
-      return this._pipe(
-        source,
-        $({
-          halt: true,
-          signal: this.signal
-        })(dest, ...args)
-      );
-    this._piped = true;
-    const { ee } = this._snapshot;
-    const output = this.output;
-    const isP = dest instanceof _ProcessPromise;
-    const from = new import_vendor_core2.VoidStream();
-    const end = () => {
-      if (!isP) return from.end();
-      setImmediate(() => {
-        _ProcessPromise.bus.unpipe(this, dest);
-        _ProcessPromise.bus.sources(dest).length === 0 && from.end();
-      });
-    };
-    const fill = () => {
-      for (const chunk of this._zurk.store[source]) from.write(chunk);
-    };
-    const fillSettled = () => {
-      if (!output) return;
-      if (!output.ok && isP) dest.break(output.exitCode, output.signal, output.cause);
-      fill();
-      end();
-    };
-    if (!output) {
-      const onData = (chunk) => from.write(chunk);
-      ee.once(source, () => {
-        fill();
-        ee.on(source, onData);
-      }).once("end", () => {
-        ee.removeListener(source, onData);
-        end();
-      });
-    }
-    if (isP) {
-      if (dest.isSettled()) throw new Fail("Cannot pipe to a settled process.");
-      _ProcessPromise.bus.pipe(this, dest);
-      from.pipe(dest._stdin);
-      if (dest.isHalted() && this.isHalted()) {
-        ee.once("start", () => dest.run());
-      } else {
-        dest.run();
-        this.catch((e) => dest.break(e.exitCode, e.signal, e.cause));
-      }
-      fillSettled();
-      return dest;
-    }
-    from.once("end", () => dest.emit(EPF)).pipe(dest);
-    fillSettled();
-    return promisifyStream(dest, this);
-  }
   abort(reason) {
     if (this.isSettled()) throw new Fail("Too late to abort the process.");
     if (this.signal !== this.ac.signal)
@@ -694,6 +634,35 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
     if (!this.pid) throw new Fail("The process pid is undefined.");
     return $.kill(this.pid, signal || this._snapshot.killSignal || $.killSignal);
   }
+  // Configurators
+  stdio(stdin, stdout = "pipe", stderr = "pipe") {
+    this._snapshot.stdio = [stdin, stdout, stderr];
+    return this;
+  }
+  nothrow(v = true) {
+    this._snapshot.nothrow = v;
+    return this;
+  }
+  quiet(v = true) {
+    this._snapshot.quiet = v;
+    return this;
+  }
+  verbose(v = true) {
+    this._snapshot.verbose = v;
+    return this;
+  }
+  timeout(d = 0, signal = $.timeoutSignal) {
+    if (this.isSettled()) return this;
+    const $2 = this._snapshot;
+    $2.timeout = (0, import_util.parseDuration)(d);
+    $2.timeoutSignal = signal;
+    if (this._timeoutId) clearTimeout(this._timeoutId);
+    if ($2.timeout && this.isRunning()) {
+      this._timeoutId = setTimeout(() => this.kill($2.timeoutSignal), $2.timeout);
+      this.finally(() => clearTimeout(this._timeoutId)).catch(import_util.noop);
+    }
+    return this;
+  }
   /**
    *  @deprecated Use $({halt: true})`cmd` instead.
    */
@@ -758,35 +727,6 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
   [Symbol.toPrimitive]() {
     return this.toString();
   }
-  // Configurators
-  stdio(stdin, stdout = "pipe", stderr = "pipe") {
-    this._snapshot.stdio = [stdin, stdout, stderr];
-    return this;
-  }
-  nothrow(v = true) {
-    this._snapshot.nothrow = v;
-    return this;
-  }
-  quiet(v = true) {
-    this._snapshot.quiet = v;
-    return this;
-  }
-  verbose(v = true) {
-    this._snapshot.verbose = v;
-    return this;
-  }
-  timeout(d = 0, signal = $.timeoutSignal) {
-    if (this.isSettled()) return this;
-    const $2 = this._snapshot;
-    $2.timeout = (0, import_util.parseDuration)(d);
-    $2.timeoutSignal = signal;
-    if (this._timeoutId) clearTimeout(this._timeoutId);
-    if ($2.timeout && this.isRunning()) {
-      this._timeoutId = setTimeout(() => this.kill($2.timeoutSignal), $2.timeout);
-      this.finally(() => clearTimeout(this._timeoutId)).catch(import_util.noop);
-    }
-    return this;
-  }
   // Output formatters
   json() {
     return this.then((o) => o.json());
@@ -822,6 +762,74 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
   isRunning() {
     return this.stage === "running";
   }
+  unpipe(to) {
+    _ProcessPromise.bus.unpipe(this, to);
+    return this;
+  }
+  // prettier-ignore
+  _pipe(source, dest, ...args) {
+    if ((0, import_util.isString)(dest))
+      return this._pipe(source, import_node_fs.default.createWriteStream(dest));
+    if ((0, import_util.isStringLiteral)(dest, ...args))
+      return this._pipe(
+        source,
+        $({
+          halt: true,
+          signal: this.signal
+        })(dest, ...args)
+      );
+    const isP = dest instanceof _ProcessPromise;
+    if (isP && dest.isSettled()) throw new Fail("Cannot pipe to a settled process.");
+    if (!isP && dest.writableEnded) throw new Fail("Cannot pipe to a closed stream.");
+    this._piped = true;
+    _ProcessPromise.bus.pipe(this, dest);
+    const { ee } = this._snapshot;
+    const output = this.output;
+    const from = new import_vendor_core2.VoidStream();
+    const check = () => {
+      var _a;
+      return !!((_a = _ProcessPromise.bus.refs.get(this)) == null ? void 0 : _a.has(dest));
+    };
+    const end = () => {
+      if (!check()) return;
+      setImmediate(() => {
+        _ProcessPromise.bus.unpipe(this, dest);
+        _ProcessPromise.bus.sources(dest).length === 0 && from.end();
+      });
+    };
+    const fill = () => {
+      for (const chunk of this._zurk.store[source]) from.write(chunk);
+    };
+    const fillSettled = () => {
+      if (!output) return;
+      if (isP && !output.ok) dest.break(output.exitCode, output.signal, output.cause);
+      fill();
+      end();
+    };
+    if (!output) {
+      const onData = (chunk) => check() && from.write(chunk);
+      ee.once(source, () => {
+        fill();
+        ee.on(source, onData);
+      }).once("end", () => {
+        ee.removeListener(source, onData);
+        end();
+      });
+    }
+    if (isP) {
+      from.pipe(dest._stdin);
+      if (this.isHalted()) ee.once("start", () => dest.run());
+      else {
+        dest.run();
+        this.catch((e) => dest.break(e.exitCode, e.signal, e.cause));
+      }
+      fillSettled();
+      return dest;
+    }
+    from.once("end", () => dest.emit(EPF)).pipe(dest);
+    fillSettled();
+    return _ProcessPromise.promisifyStream(dest, this);
+  }
   // Promise API
   then(onfulfilled, onrejected) {
     return super.then(onfulfilled, onrejected);
@@ -906,6 +914,7 @@ Object.defineProperty(_ProcessPromise.prototype, "pipe", { get() {
 // prettier-ignore
 _ProcessPromise.bus = {
   refs: /* @__PURE__ */ new Map(),
+  streams: /* @__PURE__ */ new WeakMap(),
   pipe(from, to) {
     const set = this.refs.get(from) || this.refs.set(from, /* @__PURE__ */ new Set()).get(from);
     set.add(to);
@@ -925,8 +934,10 @@ _ProcessPromise.bus = {
     }
   },
   runBack(p) {
+    var _a;
     for (const from of this.sources(p)) {
-      from.run();
+      if (from instanceof _ProcessPromise) from.run();
+      else (_a = this.streams.get(from)) == null ? void 0 : _a.run();
     }
   },
   sources(p) {
@@ -937,6 +948,25 @@ _ProcessPromise.bus = {
     return refs;
   }
 };
+_ProcessPromise.promisifyStream = (stream, from) => {
+  const proxy = _ProcessPromise.bus.streams.get(stream) || (0, import_util.proxyOverride)(stream, {
+    then(res = import_util.noop, rej = import_util.noop) {
+      return new Promise((_res, _rej) => {
+        const end = () => _res(res((0, import_util.proxyOverride)(stream, from.output)));
+        stream.once("error", (e) => _rej(rej(e))).once("finish", end).once(EPF, end);
+      });
+    },
+    run() {
+      from.run();
+    },
+    pipe(...args) {
+      const dest = stream.pipe.apply(stream, args);
+      return dest instanceof _ProcessPromise ? dest : _ProcessPromise.promisifyStream(dest, from);
+    }
+  });
+  _ProcessPromise.bus.streams.set(stream, proxy);
+  return proxy;
+};
 var ProcessPromise = _ProcessPromise;
 var _ProcessOutput = class _ProcessOutput extends Error {
   // prettier-ignore
@@ -1105,22 +1135,6 @@ function kill(_0) {
     }
   });
 }
-var promisifyStream = (stream, from) => (0, import_util.proxyOverride)(stream, {
-  then(res = import_util.noop, rej = import_util.noop) {
-    return new Promise((_res, _rej) => {
-      const onend = () => _res(res((0, import_util.proxyOverride)(stream, from.output)));
-      stream.once("error", (e) => _rej(rej(e))).once("finish", onend).once(EPF, onend);
-    });
-  },
-  run() {
-    return from.run();
-  },
-  // TODO _pipedFrom: from,
-  pipe(...args) {
-    const piped = stream.pipe.apply(stream, args);
-    return piped instanceof ProcessPromise ? piped : promisifyStream(piped, from);
-  }
-});
 function resolveDefaults(defs = defaults, prefix = ENV_PREFIX, env = import_node_process2.default.env, allowed = ENV_OPTS) {
   return Object.entries(env).reduce((m, [k, v]) => {
     if (v && k.startsWith(prefix)) {
build/core.d.ts
@@ -79,10 +79,13 @@ type PromiseCallback = {
     (resolve: Resolve, reject: Reject): void;
     [SHOT]?: Snapshot;
 };
-type PromisifiedStream<D extends Writable> = D & PromiseLike<ProcessOutput & D>;
+type PromisifiedStream<D extends Writable = Writable> = D & PromiseLike<ProcessOutput & D> & {
+    run(): void;
+};
+type PipeAcceptor = Writable | ProcessPromise;
 type PipeMethod = {
     (dest: TemplateStringsArray, ...args: any[]): ProcessPromise;
-    (file: string): PromisifiedStream<Writable>;
+    (file: string): PromisifiedStream;
     <D extends Writable>(dest: D): PromisifiedStream<D>;
     <D extends ProcessPromise>(dest: D): D;
 };
@@ -103,13 +106,13 @@ export declare class ProcessPromise extends Promise<ProcessOutput> {
     private _breakData?;
     private break;
     private finalize;
-    pipe: PipeMethod & {
-        [key in keyof TSpawnStore]: PipeMethod;
-    };
-    private _pipe;
-    private static bus;
     abort(reason?: string): void;
     kill(signal?: NodeJS.Signals | null): Promise<void>;
+    stdio(stdin: IOType, stdout?: IOType, stderr?: IOType): this;
+    nothrow(v?: boolean): this;
+    quiet(v?: boolean): this;
+    verbose(v?: boolean): this;
+    timeout(d?: Duration, signal?: NodeJS.Signals | undefined): this;
     /**
      *  @deprecated Use $({halt: true})`cmd` instead.
      */
@@ -130,11 +133,6 @@ export declare class ProcessPromise extends Promise<ProcessOutput> {
     get sync(): boolean;
     get [Symbol.toStringTag](): string;
     [Symbol.toPrimitive](): string;
-    stdio(stdin: IOType, stdout?: IOType, stderr?: IOType): this;
-    nothrow(v?: boolean): this;
-    quiet(v?: boolean): this;
-    verbose(v?: boolean): this;
-    timeout(d?: Duration, signal?: NodeJS.Signals | undefined): this;
     json<T = any>(): Promise<T>;
     text(encoding?: Encoding): Promise<string>;
     lines(delimiter?: Options['delimiter']): Promise<string[]>;
@@ -146,6 +144,13 @@ export declare class ProcessPromise extends Promise<ProcessOutput> {
     isHalted(): boolean;
     private isSettled;
     private isRunning;
+    pipe: PipeMethod & {
+        [key in keyof TSpawnStore]: PipeMethod;
+    };
+    unpipe(to?: PipeAcceptor): this;
+    private _pipe;
+    private static bus;
+    private static promisifyStream;
     then<R = ProcessOutput, E = ProcessOutput>(onfulfilled?: ((value: ProcessOutput) => PromiseLike<R> | R) | undefined | null, onrejected?: ((reason: ProcessOutput) => PromiseLike<E> | E) | undefined | null): Promise<R | E>;
     catch<T = ProcessOutput>(onrejected?: ((reason: ProcessOutput) => PromiseLike<T> | T) | undefined | null): Promise<ProcessOutput | T>;
     [Symbol.asyncIterator](): AsyncIterator<string>;
docs/process-promise.md
@@ -254,6 +254,25 @@ const o2 = await fs
 o2.stdout //  'TEST'
 ```
 
+## `unpipe()`
+
+Opposite of `pipe()`, it removes the process from the pipeline.
+
+```js
+const p1 = $`echo foo && sleep 0.05 && echo bar && sleep 0.05 && echo baz && sleep 0.05 && echo qux`
+const p2 = $`echo 1 && sleep 0.05 && echo 2 && sleep 0.05 && echo 3`
+const p3 = $`cat`
+
+p1.pipe(p3)
+p2.pipe(p3)
+
+setTimeout(() => p1.unpipe(p3), 105)
+
+assert.equal((await p1).stdout, 'foo\nbar\nbaz\nqux')
+assert.equal((await p2).stdout, '1\n2\n3')
+assert.equal((await p3).stdout, 'foo\n1\nbar\n2\n3')
+```
+
 ## `kill()`
 
 Kills the process and all children.
src/core.ts
@@ -233,12 +233,14 @@ type PromiseCallback = {
   [SHOT]?: Snapshot
 }
 
-type PromisifiedStream<D extends Writable> = D & PromiseLike<ProcessOutput & D>
+type PromisifiedStream<D extends Writable = Writable> = D &
+  PromiseLike<ProcessOutput & D> & { run(): void }
 
-type PipeDest = Writable | ProcessPromise | TemplateStringsArray | string
+type PipeAcceptor = Writable | ProcessPromise
+type PipeDest = PipeAcceptor | TemplateStringsArray | string
 type PipeMethod = {
   (dest: TemplateStringsArray, ...args: any[]): ProcessPromise
-  (file: string): PromisifiedStream<Writable>
+  (file: string): PromisifiedStream
   <D extends Writable>(dest: D): PromisifiedStream<D>
   <D extends ProcessPromise>(dest: D): D
 }
@@ -411,126 +413,6 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     }
   }
 
-  // Essentials
-  pipe!: PipeMethod & {
-    [key in keyof TSpawnStore]: PipeMethod
-  }
-  // prettier-ignore
-  static {
-    Object.defineProperty(this.prototype, 'pipe', { get() {
-      const self = this
-      const getPipeMethod = (kind: keyof TSpawnStore): PipeMethod => function (dest: PipeDest, ...args: any[]) { return self._pipe.call(self, kind, dest, ...args) }
-      const stdout = getPipeMethod('stdout')
-      const stderr = getPipeMethod('stderr')
-      const stdall = getPipeMethod('stdall')
-      return Object.assign(stdout, { stderr, stdout, stdall })
-    }})
-  }
-  // prettier-ignore
-  private _pipe(source: keyof TSpawnStore, dest: PipeDest, ...args: any[]): PromisifiedStream<Writable> | ProcessPromise {
-    if (isString(dest))
-      return this._pipe(source, fs.createWriteStream(dest))
-
-    if (isStringLiteral(dest, ...args))
-      return this._pipe(
-        source,
-        $({
-          halt: true,
-          signal: this.signal,
-        })(dest as TemplateStringsArray, ...args)
-      )
-
-    this._piped = true
-    const { ee } = this._snapshot
-    const output = this.output!
-    const isP = dest instanceof ProcessPromise
-    const from = new VoidStream()
-    const end = () => {
-      if (!isP) return from.end()
-      setImmediate(() => {
-        ProcessPromise.bus.unpipe(this, dest)
-        ProcessPromise.bus.sources(dest).length === 0 && from.end()
-      })
-    }
-    const fill = () => {
-      for (const chunk of this._zurk!.store[source]) from.write(chunk)
-    }
-    const fillSettled = () => {
-      if (!output) return
-      if (!output.ok && isP) dest.break(output.exitCode, output.signal, output.cause)
-      fill()
-      end()
-    }
-
-    if (!output) {
-      const onData = (chunk: string | Buffer) => from.write(chunk)
-      ee
-        .once(source, () => {
-          fill()
-          ee.on(source, onData)
-        })
-        .once('end', () => {
-          ee.removeListener(source, onData)
-          end()
-        })
-    }
-
-    if (isP) {
-      if (dest.isSettled()) throw new Fail('Cannot pipe to a settled process.')
-      ProcessPromise.bus.pipe(this, dest)
-
-      from.pipe(dest._stdin)
-      if (dest.isHalted() && this.isHalted()) {
-        ee.once('start', () => dest.run())
-      } else {
-        dest.run()
-        this.catch((e) => dest.break(e.exitCode, e.signal, e.cause))
-      }
-      fillSettled()
-      return dest
-    }
-
-    from.once('end', () => dest.emit(EPF)).pipe(dest)
-    fillSettled()
-    return promisifyStream(dest, this) as Writable &
-      PromiseLike<ProcessOutput & Writable>
-  }
-
-  // prettier-ignore
-  private static bus = {
-    refs: new Map<ProcessPromise, Set<ProcessPromise>>,
-    pipe(from: ProcessPromise, to: ProcessPromise) {
-      const set = this.refs.get(from) || (this.refs.set(from, new Set<ProcessPromise>())).get(from)!
-      set.add(to)
-    },
-    unpipe(from: ProcessPromise, to?: ProcessPromise) {
-      const set = this.refs.get(from)
-      if (!set) return
-      if (to) set.delete(to)
-      if (set.size) return
-      this.refs.delete(from)
-      from._piped = false
-    },
-    unpipeBack(to: ProcessPromise, from?: ProcessPromise) {
-      if (from) return this.unpipe(from, to)
-      for (const _from of this.refs.keys()) {
-        this.unpipe(_from, to)
-      }
-    },
-    runBack(p: ProcessPromise) {
-      for (const from of this.sources(p)) {
-        from.run()
-      }
-    },
-    sources(p: ProcessPromise): ProcessPromise[] {
-      const refs = []
-      for (const [from, set] of this.refs.entries()) {
-        set.has(p) && refs.push(from)
-      }
-      return refs
-    }
-  }
-
   abort(reason?: string) {
     if (this.isSettled()) throw new Fail('Too late to abort the process.')
     if (this.signal !== this.ac.signal)
@@ -550,6 +432,41 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     return $.kill(this.pid, signal || this._snapshot.killSignal || $.killSignal)
   }
 
+  // Configurators
+  stdio(stdin: IOType, stdout: IOType = 'pipe', stderr: IOType = 'pipe'): this {
+    this._snapshot.stdio = [stdin, stdout, stderr]
+    return this
+  }
+
+  nothrow(v = true): this {
+    this._snapshot.nothrow = v
+    return this
+  }
+
+  quiet(v = true): this {
+    this._snapshot.quiet = v
+    return this
+  }
+
+  verbose(v = true): this {
+    this._snapshot.verbose = v
+    return this
+  }
+
+  timeout(d: Duration = 0, signal = $.timeoutSignal): this {
+    if (this.isSettled()) return this
+
+    const $ = this._snapshot
+    $.timeout = parseDuration(d)
+    $.timeoutSignal = signal
+
+    if (this._timeoutId) clearTimeout(this._timeoutId)
+    if ($.timeout && this.isRunning()) {
+      this._timeoutId = setTimeout(() => this.kill($.timeoutSignal), $.timeout)
+      this.finally(() => clearTimeout(this._timeoutId)).catch(noop)
+    }
+    return this
+  }
   /**
    *  @deprecated Use $({halt: true})`cmd` instead.
    */
@@ -626,42 +543,6 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     return this.toString()
   }
 
-  // Configurators
-  stdio(stdin: IOType, stdout: IOType = 'pipe', stderr: IOType = 'pipe'): this {
-    this._snapshot.stdio = [stdin, stdout, stderr]
-    return this
-  }
-
-  nothrow(v = true): this {
-    this._snapshot.nothrow = v
-    return this
-  }
-
-  quiet(v = true): this {
-    this._snapshot.quiet = v
-    return this
-  }
-
-  verbose(v = true): this {
-    this._snapshot.verbose = v
-    return this
-  }
-
-  timeout(d: Duration = 0, signal = $.timeoutSignal): this {
-    if (this.isSettled()) return this
-
-    const $ = this._snapshot
-    $.timeout = parseDuration(d)
-    $.timeoutSignal = signal
-
-    if (this._timeoutId) clearTimeout(this._timeoutId)
-    if ($.timeout && this.isRunning()) {
-      this._timeoutId = setTimeout(() => this.kill($.timeoutSignal), $.timeout)
-      this.finally(() => clearTimeout(this._timeoutId)).catch(noop)
-    }
-    return this
-  }
-
   // Output formatters
   json<T = any>(): Promise<T> {
     return this.then((o) => o.json<T>())
@@ -708,6 +589,166 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     return this.stage === 'running'
   }
 
+  // Piping
+  pipe!: PipeMethod & {
+    [key in keyof TSpawnStore]: PipeMethod
+  }
+
+  unpipe(to?: PipeAcceptor): this {
+    ProcessPromise.bus.unpipe(this, to)
+    return this
+  }
+
+  // prettier-ignore
+  static {
+    Object.defineProperty(this.prototype, 'pipe', { get() {
+        const self = this
+        const getPipeMethod = (kind: keyof TSpawnStore): PipeMethod => function (dest: PipeDest, ...args: any[]) { return self._pipe.call(self, kind, dest, ...args) }
+        const stdout = getPipeMethod('stdout')
+        const stderr = getPipeMethod('stderr')
+        const stdall = getPipeMethod('stdall')
+        return Object.assign(stdout, { stderr, stdout, stdall })
+      }})
+  }
+  // prettier-ignore
+  private _pipe(source: keyof TSpawnStore, dest: PipeDest, ...args: any[]): PromisifiedStream | ProcessPromise {
+    if (isString(dest))
+      return this._pipe(source, fs.createWriteStream(dest))
+
+    if (isStringLiteral(dest, ...args))
+      return this._pipe(
+        source,
+        $({
+          halt: true,
+          signal: this.signal,
+        })(dest as TemplateStringsArray, ...args)
+      )
+
+    const isP = dest instanceof ProcessPromise
+    if (isP && dest.isSettled()) throw new Fail('Cannot pipe to a settled process.')
+    if (!isP && dest.writableEnded) throw new Fail('Cannot pipe to a closed stream.')
+
+    this._piped = true
+    ProcessPromise.bus.pipe(this, dest)
+
+    const { ee } = this._snapshot
+    const output = this.output
+    const from = new VoidStream()
+    const check = () => !!ProcessPromise.bus.refs.get(this)?.has(dest)
+    const end = () => {
+      if (!check()) return
+      setImmediate(() => {
+        ProcessPromise.bus.unpipe(this, dest)
+        ProcessPromise.bus.sources(dest).length === 0 && from.end()
+      })
+    }
+    const fill = () => {
+      for (const chunk of this._zurk!.store[source]) from.write(chunk)
+    }
+    const fillSettled = () => {
+      if (!output) return
+      if (isP && !output.ok) dest.break(output.exitCode, output.signal, output.cause)
+      fill()
+      end()
+    }
+
+    if (!output) {
+      const onData = (chunk: string | Buffer) => check() && from.write(chunk)
+      ee
+        .once(source, () => {
+          fill()
+          ee.on(source, onData)
+        })
+        .once('end', () => {
+          ee.removeListener(source, onData)
+          end()
+        })
+    }
+
+    if (isP) {
+      from.pipe(dest._stdin)
+      if (this.isHalted()) ee.once('start', () => dest.run())
+      else {
+        dest.run()
+        this.catch((e) => dest.break(e.exitCode, e.signal, e.cause))
+      }
+      fillSettled()
+      return dest
+    }
+
+    from.once('end', () => dest.emit(EPF)).pipe(dest)
+    fillSettled()
+    return ProcessPromise.promisifyStream(dest, this)
+  }
+
+  // prettier-ignore
+  private static bus = {
+    refs: new Map<ProcessPromise, Set<PipeAcceptor>>,
+    streams: new WeakMap<Writable, PromisifiedStream>(),
+    pipe(from: ProcessPromise, to: PipeAcceptor) {
+      const set = this.refs.get(from) || (this.refs.set(from, new Set())).get(from)!
+      set.add(to)
+    },
+    unpipe(from: ProcessPromise, to?: PipeAcceptor) {
+      const set = this.refs.get(from)
+      if (!set) return
+      if (to) set.delete(to)
+      if (set.size) return
+      this.refs.delete(from)
+      from._piped = false
+    },
+    unpipeBack(to: ProcessPromise, from?: ProcessPromise) {
+      if (from) return this.unpipe(from, to)
+      for (const _from of this.refs.keys()) {
+        this.unpipe(_from, to)
+      }
+    },
+    runBack(p: PipeAcceptor) {
+      for (const from of this.sources(p)) {
+        if (from instanceof ProcessPromise) from.run()
+        else this.streams.get(from)?.run()
+      }
+    },
+    sources(p: PipeAcceptor): PipeAcceptor[] {
+      const refs = []
+      for (const [from, set] of this.refs.entries()) {
+        set.has(p) && refs.push(from)
+      }
+      return refs
+    }
+  }
+
+  private static promisifyStream = <S extends Writable>(
+    stream: S,
+    from: ProcessPromise
+  ): PromisifiedStream<S> => {
+    const proxy =
+      ProcessPromise.bus.streams.get(stream) ||
+      proxyOverride(stream as PromisifiedStream<S>, {
+        then(res: any = noop, rej: any = noop) {
+          return new Promise((_res, _rej) => {
+            const end = () => _res(res(proxyOverride(stream, from.output)))
+            stream
+              .once('error', (e) => _rej(rej(e)))
+              .once('finish', end)
+              .once(EPF, end)
+          })
+        },
+        run() {
+          from.run()
+        },
+        pipe(...args: any) {
+          const dest = stream.pipe.apply(stream, args)
+          return dest instanceof ProcessPromise
+            ? dest
+            : ProcessPromise.promisifyStream(dest as Writable, from)
+        },
+      })
+
+    ProcessPromise.bus.streams.set(stream, proxy as any)
+    return proxy as PromisifiedStream<S>
+  }
+
   // Promise API
   override then<R = ProcessOutput, E = ProcessOutput>(
     onfulfilled?:
@@ -1032,32 +1073,6 @@ export async function kill(pid: number, signal = $.killSignal) {
   }
 }
 
-const promisifyStream = <S extends Writable>(
-  stream: S,
-  from: ProcessPromise
-): S & PromiseLike<ProcessOutput & S> =>
-  proxyOverride(stream as S & PromiseLike<ProcessOutput & S>, {
-    then(res: any = noop, rej: any = noop) {
-      return new Promise((_res, _rej) => {
-        const onend = () => _res(res(proxyOverride(stream, from.output)))
-        stream
-          .once('error', (e) => _rej(rej(e)))
-          .once('finish', onend)
-          .once(EPF, onend)
-      })
-    },
-    run() {
-      return from.run()
-    },
-    // TODO _pipedFrom: from,
-    pipe(...args: any) {
-      const piped = stream.pipe.apply(stream, args)
-      return piped instanceof ProcessPromise
-        ? piped
-        : promisifyStream(piped as Writable, from)
-    },
-  })
-
 export function resolveDefaults(
   defs: Options = defaults,
   prefix: string = ENV_PREFIX,
src/util.ts
@@ -129,10 +129,9 @@ export const proxyOverride = <T extends object>(
   ...fallbacks: any
 ): T =>
   new Proxy(origin, {
-    get(target: T, key) {
+    get(target: any, key) {
       return (
-        fallbacks.find((f: any) => key in f)?.[key] ??
-        Reflect.get(target as T, key)
+        fallbacks.find((f: any) => key in f)?.[key] ?? Reflect.get(target, key)
       )
     },
   }) as T
test/core.test.js
@@ -615,7 +615,7 @@ describe('core', () => {
       assert.equal((await p2).stdout, 'bar')
     })
 
-    describe('pipe() API', () => {
+    describe('pipe()', () => {
       test('accepts Writable', async () => {
         let contents = ''
         const stream = new Writable({
@@ -626,12 +626,20 @@ describe('core', () => {
         })
         const p1 = $`echo 'test'`
         const p2 = p1.pipe(stream)
+        assert.equal(p1._piped, true)
         await p2
-        assert.ok(p1._piped)
+        assert.equal(p1._piped, false)
         assert.ok(p1.stderr instanceof Socket)
         assert.equal(contents, 'test\n')
       })
 
+      test('throws if Writable ended', async () => {
+        const stream = { writableEnded: true }
+        const p = $`echo foo`
+        assert.throws(() => p.pipe(stream), /Cannot pipe to a closed stream/)
+        await p
+      })
+
       test('accepts WriteStream', async () => {
         const file = tempfile()
         try {
@@ -781,14 +789,14 @@ describe('core', () => {
         test('$ halted > stream', async () => {
           const file = tempfile()
           const fileStream = fs.createWriteStream(file)
-          const p = $({ halt: true })`echo "hello"`
-            .pipe(getUpperCaseTransform())
-            .pipe(fileStream)
+          const p1 = $({ halt: true })`echo "hello"`
+          const p2 = p1.pipe(getUpperCaseTransform()).pipe(fileStream)
+
+          assert.ok(p2 instanceof WriteStream)
+          assert.equal(p2.run(), undefined)
+
+          await p2
 
-          assert.ok(p instanceof WriteStream)
-          assert.ok(p.run() instanceof ProcessPromise)
-          await p
-          assert.equal((await p.run()).stdout, 'hello\n')
           assert.equal((await fs.readFile(file)).toString(), 'HELLO\n')
           await fs.rm(file)
         })
@@ -956,6 +964,24 @@ describe('core', () => {
       })
     })
 
+    describe('unpipe()', () => {
+      it('disables piping', async () => {
+        const p1 = $`echo foo && sleep 0.1 && echo bar && sleep 0.1 && echo baz && sleep 0.1 && echo qux`
+        const p2 = $`echo 1 && sleep 0.15 && echo 2 && sleep 0.1 && echo 3`
+        const p3 = $`cat`
+
+        p1.pipe(p3)
+        p2.pipe(p3)
+
+        setTimeout(() => {
+          p1.unpipe(p3)
+        }, 150)
+
+        const { stdout } = await p3
+        assert.equal(stdout, 'foo\n1\nbar\n2\n3\n')
+      })
+    })
+
     describe('abort()', () => {
       test('just works', async () => {
         const p = $({ detached: true })`sleep 999`
test/export.test.js
@@ -34,6 +34,7 @@ describe('core', () => {
     assert.equal(typeof core.ProcessOutput.getExitMessage, 'function', 'core.ProcessOutput.getExitMessage')
     assert.equal(typeof core.ProcessPromise, 'function', 'core.ProcessPromise')
     assert.equal(typeof core.ProcessPromise.bus, 'object', 'core.ProcessPromise.bus')
+    assert.equal(typeof core.ProcessPromise.promisifyStream, 'function', 'core.ProcessPromise.promisifyStream')
     assert.equal(typeof core.cd, 'function', 'core.cd')
     assert.equal(typeof core.chalk, 'function', 'core.chalk')
     assert.equal(typeof core.chalk.level, 'number', 'core.chalk.level')
@@ -161,6 +162,7 @@ describe('index', () => {
     assert.equal(typeof index.ProcessOutput.getExitMessage, 'function', 'index.ProcessOutput.getExitMessage')
     assert.equal(typeof index.ProcessPromise, 'function', 'index.ProcessPromise')
     assert.equal(typeof index.ProcessPromise.bus, 'object', 'index.ProcessPromise.bus')
+    assert.equal(typeof index.ProcessPromise.promisifyStream, 'function', 'index.ProcessPromise.promisifyStream')
     assert.equal(typeof index.VERSION, 'string', 'index.VERSION')
     assert.equal(typeof index.YAML, 'object', 'index.YAML')
     assert.equal(typeof index.YAML.Alias, 'function', 'index.YAML.Alias')
test-d/core.test-d.ts
@@ -27,9 +27,12 @@ expectType<ProcessPromise>(p.nothrow())
 expectType<ProcessPromise>(p.quiet())
 expectType<ProcessPromise>(p.pipe($`cmd`))
 expectType<ProcessPromise>(p.pipe`cmd`)
-expectType<Writable & PromiseLike<ProcessOutput & Writable>>(p.pipe('file'))
 expectType<
-  typeof process.stdout & PromiseLike<ProcessOutput & typeof process.stdout>
+  Writable & PromiseLike<ProcessOutput & Writable> & { run: () => void }
+>(p.pipe('file'))
+expectType<
+  typeof process.stdout &
+    PromiseLike<ProcessOutput & typeof process.stdout> & { run: () => void }
 >(p.pipe(process.stdout))
 expectType<ProcessPromise>(p.stdio('pipe'))
 expectType<ProcessPromise>(p.timeout('1s'))
.size-limit.json
@@ -15,7 +15,7 @@
       "README.md",
       "LICENSE"
     ],
-    "limit": "124.25 kB",
+    "limit": "125.00 kB",
     "brotli": false,
     "gzip": false
   },
@@ -29,14 +29,14 @@
       "build/globals.js",
       "build/deno.js"
     ],
-    "limit": "814.60 kB",
+    "limit": "815.20 kB",
     "brotli": false,
     "gzip": false
   },
   {
     "name": "libdefs",
     "path": "build/*.d.ts",
-    "limit": "40.30 kB",
+    "limit": "40.45 kB",
     "brotli": false,
     "gzip": false
   },
@@ -62,7 +62,7 @@
       "README.md",
       "LICENSE"
     ],
-    "limit": "871.75 kB",
+    "limit": "872.50 kB",
     "brotli": false,
     "gzip": false
   }