将信息与单次读取并行复制到几个 DB

问题描述

目前我有一个代码 - 对“源数据库”进行任何更新并将其复制到“目标数据库

我需要使用来自更新的“源数据库单次读取调用并并行复制到几个“目标数据库

def loop(partial_src_db=None,partial_dst_db=None,collection_name=None,**options):

src_db = partial_src_db()

stream = None
while True:
try:

    collection_in = source_db.get_collection(collection_name)
    collection_out = destination_db.get_collection(collection_name)

    with collection_in.watch(full_document='updateLookup') as stream:

        for change in stream:
            oper_type = change['operationType']

            logger.debug(f"{oper_type} received: {collection_name}")

            if oper_type in ignored_ops:
                logger.debug(f"{oper_type} operation ignored")
                continue

            if oper_type == "insert":
                callback = db_insert_callback

            elif oper_type == "replace":
                # db_update_callback(change)
                callback = db_update_callback

            elif oper_type == "delete":
                callback = db_remove_callback

            callback(change,collection_out=collection_out)


except Exception as ex:
    logger.exception(ex)

finally:
    if stream:
        stream.close()

插入示例

def db_insert_callback(insert_change,collection_out):
doc = insert_change['fullDocument']
# logger.info(insert_change)
try:
    collection_out.insert(doc)
except Exception as ex:
    logger.exception(ex)

主要的样子

def main(**options):
partial_src_db = partial(db_connect,db_host=settings.SRC_RB_HOST,db_name=settings.SRC_RB_NAME,db_user=settings.SRC_RB_USER,db_pass=settings.SRC_RB_PASS,replica=settings.SRC_RB_REPLICA)
partial_dst_db = partial(db_connect,db_host=settings.DEST_RB_HOST,db_name=settings.DEST_RB_NAME)
src_db = partial_src_db()
src_db = partial_src_db()
logger.debug("connected to src db")
db_coll_names = src_db.list_collection_names()
cli_collection_names = options.get('collection') or db_coll_names
coll_names = list(set(db_coll_names) & set(cli_collection_names))
for coll_name in coll_names:                                                      
    partial_loop = partial(loop,partial_src_db=partial_dst_db,partial_dst_db=environment_list,collection_name=coll_name,**options)
    t = Thread(target=partial_loop,daemon=False,name=coll_name)
    t.start()

希望有人能成功帮助我,谢谢

解决方法

如果 collections.out 是一个 list 实例,那么它在添加操作下是线程安全的。但是,不清楚您的 db_insert_callback 方法是否真正代表您的回调,使用多线程或多处理是否会获得任何好处。这是因为如果您在提交的作业中所做的大部分事情只是向列表中添加一些内容,根据定义必须是序列化操作,那么并发/并行化就不会太多。

如果回调函数比您展示的更多,或者为了您的教育目的,这就是您对回调进行多线程处理的方式:

from concurrent.futures import ThreadPoolExecutor

# example callback
def db_insert_callback(insert_change,collection_out):
    doc = insert_change['fullDocument']
    # logger.info(insert_change)
    try:
        collection_out.insert(doc)
    except Exception as ex:
        logger.exception(ex)


def loop(partial_src_db=None,partial_dst_db=None,collection_name=None,**options):

    src_db = partial_src_db()
    
    N_THREADS = 64 # depends on number of changes you expect in stream
    with ThreadPoolExecutor(workers=N_THREADS) as executor:
        while True:
            try:
                collection_in = source_db.get_collection(collection_name)
                collection_out = destination_db.get_collection(collection_name)
            
                with collection_in.watch(full_document='updateLookup') as stream:
            
                    futures = []
                    for change in stream:
                        oper_type = change['operationType']
            
                        logger.debug(f"{oper_type} received: {collection_name}")
            
                        if oper_type in ignored_ops:
                            logger.debug(f"{oper_type} operation ignored")
                            continue
            
                        if oper_type == "insert":
                            callback = db_insert_callback
            
                        elif oper_type == "replace":
                            # db_update_callback(change)
                            callback = db_update_callback
            
                        elif oper_type == "delete":
                            callback = db_remove_callback
            
                        futures.append(executor.submit(callback,change,collection_out=collection_out))
                        for future in futures:
                             # Wait for each job to complete.
                             # This may throw an exception if the callback threw an exception:
                            future.result()
            
            
            except Exception as ex:
                logger.exception(ex)
            
            finally:
                if stream:
                    stream.close()

注意:我不知道 stream 是什么类,但返回此值的 with 语句表明此上下文处理程序可能正在关闭流当 with 块终止时自动。如果是这种情况,您可以删除分配 stream = Nonefinally 块。

更新

我已经更新了源代码以将代码包含在您提供的函数 loop 中(您确实需要正确地缩进)。但是:

您定义了一个参数 partial_src_db,其默认值是 None,但您调用 partial_src_db() 而没有测试它是否为 None。然后将返回值分配给 src_db 并且不再引用该变量。如果永远不会向 loop 传递 Nonepartial_src_db 值,则不要为此参数提供默认值 None

,

同时打开两个数据库,将源数据库的数据存储在一个变量中,然后将变量的值插入到目标数据库中

,

这里的 source.txt 包含一些数据,dest.txt 是写入数据的文件。 对于文本文件,它是通过这种方式完成的:

f=open('source.txt','r')
g=open('dest.txt','w')

r=f.read()
r=r.split()
for x in r:
    g.write(x+' ')

f.close()
g.close()

因此,对于可能使用线程的数据库,每个线程连接到数据库并在这些线程中进行传输。