将背压逻辑应用于生成器的有效方法?

问题描述

在我想从数据结构向文件系统或网络写入许多兆字节或千兆字节的情况下,我一直试图想出一种方法来减轻背压,而天真地这样做会导致 poor memory usage behavior

这是我想出来的。

import stream from "stream";
import fs from "fs";

function* seq() {
  for (let i = 0; i < 10 * 1000 * 1000; ++i) yield String(i) + "\n";
}

stream.pipeline(
  stream.Readable.from(seq()),fs.createWriteStream("test-generator.log","utf8"),(err) => {
    console.log(err ? "error" : "done");
  }
);

这确实有效,但我觉得它可以有更好的运行时性能

此处的生成器代表来自我们应用程序的数据源,我利用 pipeline 获得流背压处理。

我很确定可以争论生成器对示例没有帮助,我本可以将递增逻辑融入可读中,但是 Readable.from 糖非常引人注目。

❯ time node -r ts-node/register test3.ts                                        
done
node -r ts-node/register test3.ts  9.67s user 1.28s system 129% cpu 8.453 total

我已经确认它可以正常工作(没有与缓冲相关的失控内存问题),现在我只是觉得这有点慢。是的,这是 1000 万行,76MB 输出,大量数据,但我真的希望有可能减少吞吐量方面的瓶颈。我认为它在 cpu 上花费了太多时间,提供低于 10MB/s 的吞吐量。我不认为我会认为这是一个“好”的实现,除非它的速度比存储设备快一百倍左右。

节点分析结果:

