在python中调用CosmosClienturl,key,proxy_config = d时出现错误?

问题描述

使用客户端配置和执行对Azure Cosmos DB服务的请求

`
d=ProxyConfiguration()
d.Host='string'
d.Port=int
client = CosmosClient(url,key,proxy_config = d) `

我遇到错误

azure.core.exceptions.ServiceRequestError:<urllib3.connection.HTTPSConnection object at 0x0000020EC3BCCD60>: Failed to establish a new connection: [Errno 11001] getaddrinfo Failed

将值传递给proxy_config参数的正确方法是什么?

CosmoClient

解决方法

您可以下载官方SDK进行测试,我认为您应该使用CosmosClientConnection而不是CosmosClient

proxy_tests.py

#The MIT License (MIT)
#Copyright (c) 2014 Microsoft Corporation

#Permission is hereby granted,free of charge,to any person obtaining a copy
#of this software and associated documentation files (the "Software"),to deal
#in the Software without restriction,including without limitation the rights
#to use,copy,modify,merge,publish,distribute,sublicense,and/or sell
#copies of the Software,and to permit persons to whom the Software is
#furnished to do so,subject to the following conditions:

#The above copyright notice and this permission notice shall be included in all
#copies or substantial portions of the Software.

#THE SOFTWARE IS PROVIDED "AS IS",WITHOUT WARRANTY OF ANY KIND,EXPRESS OR
#IMPLIED,INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,DAMAGES OR OTHER
#LIABILITY,WHETHER IN AN ACTION OF CONTRACT,TORT OR OTHERWISE,ARISING FROM,#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#SOFTWARE.

import unittest
import pytest
import platform
import azure.cosmos.documents as documents
import azure.cosmos._cosmos_client_connection as cosmos_client_connection
import test_config
import six
if six.PY2:
    from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
else:
    from http.server import BaseHTTPRequestHandler,HTTPServer
from threading import Thread
from azure.core.exceptions import ServiceRequestError

pytestmark = pytest.mark.cosmosEmulator

@pytest.mark.usefixtures("teardown")
class CustomRequestHandler(BaseHTTPRequestHandler):
    database_name = None
    def _set_headers(self):
        self.send_response(200)
        self.send_header('Content-type','application/json')
        self.end_headers()

    def _send_payload(self):
        self._set_headers()
        payload = "{\"id\":\"" + self.database_name + "\",\"_self\":\"self_link\"}"
        if six.PY2:
            self.wfile.write(payload)
        else:
            self.wfile.write(bytes(payload,"utf-8"))

    def do_GET(self):
        self._send_payload()

    def do_POST(self):
        self._send_payload()

class Server(Thread):
    def __init__(self,database_name,PORT):
        Thread.__init__(self)
        server_address = ('',PORT)
        CustomRequestHandler.database_name = database_name
        self.httpd = HTTPServer(server_address,CustomRequestHandler)

    def run(self):
        self.httpd.serve_forever()

    def shutdown(self):
        self.httpd.shutdown()

class ProxyTests(unittest.TestCase):
    """Proxy Tests.
    """
    host = 'http://localhost:8081'
    masterKey = test_config._test_config.masterKey
    testDbName = 'sample database'
    serverPort = 8089

    @classmethod
    def setUpClass(cls):
        global server
        global connection_policy
        server = Server(cls.testDbName,cls.serverPort)
        server.start()
        connection_policy = documents.ConnectionPolicy() 
        connection_policy.ProxyConfiguration = documents.ProxyConfiguration() 
        connection_policy.ProxyConfiguration.Host = 'http://127.0.0.1' 

    @classmethod
    def tearDownClass(cls):
        server.shutdown()

    def test_success_with_correct_proxy(self):
        if platform.system() == 'Darwin':
            pytest.skip("TODO: Connection error raised on OSX")
        connection_policy.ProxyConfiguration.Port = self.serverPort 
        client = cosmos_client_connection.CosmosClientConnection(self.host,{'masterKey': self.masterKey},connection_policy)
        created_db = client.CreateDatabase({ 'id': self.testDbName })
        self.assertEqual(created_db['id'],self.testDbName,msg="Database id is incorrect")

    def test_failure_with_wrong_proxy(self):
        connection_policy.ProxyConfiguration.Port = self.serverPort + 1
        try:
            # client does a getDatabaseAccount on initialization,which fails
            client = cosmos_client_connection.CosmosClientConnection(self.host,connection_policy)
            self.fail("Client instantiation is not expected")
        except Exception as e:
            self.assertTrue(type(e) is ServiceRequestError,msg="Error is not a ServiceRequestError")

if __name__ == "__main__":
    #import sys;sys.argv = ['','Test.testName']
    unittest.main()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...