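How do I stop an Akka stream after receiving n elements?

Question

I'm new to Akka and am still trying to get the hang of it.

As an experiment, I want to read from a Kinesis stream, collect n messages, and then stop.

The only thing I have found that stops reading records is Sink.head(), but that returns only one record and I want more.

I can't figure out how to stop reading from the stream after receiving n messages.

Here is the code I have tried so far: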

  @Test
  public void testReadingFromKinesisNRecords() throws ExecutionException,InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
        .httpClient(AkkaHttpClient.builder()
            .withActorSystem(system).build())
        .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName,shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record,NotUsed> sourceKinesisBasic = KinesisSource.basic(settings,kinesisClient);

    Flow<Record,String,NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String,String,NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
//    Flow<String,List<String>,NotUsed> flowGroupedWithinMinute =
//        Flow.of(String.class).groupedWithin(
//            numberOfRecordsToRead,// group size
//            Duration.ofSeconds(60) // group time
//        );

    Source<String,NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
//        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
//    Sink<String,CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String,CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
//    Sink<String,CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10,materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
  }

  private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
  }

  private String debugPrint(String s) {
    System.out.println(s);
    return s;
  }

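Thanks for any pointers.

Solution

I found that the answer is to use take(n) at the Flow level, which completes the stream once n elements have passed through: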

...
Flow<String,String,NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

Source<String,NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .via(flowMapRecordToString)
    .via(flowPrinter)
    .via(flowTakeN);
...

Just adding to the answer you found: you can also express the same thing more directly, without via:

Source<String,NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s))
    .take(10);
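
For anyone who wants to see the idea without a Kinesis stream or an actor system, here is a rough analogy in plain java.util.stream. This is not Akka code: endlessRecords() and firstN() are hypothetical names made up for this sketch, and Stream.limit(n) only stands in for take(n). It shows a source that never ends on its own being cut off after n elements.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TakeNAnalogy {
    // Hypothetical stand-in for a source that never completes on its own,
    // such as a Kinesis shard that keeps polling for new records.
    static Stream<String> endlessRecords() {
        return Stream.iterate(1, i -> i + 1).map(i -> "record-" + i);
    }

    // limit(n) plays the role of take(n) here: after n elements the
    // pipeline finishes and stops pulling from the endless source.
    static List<String> firstN(int n) {
        return endlessRecords().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [record-1, record-2, record-3]
    }
}
```

In Akka Streams, take(n) behaves in a similar way: after n elements it completes downstream and cancels upstream. Sink.takeLast(10), by contrast, can only emit its result once the upstream has completed, which is why the original version kept running.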