问题描述
我正在尝试将 mrJobs 与 csv 文件一起使用。问题是 csv 文件的输入跨越多行。
搜索mrJob 文档,我觉得我需要编写一个自定义协议来处理输入。
我尝试在下面编写自己的协议,multiLineCsvInputProtocol
,但我已经收到错误:TypeError: a bytes-like object is required,not 'str'
我不会说谎,我认为我在这里过头了。
基本上,多行 csv 文件中的每一行新数据都以日期字符串开头。我想逐行读取输入,在逗号上吐出每一行,将值存储在列表中,每当新行以日期字符串开头时,我想将整个列表 yield
到第一个映射器。
(或者找到其他更好的方法来读取多行 csv 输入)
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol
class multiLineCsvInputProtocol(object):
def read(self,line):
key,val = enumerate(line.split(',',1))
return key,val
class soMetask(MRJob):
INPUT_PROTOCOL = multiLineCsvInputProtocol
def mapper1(self,_,row):
yield (row,1 )
if __name__ == '__main__':
MRFindReciprocal.run()
解决方法
根据mrjob
的{{3}},read函数的line参数为bytestring类型强>,您很可能会收到该错误,因为您被 ','
split-ting str:
编写自定义协议
协议是一个对象,具有 read(self,line) 和 write(self,核心价值)。 read() 方法接受一个字节串并返回一个 解码对象的 2 元组,write() 获取键和值,然后 返回要传递回 Hadoop Streaming 或作为输出的字节。
可能的解决方案:
- 您可以尝试按
b','
进行拆分,这是一个字节串 - 您可以在拆分之前解码行,如下所示:
line.decode().split(',',1)
(指定编码可能是个好主意)