用于Foundry转换的Python单元测试?

问题描述

我想对转换为Foundry的转换进行测试,传递测试输入并检查输出是否为预期的输出。是否可以使用伪数据集(存储库中的.csv文件调用转换?还是应该在转换内部创建要由测试调用函数(以代码创建的数据)?

解决方法

如果您在Code Repositories-> Python Transforms-> Python Unit Tests下查看平台文档,将会发现很多有用的资源。

您正在寻找有关编写和运行测试的部分。


//开始文档

编写测试

完整文档可在https://docs.pytest.org

中找到

Pytest在以test_开头的任何Python文件中查找测试。 建议将所有测试放入项目src目录下的测试包中。 测试只是简单的Python函数,也使用test_前缀命名,并且断言是使用Python的assert语句进行的。 PyTest还将运行使用Python的内置unittest模块编写的测试。 例如,在transforms-python / src / test / test_increment.py中,一个简单的测试将如下所示:

def increment(num):
     return num + 1

def test_increment():
     assert increment(3) == 5

运行此测试将导致检查失败,并显示以下消息:

============================= test session starts =============================
collected 1 item

test_increment.py F                                                       [100%]

================================== FAILURES ===================================
_______________________________ test_increment ________________________________

    def test_increment():
>       assert increment(3) == 5
E       assert 4 == 5
E        +  where 4 = increment(3)

test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================

使用PySpark测试

PyTest固定装置是一项强大的功能,只需添加相同名称的参数即可将值注入测试功能。此功能用于提供spark_session固定装置,供您在测试功能中使用。例如:

def test_dataframe(spark_session):
    df = spark_session.createDataFrame([['a',1],['b',2]],['letter','number'])
    assert df.schema.names == ['letter','number']

//结束文档


如果您不想在代码中指定架构,也可以按照How To-> Read file in Python repository

下的文档中的说明,读入存储库中的文件。

//开始文档

在Python存储库中读取文件

您可以将其他文件从存储库读取到转换上下文中。这可能在设置参数供转换代码参考时很有用。

首先,请在您的python存储库中编辑setup.py:

setup(
    name=os.environ['PKG_NAME'],# ...
    package_data={
        '': ['*.yaml','*.csv']
    }
)

这告诉python将yaml和csv文件捆绑到包中。然后在您的python转换旁边放置一个配置文件(例如config.yaml,但也可以是csv或txt)(例如read_yml.py参见下文):

- name: tbl1
  primaryKey:
  - col1
  - col2
  update:
  - column: col3
    with: 'XXX'

您可以使用以下代码在转换read_yml.py中阅读它:

from transforms.api import transform_df,Input,Output
from pkg_resources import resource_stream
import yaml
import json

@transform_df(
    Output("/Demo/read_yml")
)
def my_compute_function(ctx):
    stream = resource_stream(__name__,"config.yaml")
    docs = yaml.load(stream)
    return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])

所以您的项目结构将是:

  • some_folder
    • config.yaml
    • read_yml.py

这将在您的数据集中输出一行,其中“结果”一列的内容为:

[{"primaryKey": ["col1","col2"],"update": [{"column": "col3","with": "XXX"}],"name": "tbl1"}]

//结束文档