Comparing objects with PAssert containsInAnyOrder in Apache Beam

Problem description

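I am writing unit tests for my Beam pipeline with PAssert. The pipeline outputs the objects correctly, but the test fails when comparing them, with the following assertion error: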

java.lang.AssertionError: Decode pubsub message/ParMultiDo(DecodePubSubMessage).output: 
Expected: iterable with items [<PubsubMessage{message=[123,34,104...],attributes={messageId=2be485e4-3e53-4468-a482-a49842b87ed5,dataPipelineId=bc957aa3-17e7-46d6-bc73-0924fa5674fa,region=us-west1,ingestionTimestamp=2020-02-02T12:34:56.789Z},messageId=null}>] in any order
     but: not matched: <PubsubMessage{message=[123,messageId=null}>

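I also tried wrapping expectedOutputPubSubMessage in a list (the actual output is apparently an array), to no avail. All the PAssert examples in the documentation compare simple strings or key-value pairs.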
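Because the documented PAssert examples compare strings, one way to reuse that pattern is to project each message to a canonical string first and then compare the strings in any order. The stdlib-only Java sketch below illustrates the idea; SimpleMessage, message(), toKey() and sameInAnyOrder() are hypothetical names, with SimpleMessage standing in for PubsubMessage and sameInAnyOrder() mimicking what containsInAnyOrder checks (multiset equality, order ignored). In a real Beam test the projection would presumably happen inside the pipeline, for example with MapElements.into(TypeDescriptors.strings()) over getPayload() and getAttributeMap(), before calling PAssert.that(...).containsInAnyOrder(...) with the expected strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MessageKeyDemo {

  // Hypothetical stand-in for PubsubMessage: a byte[] payload plus string attributes.
  // It does not override equals(), so instances can only be compared by content.
  static final class SimpleMessage {
    final byte[] payload;
    final Map<String, String> attributes;

    SimpleMessage(byte[] payload, Map<String, String> attributes) {
      this.payload = payload;
      this.attributes = attributes;
    }
  }

  // Convenience factory: message("body", "k1", "v1", "k2", "v2", ...).
  static SimpleMessage message(String payload, String... keyValues) {
    Map<String, String> attributes = new HashMap<>();
    for (int i = 0; i + 1 < keyValues.length; i += 2) {
      attributes.put(keyValues[i], keyValues[i + 1]);
    }
    return new SimpleMessage(payload.getBytes(StandardCharsets.UTF_8), attributes);
  }

  // Canonical string form: UTF-8 payload, then the attributes sorted by key.
  static String toKey(SimpleMessage m) {
    return new String(m.payload, StandardCharsets.UTF_8) + "|" + new TreeMap<>(m.attributes);
  }

  // Order-insensitive comparison of the projected keys; duplicates count,
  // which is the same notion of equality that containsInAnyOrder checks.
  static boolean sameInAnyOrder(List<SimpleMessage> actual, List<SimpleMessage> expected) {
    List<String> actualKeys = new ArrayList<>();
    for (SimpleMessage m : actual) {
      actualKeys.add(toKey(m));
    }
    List<String> expectedKeys = new ArrayList<>();
    for (SimpleMessage m : expected) {
      expectedKeys.add(toKey(m));
    }
    Collections.sort(actualKeys);
    Collections.sort(expectedKeys);
    return actualKeys.equals(expectedKeys);
  }

  public static void main(String[] args) {
    List<SimpleMessage> actual =
        Arrays.asList(message("{\"id\":1}", "region", "us-west1"), message("{\"id\":2}"));
    List<SimpleMessage> expected =
        Arrays.asList(message("{\"id\":2}"), message("{\"id\":1}", "region", "us-west1"));
    System.out.println(sameInAnyOrder(actual, expected)); // prints true
  }
}
```

The key format is only meant for tests: sorting the attributes makes the string independent of HashMap iteration order, so two messages with the same payload and attributes always produce the same key.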

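import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;

// The TEST_* values and *_NAME attribute keys are assumed to be static constants
// defined elsewhere in the test code (not shown in the question).
@RunWith(PowerMockRunner.class)
public class DataDecodePipelineTest implements Serializable {

  // transient: the test class is Serializable, but the TestPipeline rule should not be serialized.
  @Rule
  public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void testPipeline() {
    // Input and expected output carry the same attributes; only the payload differs.
    Map<String, String> attributes = new HashMap<>();
    attributes.put(MESSAGE_ID_NAME, TEST_MESSAGE_ID);
    attributes.put(DATA_PIPELINE_ID_NAME, TEST_DATA_PIPELINE_ID);
    attributes.put(INGESTION_TIMESTAMP_NAME, TEST_INGESTION_TIMESTAMP);
    attributes.put(REGION_NAME, TEST_REGION);

    PubsubMessage inputPubSubMessage =
        new PubsubMessage(TEST_ENCODED_PAYLOAD.getBytes(StandardCharsets.UTF_8), attributes);

    PubsubMessage expectedOutputPubSubMessage =
        new PubsubMessage(TEST_DECODED_PAYLOAD.getBytes(StandardCharsets.UTF_8), attributes);

    PCollection<PubsubMessage> input =
        p.apply(Create.of(Collections.singletonList(inputPubSubMessage)));

    PCollection<PubsubMessage> output =
        input.apply("Decode pubsub message", ParDo.of(new DataDecodePipeline.DecodePubSubMessage()));

    PAssert.that(output).containsInAnyOrder(expectedOutputPubSubMessage);

    p.run().waitUntilFinish();
  }
}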

Apparently someone ran into exactly the same problem a few years ago, and it is still unresolved: Test pipeline comparing objects using PAssert containsInAnyOrder()

Solution

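The problem is that you are comparing different kinds of objects.

Your pipeline returns a PCollection, and you are comparing it with a PubsubMessage.

You have to create a PCollection from expectedOutputPubSubMessage.

Try this: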

      PAssert.that(output).containsInAnyOrder(Create.of(Collections.singletonList(expectedOutputPubSubMessage)));
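Note that PAssert's containsInAnyOrder takes the expected elements themselves (as varargs or an Iterable), not a PCollection or a Create transform, and it matches them with equals(). A likely cause of the original failure, assuming the PubsubMessage class in the Beam version in use does not override equals(), is that the decoded message and the expected one are distinct instances that can only be equal by identity. The stdlib-only sketch below shows the difference with two hypothetical classes: IdentityMessage inherits Object.equals(), while ValueMessage compares the byte[] payload with Arrays.equals and the attribute maps with Map.equals.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MessageEqualityDemo {

  // Hypothetical type WITHOUT an equals() override: inherits identity-based Object.equals().
  static final class IdentityMessage {
    final byte[] payload;
    final Map<String, String> attributes;

    IdentityMessage(byte[] payload, Map<String, String> attributes) {
      this.payload = payload;
      this.attributes = attributes;
    }
  }

  // Hypothetical value type WITH content-based equals()/hashCode().
  // byte[] must be compared with Arrays.equals, because array equals() is identity-based.
  static final class ValueMessage {
    final byte[] payload;
    final Map<String, String> attributes;

    ValueMessage(byte[] payload, Map<String, String> attributes) {
      this.payload = payload.clone();
      this.attributes = new HashMap<>(attributes);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof ValueMessage)) {
        return false;
      }
      ValueMessage other = (ValueMessage) o;
      return Arrays.equals(payload, other.payload) && attributes.equals(other.attributes);
    }

    @Override
    public int hashCode() {
      return 31 * Arrays.hashCode(payload) + attributes.hashCode();
    }
  }

  static Map<String, String> attributes() {
    Map<String, String> m = new HashMap<>();
    m.put("region", "us-west1");
    return m;
  }

  static byte[] payload() {
    return "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
  }

  // Two separately built messages with identical content, like the expected and actual messages in the test.
  static boolean identityMessagesEqual() {
    return new IdentityMessage(payload(), attributes())
        .equals(new IdentityMessage(payload(), attributes()));
  }

  static boolean valueMessagesEqual() {
    return new ValueMessage(payload(), attributes())
        .equals(new ValueMessage(payload(), attributes()));
  }

  public static void main(String[] args) {
    System.out.println(identityMessagesEqual()); // prints false
    System.out.println(valueMessagesEqual());    // prints true
  }
}
```

For a type you control, a content-based equals()/hashCode() like ValueMessage's lets containsInAnyOrder work unchanged (a Beam type would also need a coder). For a library type such as PubsubMessage, a content check on getPayload() and getAttributeMap() inside PAssert.that(output).satisfies(...) is a possible alternative.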

Example: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/test/java/com/google/cloud/teleport/templates/PubsubToPubsubTest.java