Problem description
I am using Apache Beam in Python to preprocess data like the sample shown below.
userid itemid rating timestamp
1 2 3.2 10:59
1 3 3.5 11:59
2 3 4.2 10:10
2 4 1.5 10:59
My code is shown below:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

beam.__version__

# delete timestamp column
def del_col(data, col_name: str):
    del data[col_name]
    return data

# check for null values
def check_null(data, col_name1: str, col_name2: str, col_name3: str):
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0

# convert to comma-delimited format
def format_data(data):
    data = ','.join([data['userid'], data['itemid'], data['ratings']])
    return data

def print_row(data):
    print(data)

if __name__ == '__main__':
    options = PipelineOptions()
    input_file = 'data.csv'
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'ReadData' >> beam.io.ReadFromText(input_file, skip_header_lines=0)  # read data with beam
         | 'SplitData' >> beam.Map(lambda x: x.split(','))
         | 'FormatToDict' >> beam.Map(lambda x: {"userid": x[0], "itemid": x[1], "ratings": x[2], "timestamp": x[3]})  # format to dict and name columns
         | 'DeleteNullData' >> beam.Filter(check_null('userid', 'itemid', 'ratings'))  # pick non null columns
         | 'DeleteUnwantedData' >> beam.Map(del_col(col_name='timestamp'))  # delete irrelevant columns
         | 'FormatData' >> beam.Map(format_data))
The problem is that I need to make these functions modular, and I do not know how to write them that way.
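def check_null(data, col_name1: str, col_name2: str, col_name3: str):
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0

def format_data(data):
    data = ','.join([data['userid'], data['itemid'], data['ratings']])
    return data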
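Also, how do I pass a function together with its arguments to an Apache Beam pipeline? I tried the following, but it does not work because the function never receives the data argument.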
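| 'SplitData' >> beam.Map(lambda x: x.split(','))
| 'FormatToDict' >> beam.Map(lambda x: {"userid": x[0], "itemid": x[1], "ratings": x[2], "timestamp": x[3]})  # format to dict and name columns
| 'DeleteNullData' >> beam.Filter(check_null('userid', 'itemid', 'ratings'))  # pick non null columns
| 'DeleteUnwantedData' >> beam.Map(del_col(col_name='timestamp'))  # delete irrelevant columns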
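The likely cause is that `del_col(col_name='timestamp')` and `check_null('userid','itemid','ratings')` call the functions immediately, while the pipeline is being built, instead of handing Beam a callable to run once per row. The sketch below reproduces that in plain Python, with no Beam import, and shows one way around it with `functools.partial`. The names `drop_timestamp`, `non_null`, `rows` and `kept` are made up for illustration, and the list comprehension is only a stand-in for `Filter` followed by `Map`.

```python
from functools import partial


def del_col(data, col_name):
    """Delete one column from a row dict (same logic as in the question)."""
    del data[col_name]
    return data


def check_null(data, col_name1, col_name2, col_name3):
    """True when all three columns hold non-empty strings."""
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0


# Writing del_col(col_name='timestamp') inside beam.Map(...) calls the function
# right away, before any row exists, so Python reports the missing `data`.
try:
    del_col(col_name='timestamp')
except TypeError as error:
    call_error = error

# partial() pre-fills the extra arguments and returns a callable that still
# expects `data`, which is the kind of object a per-row transform needs.
drop_timestamp = partial(del_col, col_name='timestamp')
non_null = partial(check_null, col_name1='userid', col_name2='itemid', col_name3='ratings')

# Hypothetical sample rows; the second one has an empty itemid.
rows = [
    {'userid': '1', 'itemid': '2', 'ratings': '3.2', 'timestamp': '10:59'},
    {'userid': '2', 'itemid': '', 'ratings': '4.2', 'timestamp': '10:10'},
]

# Plain-Python stand-in for Filter followed by Map; dict(row) copies each row
# so the sample data itself is not mutated.
kept = [drop_timestamp(dict(row)) for row in rows if non_null(row)]
print(kept)
```

In the pipeline these would become `beam.Filter(non_null)` and `beam.Map(drop_timestamp)`. Beam's `Map` and `Filter` also forward extra positional and keyword arguments to the callable, so `beam.Filter(check_null, 'userid', 'itemid', 'ratings')` and `beam.Map(del_col, col_name='timestamp')` should work without `partial`.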
Solution
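One possible way to make the steps modular is to have every helper take the column names as an argument and let Beam forward that argument. This is a minimal sketch, not a definitive implementation. The helper names (`parse_line`, `has_values`, `drop_columns`, `to_csv_line`, `run`) and the constants `COLUMNS` and `KEEP` are hypothetical. It assumes comma-separated input, as the question's `split(',')` implies, and it prints each row where the question defines `print_row`.

```python
COLUMNS = ['userid', 'itemid', 'ratings', 'timestamp']  # assumed column order
KEEP = ['userid', 'itemid', 'ratings']                  # columns kept in the output


def parse_line(line, columns):
    """Split one comma-separated line into a dict keyed by column name."""
    fields = (field.strip() for field in line.split(','))
    return dict(zip(columns, fields))


def has_values(row, columns):
    """True when every listed column is present and non-empty."""
    return all((row.get(column) or '').strip() for column in columns)


def drop_columns(row, columns):
    """Return a copy of the row without the listed columns (input is not mutated)."""
    unwanted = set(columns)
    return {key: value for key, value in row.items() if key not in unwanted}


def to_csv_line(row, columns):
    """Join the listed columns into one comma-separated string."""
    return ','.join(row[column] for column in columns)


def run(input_file, skip_header_lines=0):
    """Build and run the pipeline; the arguments after each function are forwarded by Beam."""
    # Imported here so the helpers above can be used and tested without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | 'ReadData' >> beam.io.ReadFromText(input_file, skip_header_lines=skip_header_lines)
            | 'ParseLine' >> beam.Map(parse_line, COLUMNS)
            | 'DeleteNullData' >> beam.Filter(has_values, KEEP)
            | 'DeleteUnwantedData' >> beam.Map(drop_columns, columns=['timestamp'])
            | 'FormatData' >> beam.Map(to_csv_line, KEEP)
            | 'PrintRow' >> beam.Map(print)
        )
```

Calling `run('data.csv')` would execute the pipeline on the default runner. `drop_columns` returns a new dict instead of using `del`, because Beam's programming guide advises against modifying input elements in place. If the file starts with a header row, passing `skip_header_lines=1` would skip it.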