Statistical profiling result from isolate-0x5d5b060-10453-v8.log,(8109 ticks,49 unaccounted,0 excluded).

 [Shared libraries]:
   ticks  total  nonlib   name
   5444   67.1%          /usr/local/bin/node
    266    3.3%          /lib/x86_64-linux-gnu/libc-2.27.so
      2    0.0%          [vdso]

 [JavaScript]:
   ticks  total  nonlib   name
    347    4.3%   14.5%  LazyCompile: *next internal/streams/from.js:74:22
    340    4.2%   14.2%  LazyCompile: *ondata internal/streams/readable.js:717:18
    249    3.1%   10.4%  LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    195    2.4%    8.1%  LazyCompile: *fromString buffer.js:444:20
    123    1.5%    5.1%  LazyCompile: *seq /home/slu/Documents/entropy-dynamics/faa/test3.ts:8:14
     76    0.9%    3.2%  LazyCompile: *emit events.js:264:44
     57    0.7%    2.4%  LazyCompile: *WriteStream._writev internal/fs/streams.js:416:41
     51    0.6%    2.1%  LazyCompile: *EventEmitter.emit domain.js:456:39
     11    0.1%    0.5%  LazyCompile: *<anonymous> internal/fs/utils.js:636:45
      8    0.1%    0.3%  LazyCompile: *scan /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:10708:22
      6    0.1%    0.3%  LazyCompile: *writev fs.js:717:16
      5    0.1%    0.2%  LazyCompile: *bindWorker /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:42602:28
      4    0.0%    0.2%  LazyCompile: *processticksAndRejections internal/process/task_queues.js:65:35
      4    0.0%    0.2%  LazyCompile: *parseJSDocCommentWorker /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:34575:45
      4    0.0%    0.2%  LazyCompile: *<anonymous> /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:34592:74
      3    0.0%    0.1%  LazyCompile: *scanjsDocToken /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:11431:32
      2    0.0%    0.1%  LazyCompile: *onwrite internal/streams/writable.js:394:17
      2    0.0%    0.1%  LazyCompile: *nextTick internal/process/task_queues.js:101:18
      2    0.0%    0.1%  LazyCompile: *declareSymbol /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:40668:31
      2    0.0%    0.1%  LazyCompile: *bindChildren /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:40973:30
      2    0.0%    0.1%  LazyCompile: *bind /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:42512:22
      1    0.0%    0.0%  RegExp: (\slib\s*=\s*)('|")(.+?)\2
      1    0.0%    0.0%  LazyCompile: *visitNode /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:28207:23
      1    0.0%    0.0%  LazyCompile: *setContextFlag /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:29293:32
      1    0.0%    0.0%  LazyCompile: *scanString /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:10343:28
      1    0.0%    0.0%  LazyCompile: *scanIdentifier /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:11172:32
      1    0.0%    0.0%  LazyCompile: *parseStatement /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:33466:32
      1    0.0%    0.0%  LazyCompile: *parsePropertyOrMethodSignature /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:30933:48
      1    0.0%    0.0%  LazyCompile: *parseParameterWorker /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:30759:38
      1    0.0%    0.0%  LazyCompile: *parseDelimitedList /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:30342:36
      1    0.0%    0.0%  LazyCompile: *normalizeString path.js:52:25
      1    0.0%    0.0%  LazyCompile: *nextToken /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:29434:27
      1    0.0%    0.0%  LazyCompile: *modifiersToFlags /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:17966:30
      1    0.0%    0.0%  LazyCompile: *iterateCommentRanges /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:9928:34
      1    0.0%    0.0%  LazyCompile: *isLeftHandSideExpressionKind /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:13005:42
      1    0.0%    0.0%  LazyCompile: *isExportSpecifier /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:27185:31
      1    0.0%    0.0%  LazyCompile: *hasDynamicName /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:16495:28
      1    0.0%    0.0%  LazyCompile: *getSyntacticModifierFlagsNoCache /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:17958:46
      1    0.0%    0.0%  LazyCompile: *get internal/streams/writable.js:698:8
      1    0.0%    0.0%  LazyCompile: *forEachChild /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:28244:26
      1    0.0%    0.0%  LazyCompile: *createtoken /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:21424:29
      1    0.0%    0.0%  LazyCompile: *createNodeArray /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:29620:33
      1    0.0%    0.0%  LazyCompile: *createNodeArray /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:21143:33
      1    0.0%    0.0%  LazyCompile: *createBaseGenericNamedDeclaration /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:21210:51
      1    0.0%    0.0%  LazyCompile: *codePointAt /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:11595:63
      1    0.0%    0.0%  LazyCompile: *bindParameter /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:43358:31
      1    0.0%    0.0%  LazyCompile: *bindEach /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:40963:26
      1    0.0%    0.0%  LazyCompile: *appendIfUnique /home/slu/Documents/entropy-dynamics/faa/node_modules/typescript/lib/typescript.js:1226:28

 [C++]:
   ticks  total  nonlib   name
    589    7.3%   24.6%  epoll_pwait
     51    0.6%    2.1%  __GI___pthread_getspecific
     48    0.6%    2.0%  __pthread_cond_timedwait
     26    0.3%    1.1%  __GI___pthread_mutex_lock
     16    0.2%    0.7%  cfree
     16    0.2%    0.7%  __pthread_cond_signal
     11    0.1%    0.5%  write
     11    0.1%    0.5%  mprotect
      8    0.1%    0.3%  __libc_malloc
      8    0.1%    0.3%  __GI___pthread_mutex_unlock
      7    0.1%    0.3%  munmap
      6    0.1%    0.3%  __lll_lock_wait
      5    0.1%    0.2%  __lll_unlock_wake
      4    0.0%    0.2%  std::basic_ostream<char,std::char_traits<char> >& std::__ostream_insert<char,std::char_traits<char> >(std::basic_ostream<char,std::char_traits<char> >&,char const*,long)
      4    0.0%    0.2%  fwrite
      3    0.0%    0.1%  std::ostream::sentry::~sentry()
      3    0.0%    0.1%  mmap
      2    0.0%    0.1%  do_futex_wait.constprop.1
      2    0.0%    0.1%  _IO_file_xsputn
      1    0.0%    0.0%  sysconf
      1    0.0%    0.0%  std::string::_Rep::_S_create(unsigned long,unsigned long,std::allocator<char> const&)
      1    0.0%    0.0%  std::ostream::sentry::sentry(std::ostream&)
      1    0.0%    0.0%  sigemptyset
      1    0.0%    0.0%  sigaddset
      1    0.0%    0.0%  __printf_fp
      1    0.0%    0.0%  __libc_write
      1    0.0%    0.0%  _IO_file_sync

 [Summary]:
   ticks  total  nonlib   name
   1520   18.7%   63.4%  JavaScript
    828   10.2%   34.5%  C++
    656    8.1%   27.4%  GC
   5712   70.4%          Shared libraries
     49    0.6%          Unaccounted

 [C++ entry points]:
   ticks    cpp   total   name
     50   42.7%    0.6%  __GI___pthread_getspecific
     21   17.9%    0.3%  __GI___pthread_mutex_lock
      8    6.8%    0.1%  write
      8    6.8%    0.1%  __libc_malloc
      8    6.8%    0.1%  __GI___pthread_mutex_unlock
      4    3.4%    0.0%  std::basic_ostream<char,long)
      4    3.4%    0.0%  fwrite
      3    2.6%    0.0%  std::ostream::sentry::~sentry()
      3    2.6%    0.0%  mprotect
      2    1.7%    0.0%  _IO_file_xsputn
      1    0.9%    0.0%  std::ostream::sentry::sentry(std::ostream&)
      1    0.9%    0.0%  cfree
      1    0.9%    0.0%  __pthread_cond_signal
      1    0.9%    0.0%  __lll_lock_wait
      1    0.9%    0.0%  __libc_write
      1    0.9%    0.0%  _IO_file_sync

 [Bottom up (heavy) profile]:
  Note: percentage shows a share of a particular caller in the total
  amount of its parent calls.
  Callers occupying less than 1.0% are not shown.

   ticks parent  name
   5444   67.1%  /usr/local/bin/node
   1343   24.7%    /usr/local/bin/node
    436   32.5%      /usr/local/bin/node
    426   97.7%        LazyCompile: *seq /home/slu/Documents/entropy-dynamics/faa/test3.ts:8:14
    426  100.0%          /usr/local/bin/node
    425   99.8%            LazyCompile: *next internal/streams/from.js:74:22
    433   32.2%      LazyCompile: ~writev fs.js:717:16
    432   99.8%        LazyCompile: *WriteStream._writev internal/fs/streams.js:416:41
    275   63.7%          LazyCompile: ~doWrite internal/streams/writable.js:367:17
    275  100.0%            LazyCompile: ~clearBuffer internal/streams/writable.js:493:21
    157   36.3%          LazyCompile: *onwrite internal/streams/writable.js:394:17
    157  100.0%            LazyCompile: ~<anonymous> internal/fs/streams.js:396:60
     82    6.1%      LazyCompile: *seq /home/slu/Documents/entropy-dynamics/faa/test3.ts:8:14
     82  100.0%        /usr/local/bin/node
     82  100.0%          LazyCompile: *next internal/streams/from.js:74:22
     82  100.0%            LazyCompile: *next internal/streams/from.js:74:22
     78    5.8%      LazyCompile: *next internal/streams/from.js:74:22
     74   94.9%        LazyCompile: *next internal/streams/from.js:74:22
     74  100.0%          /usr/local/bin/node
      4    5.1%        /usr/local/bin/node
     72    5.4%      LazyCompile: ~wrapSafe internal/modules/cjs/loader.js:964:18
     72  100.0%        LazyCompile: ~Module._compile internal/modules/cjs/loader.js:1017:37
     72  100.0%          LazyCompile: ~Module._extensions..js internal/modules/cjs/loader.js:1072:37
     72  100.0%            LazyCompile: ~Module.load internal/modules/cjs/loader.js:916:33
     19    1.4%      LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
     17   89.5%        LazyCompile: ~nativeModuleRequire internal/bootstrap/loaders.js:303:29
      3   17.6%          Eval: ~<anonymous> repl.js:1:1
      3  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      2   11.8%          Eval: ~<anonymous> stream.js:1:1
      2  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      2   11.8%          Eval: ~<anonymous> internal/modules/esm/loader.js:1:1
      2  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      2   11.8%          Eval: ~<anonymous> internal/bootstrap/node.js:1:1
      1    5.9%          LazyCompile: ~setupPrepareStackTrace internal/bootstrap/node.js:252:32
      1  100.0%            Eval: ~<anonymous> internal/bootstrap/node.js:1:1
      1    5.9%          Eval: ~<anonymous> tty.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      1    5.9%          Eval: ~<anonymous> internal/url.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      1    5.9%          Eval: ~<anonymous> internal/source_map/source_map_cache.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      1    5.9%          Eval: ~<anonymous> internal/process/task_queues.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      1    5.9%          Eval: ~<anonymous> internal/modules/cjs/loader.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      1    5.9%          Eval: ~<anonymous> events.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      1    5.9%          Eval: ~<anonymous> buffer.js:1:1
      1  100.0%            LazyCompile: ~compileForInternalLoader internal/bootstrap/loaders.js:270:27
      2   10.5%        LazyCompile: ~compileForPublicLoader internal/bootstrap/loaders.js:219:25
      2  100.0%          LazyCompile: ~loadNativeModule internal/modules/cjs/helpers.js:31:26
      2  100.0%            LazyCompile: ~Module._load internal/modules/cjs/loader.js:704:24
   1182   21.7%    LazyCompile: *fromString buffer.js:444:20
   1182  100.0%      LazyCompile: *ondata internal/streams/readable.js:717:18
   1182  100.0%        LazyCompile: *emit events.js:264:44
   1182  100.0%          LazyCompile: *EventEmitter.emit domain.js:456:39
   1180   99.8%            LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    736   13.5%    LazyCompile: *next internal/streams/from.js:74:22
    406   55.2%      LazyCompile: *next internal/streams/from.js:74:22
    406  100.0%        /usr/local/bin/node
    329   44.7%      /usr/local/bin/node
    188    3.5%    LazyCompile: *writev fs.js:717:16
    188  100.0%      LazyCompile: *WriteStream._writev internal/fs/streams.js:416:41
    188  100.0%        LazyCompile: *onwrite internal/streams/writable.js:394:17
    188  100.0%          LazyCompile: ~<anonymous> internal/fs/streams.js:396:60
    188  100.0%            LazyCompile: ~wrapper fs.js:636:19
    169    3.1%    LazyCompile: *ondata internal/streams/readable.js:717:18
    169  100.0%      LazyCompile: *emit events.js:264:44
    169  100.0%        LazyCompile: *EventEmitter.emit domain.js:456:39
    168   99.4%          LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    168  100.0%            LazyCompile: *next internal/streams/from.js:74:22
    152    2.8%    LazyCompile: *EventEmitter.emit domain.js:456:39
    152  100.0%      LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    152  100.0%        LazyCompile: *next internal/streams/from.js:74:22
    152  100.0%          /usr/local/bin/node
    138    2.5%    LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    138  100.0%      LazyCompile: *next internal/streams/from.js:74:22
    138  100.0%        /usr/local/bin/node

    589    7.3%  epoll_pwait

    347    4.3%  LazyCompile: *next internal/streams/from.js:74:22
    257   74.1%    /usr/local/bin/node
     90   25.9%    LazyCompile: *next internal/streams/from.js:74:22
     90  100.0%      /usr/local/bin/node

    340    4.2%  LazyCompile: *ondata internal/streams/readable.js:717:18
    319   93.8%    LazyCompile: *emit events.js:264:44
    319  100.0%      LazyCompile: *EventEmitter.emit domain.js:456:39
    319  100.0%        LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    319  100.0%          LazyCompile: *next internal/streams/from.js:74:22
    319  100.0%            /usr/local/bin/node
     21    6.2%    LazyCompile: *EventEmitter.emit domain.js:456:39
     21  100.0%      LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
     21  100.0%        LazyCompile: *next internal/streams/from.js:74:22
     21  100.0%          /usr/local/bin/node

    266    3.3%  /lib/x86_64-linux-gnu/libc-2.27.so
    152   57.1%    /usr/local/bin/node
    136   89.5%      /usr/local/bin/node
    133   97.8%        LazyCompile: *seq /home/slu/Documents/entropy-dynamics/faa/test3.ts:8:14
    133  100.0%          /usr/local/bin/node
    133  100.0%            LazyCompile: *next internal/streams/from.js:74:22
      6    3.9%      LazyCompile: *ondata internal/streams/readable.js:717:18
      6  100.0%        LazyCompile: *emit events.js:264:44
      6  100.0%          LazyCompile: *EventEmitter.emit domain.js:456:39
      6  100.0%            LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
      2    1.3%      LazyCompile: *createPool buffer.js:158:20
      2  100.0%        LazyCompile: *fromString buffer.js:444:20
      2  100.0%          LazyCompile: *ondata internal/streams/readable.js:717:18
      2  100.0%            LazyCompile: *emit events.js:264:44
     89   33.5%    LazyCompile: *fromString buffer.js:444:20
     89  100.0%      LazyCompile: *ondata internal/streams/readable.js:717:18
     89  100.0%        LazyCompile: *emit events.js:264:44
     89  100.0%          LazyCompile: *EventEmitter.emit domain.js:456:39
     89  100.0%            LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
      8    3.0%    LazyCompile: *writev fs.js:717:16
      8  100.0%      LazyCompile: *WriteStream._writev internal/fs/streams.js:416:41
      8  100.0%        LazyCompile: *onwrite internal/streams/writable.js:394:17
      8  100.0%          LazyCompile: ~<anonymous> internal/fs/streams.js:396:60
      8  100.0%            LazyCompile: ~wrapper fs.js:636:19

    249    3.1%  LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    210   84.3%    LazyCompile: *next internal/streams/from.js:74:22
    210  100.0%      /usr/local/bin/node
     39   15.7%    /usr/local/bin/node

    195    2.4%  LazyCompile: *fromString buffer.js:444:20
    173   88.7%    LazyCompile: *ondata internal/streams/readable.js:717:18
    173  100.0%      LazyCompile: *emit events.js:264:44
    173  100.0%        LazyCompile: *EventEmitter.emit domain.js:456:39
    173  100.0%          LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
    173  100.0%            LazyCompile: *next internal/streams/from.js:74:22
     22   11.3%    LazyCompile: *emit events.js:264:44
     22  100.0%      LazyCompile: *EventEmitter.emit domain.js:456:39
     22  100.0%        LazyCompile: *readableAddChunk internal/streams/readable.js:231:26
     22  100.0%          LazyCompile: *next internal/streams/from.js:74:22
     22  100.0%            /usr/local/bin/node

    123    1.5%  LazyCompile: *seq /home/slu/Documents/entropy-dynamics/faa/test3.ts:8:14
    104   84.6%    /usr/local/bin/node
    104  100.0%      LazyCompile: *next internal/streams/from.js:74:22
    104  100.0%        LazyCompile: *next internal/streams/from.js:74:22
    104  100.0%          /usr/local/bin/node
     19   15.4%    LazyCompile: *next internal/streams/from.js:74:22
     19  100.0%      LazyCompile: *next internal/streams/from.js:74:22
     19  100.0%        /usr/local/bin/node

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)