I am trying to read (open) and write a file in HDFS from a Python script, but I am getting an error. Can anyone tell me what is wrong here?
Code (complete): sample.py
#!/usr/bin/python
from subprocess import Popen, PIPE

print "Before Loop"
cat = Popen(["hadoop", "fs", "-cat", "./sample.txt"],
            stdout=PIPE)
print "After Loop 1"
put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=PIPE)
print "After Loop 2"
for line in cat.stdout:
    line += "Blah"
    print line
    print "Inside Loop"
    put.stdin.write(line)
cat.stdout.close()
cat.wait()
put.stdin.close()
put.wait()
When I execute:
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.5.1.jar -file ./sample.py -mapper './sample.py' -input sample.txt -output fileRead
it runs successfully, but I cannot find the file that should have been created in HDFS as modifiedfile.
And when I execute:
hadoop fs -getmerge ./fileRead/ file.txt
inside file.txt I get:
Before Loop
Before Loop
After Loop 1
After Loop 1
After Loop 2
After Loop 2
Can someone tell me what I am doing wrong here? I don't think it is reading from sample.txt at all.
Solution:
Try changing the put subprocess to consume cat's stdout directly, so the data flows straight from one hadoop process to the other instead of through Python. Change this:
put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=PIPE)
into this:
put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=cat.stdout)
Full script:
#!/usr/bin/python
from subprocess import Popen, PIPE

print "Before Loop"
cat = Popen(["hadoop", "fs", "-cat", "./sample.txt"],
            stdout=PIPE)
print "After Loop 1"
put = Popen(["hadoop", "fs", "-put", "-", "./modifiedfile.txt"],
            stdin=cat.stdout)
put.communicate()
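The pipe-chaining pattern above can be tried outside of Hadoop. This is a minimal sketch in Python 3 syntax, with ordinary shell commands (printf, tr) standing in for the two hadoop fs calls, assuming a POSIX environment:

```python
from subprocess import Popen, PIPE

# Producer: stands in for `hadoop fs -cat ./sample.txt`.
cat = Popen(["printf", "line1\\nline2\\n"], stdout=PIPE)

# Consumer: stands in for `hadoop fs -put - ./modifiedfile.txt`.
# Its stdin is wired directly to the producer's stdout, so the data
# never passes through the Python process.
put = Popen(["tr", "a-z", "A-Z"], stdin=cat.stdout, stdout=PIPE)

# Close our copy of the read end so `put` sees EOF when `cat` exits.
cat.stdout.close()

out, _ = put.communicate()
print(out.decode())
```

Here `put.communicate()` waits for the consumer to finish, just as in the full script above; closing cat.stdout in the parent is what lets the downstream process receive end-of-file.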