Applying a Python function with arguments in an Apache Beam pipeline

Problem description

I am using Apache Beam in Python to preprocess data. A sample of the data is shown below.

userid   itemid  rating timestamp
1          2       3.2    10:59
1          3       3.5    11:59
2          3       4.2    10:10
2          4       1.5    10:59 

My code is as follows:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

beam.__version__



# delete timestamp column
def del_col(data,col_name: str):
    del data[col_name]
    return data
  
# check for null values
def check_null(data,col_name1:str,col_name2:str,col_name3:str):
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0

# convert to comma-delimited format
def format_data(data):
  data = ','.join([data['userid'],data['itemid'],data['ratings']])
  return data

def print_row(data):
  print(data)


if __name__ == '__main__':
  options = PipelineOptions()
  input_file = 'data.csv' 
  with beam.Pipeline(options=options) as pipeline:
    (pipeline | 'ReadData' >> beam.io.ReadFromText(input_file,skip_header_lines=0) # read data with beam
        | 'SplitData' >> beam.Map(lambda x: x.split(','))
        | 'FormatToDict' >> beam.Map(lambda x: {"userid": x[0],"itemid": x[1],"ratings": x[2],"timestamp": x[3]}) # format to dict and name columns
        | 'DeleteNullData' >> beam.Filter(check_null('userid','itemid','ratings')) # pick non null columns
        | 'DeleteUnwantedData' >> beam.Map(del_col(col_name='timestamp')) # delete irrelevant columns
        | 'FormatData' >> beam.Map(format_data)
    )

The problem here is that I need to make these functions modular, and I don't know how to write them that way.

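def check_null(data,col_name1:str,col_name2:str,col_name3:str):
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0

def format_data(data):
  data = ','.join([data['userid'],data['itemid'],data['ratings']])
  return data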
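
One way to read "modular" here is that each helper takes the column names as a parameter instead of a fixed number of arguments or hard-coded keys. The following is a minimal sketch under that assumption. The names `has_values`, `drop_columns`, and `to_csv_line` are hypothetical and do not come from the original code, and rows are assumed to be dicts of strings like those built in the 'FormatToDict' step.

```python
from typing import Dict, Iterable

Row = Dict[str, str]


def has_values(row: Row, col_names: Iterable[str]) -> bool:
    """Return True if every named column is present and non-empty."""
    return all(len(row.get(col, "")) > 0 for col in col_names)


def drop_columns(row: Row, col_names: Iterable[str]) -> Row:
    """Return a copy of the row without the named columns.

    Copying avoids mutating the input element, which Beam transforms
    should not do.
    """
    unwanted = set(col_names)
    return {key: value for key, value in row.items() if key not in unwanted}


def to_csv_line(row: Row, col_names: Iterable[str]) -> str:
    """Join the named columns, in the given order, into a comma-separated line."""
    return ",".join(row[col] for col in col_names)


# Small check on one row from the sample data.
row = {"userid": "1", "itemid": "2", "ratings": "3.2", "timestamp": "10:59"}
kept = drop_columns(row, ["timestamp"])
print(has_values(row, ["userid", "itemid", "ratings"]))
print(to_csv_line(kept, ["userid", "itemid", "ratings"]))
```

Because the column lists are ordinary arguments, the same helpers can be reused for a different schema without editing their bodies.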

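Also, how do I pass a function together with its arguments to an Apache Beam pipeline? I tried the following, but it does not work because the function never receives the data argument.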

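        | 'SplitData' >> beam.Map(lambda x: x.split(','))
        | 'FormatToDict' >> beam.Map(lambda x: {"userid": x[0],"itemid": x[1],"ratings": x[2],"timestamp": x[3]}) # format to dict and name columns
        | 'DeleteNullData' >> beam.Filter(check_null('userid','itemid','ratings')) # pick non null columns
        | 'DeleteUnwantedData' >> beam.Map(del_col(col_name='timestamp')) # delete irrelevant columns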
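
Calls such as `check_null('userid','itemid','ratings')` and `del_col(col_name='timestamp')` run the functions immediately, while the pipeline is still being built, so Python raises a `TypeError` for the missing argument before any element is processed. `beam.Map` and `beam.Filter` accept extra positional and keyword arguments after the callable and forward them to it for every element, so one option is to pass the function itself followed by its arguments. The sketch below assumes that approach and keeps the signatures from the question. `run` is a hypothetical wrapper, and `apache_beam` is imported inside it only so the helpers can be exercised without Beam installed. As a further assumption, `del_col` is changed to return a copy rather than delete the key in place.

```python
def del_col(data, col_name: str):
    # Return a copy without the column instead of mutating the input element.
    return {key: value for key, value in data.items() if key != col_name}


def check_null(data, col_name1: str, col_name2: str, col_name3: str):
    # True only if all three named columns hold non-empty strings.
    return all(len(data[col]) > 0 for col in (col_name1, col_name2, col_name3))


def format_data(data):
    # Convert the row to a comma-delimited line.
    return ",".join([data["userid"], data["itemid"], data["ratings"]])


def run(input_file: str = "data.csv"):
    # Imported here so the helpers above can be tested without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | "ReadData" >> beam.io.ReadFromText(input_file, skip_header_lines=0)
            | "SplitData" >> beam.Map(lambda line: line.split(","))
            | "FormatToDict" >> beam.Map(
                lambda x: {"userid": x[0], "itemid": x[1], "ratings": x[2], "timestamp": x[3]}
            )
            # Pass the function itself; Beam supplies each element as `data`
            # and forwards the extra positional arguments after it.
            | "DeleteNullData" >> beam.Filter(check_null, "userid", "itemid", "ratings")
            # Keyword arguments are forwarded the same way.
            | "DeleteUnwantedData" >> beam.Map(del_col, col_name="timestamp")
            | "FormatData" >> beam.Map(format_data)
            | "PrintData" >> beam.Map(print)
        )
```

Calling `run()` executes the pipeline with the default runner. A lambda such as `beam.Map(lambda row: del_col(row, 'timestamp'))` should behave the same way, and forwarding the arguments directly keeps the step shorter.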
