使用 neo4j python 驱动程序的简单示例?

问题描述

是否有使用 neo4j python 驱动程序的简单示例? 如何将密码查询传递给驱动程序以运行并返回游标?

如果我正在阅读 for example this,那么演示似乎有一个类包装器,我将一个私有成员 func 传递给 session.write,

session.write_transaction(self._create_and_return_greeting,...

然后使用交易作为第一个参数调用它...

def _create_and_return_greeting(tx,message):

依次运行密码

result = tx.run("CREATE (a:Greeting) "

这似乎比实际需要复杂 10 倍。

我只是尝试了一个更简单的:

def raw_query(query,**kwargs):
    neodriver = neo_connect()  # cached dbconn
    with neodriver.session() as session:
        try:
            result = session.run(query,**kwargs)
            return result.data()

但这会导致查询出现套接错误,可能是因为会话超出范围?

[dfcx/__init__] ERROR | Underlying socket connection gone (_ssl.c:2396)

[dfcx/__init__] ERROR | Failed to write data to connection IPv4Address(('neo4j-core-8afc8558-3.production-orch-0042.neo4j.io',7687)) (IPv4Address(('34.82.120.138',7687)))

此外,我无法返回游标/迭代器,只能返回 data() 当会话超出范围时,查询结果似乎随之消失。

如果我手动打开和关闭会话,我会遇到同样的问题吗?

Python 一定是这个 DB 使用的最流行的语言,每个人都使用不同的驱动程序吗? Py2neo 看起来很可爱,但是对于大多数 cypher 语言功能完全缺乏 ORM 包装器功能,所以无论如何你必须下降到原始 cypher。而且我不确定它是否以相同的方式支持 **kwargs 参数插值。

我想大幅加注应该有助于解决一些问题:D

尝试获得有效数据库包装器的略长版本:

def neo_connect() -> Union[neo4j.BoltDriver,neo4j.Neo4jDriver]:

    global raw_driver
    if raw_driver:
        # print('reuse driver')
        return raw_driver

    neoconfig = NEOCONfig
    raw_driver = neo4j.GraphDatabase.driver(
        neoconfig['url'],auth=(
            neoconfig['user'],neoconfig['pass']))
    if raw_driver is None:
        raise BaseException("cannot connect to neo4j")
    else:
        return raw_driver


def raw_query(query,**kwargs):
    # just get data,no cursor
    neodriver = neo_connect()
    session = neodriver.session()
    # logging.info('neoquery %s',query)
    # with neodriver.session() as session:
    try:
        result = session.run(query,**kwargs)
        data = result.data()
        return data

    except neo4j.exceptions.CypherSyntaxError as err:
        logging.error('neo error %s',err)
        logging.error('Failed query: %s',query)
        raise err
    # finally:
    #     logging.info('close session')
    #     session.close()

更新:有人给我指出了这个例子,这是使用 tx 包装器的另一种方式。

https://github.com/neo4j-graph-examples/northwind/blob/main/code/python/example.py#L16-L21

解决方法

def raw_query(query,**kwargs):
    neodriver = neo_connect()  # cached dbconn
    with neodriver.session() as session:
        try:
            result = session.run(query,**kwargs)
            return result.data()

这完全没问题,并且按我的预期工作。

您看到的错误表明存在连接问题。因此,服务器和驱动程序之间肯定发生了一些不受其影响的事情。

另外,请注意,所有这些运行查询的方式是不同的:

  • with driver.session():
        result = session.run("<SOME CYPHER>")
    
  • def work(tx):
        result = tx.run("<SOME CYPHER>") 
    
    with driver.session():
        session.write_transaction(work)
    

后一个可能长 3 行,并且驱动程序的团队收集了一些关于此的反馈。然而,这里还有更多的事情需要考虑。首先,更改 API 表面是需要仔细计划的事情,不能在补丁发布中完成。其次,要克服技术障碍。无论如何,这是语义:

  • 自动提交事务。仅将该查询作为一个工作单元运行。 如果您在同一会话中运行新的自动提交事务,则先前的结果将为您缓冲所有可用记录(取决于查询,这将消耗大量内存)。这可以通过调用 result.consume() 来避免。但是,如果会话超出范围,结果将被自动消耗。这意味着您无法从中提取更多记录。最后,将引发任何错误并需要在应用程序代码中进行处理。
  • 托管交易。在该函数中运行您想要的任何工作单元。事务在函数周围隐式启动和提交(除非您显式回滚)。 如果事务结束(函数结束或回滚),结果将被消耗并变为无效。在此之前,您必须提取所需的所有记录。
    这是使用驱动程序的推荐方式,因为它不会引发所有错误,而是在内部处理一些错误(在适当的情况下)并重试 work 函数(例如,如果服务器只是暂时不可用)。由于该函数可能会被多次执行,因此您必须确保它是幂等的。

结束语:
请记住,stackoverlfow 是在尽最大努力的基础上进行监控的,可以被视为草率的评论可能会妨碍您获得对您的问题的有用答案