通过 HTTP WEB API 在 HDFS 上放置数据

问题描述

我正在尝试通过 HDFS Web API 在 HDFS 上实现 PUT 请求。

所以我查阅了有关如何做到这一点的文档:https://hadoop.apache.org/docs/r1.0.4/webhdfs.html#CREATE

首先在没有重定向的情况下执行 PUT,获得 307,捕获新 URL,然后使用 DATA 在该 URL 上进行 PUT。

当我第一次放置时,我得到了 307,但 URL 与第一个相同。所以我很确定我是否已经在“好的”数据节点上。

无论如何,我得到了这个 URL 并尝试向它添加数据,但是我得到一个错误,据我所知,这是一个连接错误。主机正在切断连接。

class HttpFS:
    def __init__(self,url=settings.HTTPFS_URL):
        self.httpfs = url
        self.auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)

    def put(self,local_file,hdfs_dir):
        url = "{}{}".format(self.httpfs,hdfs_dir)
        params = {"op": "CREATE","overwrite": True}
        print(url)
        r = requests.put(url,auth=self.auth,params=params,stream=True,verify=settings.CA_ROOT_PATH,allow_redirects=False)

        r = requests.put(r.headers['Location'],data=open(local_file,'rb'),verify=settings.CA_ROOT_PATH)

这是给出的错误

    r = requests.put(r.headers['Location'],verify=settings.CA_ROOT_PATH)
  File "/home/cdsw/.local/lib/python3.6/site-packages/requests/api.py",line 134,in put
    return request('put',url,data=data,**kwargs)
  File "/home/cdsw/.local/lib/python3.6/site-packages/requests/api.py",line 61,in request
    return session.request(method=method,url=url,**kwargs)
  File "/home/cdsw/.local/lib/python3.6/site-packages/requests/sessions.py",line 530,in request
    resp = self.send(prep,**send_kwargs)
  File "/home/cdsw/.local/lib/python3.6/site-packages/requests/sessions.py",line 643,in send
    r = adapter.send(request,**kwargs)
  File "/home/cdsw/.local/lib/python3.6/site-packages/requests/adapters.py",line 498,in send
    raise ConnectionError(err,request=request)
requests.exceptions.ConnectionError: ('Connection aborted.',brokenPipeError(32,'broken pipe'))

编辑 1:

我也尝试过 https://github.com/pywebhdfs/pywebhdfs 存储库。因为它应该完全符合我的要求。但我仍然有这个断管错误

from requests_kerberos import OPTIONAL,HTTPKerberosAuth
from pywebhdfs.webhdfs import PyWebHdfsClient
from utils import settings
auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)

url = f"https://{settings.HDFS_HTTPFS_HOST}:{settings.HDFS_HTTPFS_PORT}/webhdfs/v1/"

hdfs_client = PyWebHdfsClient(base_uri_pattern=url,request_extra_opts={'auth':auth,'verify': settings.CA_ROOT_PATH})
with open(data_dir + file_name,'rb') as file_data:
            hdfs_client.create_file(hdfs_path + file_name,file_data=file_data,overwrite=True)

同样的错误

    hdfs_client.create_file(hdfs_path + file_name,overwrite=True)
  File "/home/cdsw/.local/lib/python3.6/site-packages/pywebhdfs/webhdfs.py",line 115,in create_file
    **self.request_extra_opts)
  File "/home/cdsw/.local/lib/python3.6/site-packages/requests/api.py",'broken pipe'))

编辑 2:

我发现我一次发送了太多数据。所以现在我在 HDFS 上创建一个文件,然后我将数据块附加到它。但它很慢......我仍然可以随机得到与上面相同的错误。块越大,我就越有机会获得 Connection aborted。我的文件更多在 200Mb 的范围内,因此与 Hadoop 二进制文件“hdfs dfs -put”相比需要很长时间

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)