How to read a large Firestore collection with Python without hitting 503 timeout errors

Problem

Cloud Firestore has a default 60-second deadline for reading a collection. When the collection holds too many documents, the connection times out and a 503 error is thrown.
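For illustration, this is the kind of naive full-collection read that runs into the deadline (a minimal sketch; db is an already-initialized Firestore client and u'CollectionName' is a placeholder):

# Naive read (sketch): streams the entire collection in a single query.
# On a large collection this can exceed the 60s deadline and fail with a 503.
docs = db.collection(u'CollectionName').stream()
for doc in docs:
    print(doc.id, doc.to_dict())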

How can this be handled? I developed a solution, described below.

Workaround

The workaround is to use paginated queries, splitting the data into batches or "pages" and processing them one page at a time. However, the Firestore documentation does not provide a complete solution for this, so after reading this post I decided to write a ready-to-use function, shown below. It extracts a Firestore collection into a CSV file; you can modify the CSV-writer part to suit however you consume your Firestore data.

I've also shared a demo of how to use it in this GitHub repo. You're welcome to fork it if you want to develop it further.
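Before the full function, here is the core of the pagination pattern with the CSV handling stripped away (a minimal sketch; db, u'CollectionName' and page_size are placeholders to substitute):

# Cursor pagination (sketch): each query fetches at most page_size documents,
# ordered by document name so the cursor position is stable and unambiguous.
coll_ref = db.collection(u'CollectionName')
page_size = 1000
cursor = None
while True:
    query = coll_ref.order_by('__name__').limit(page_size)
    if cursor:
        query = query.start_after(cursor)   # resume after the last snapshot seen
    docs = list(query.stream())
    for doc in docs:
        pass                                # process doc.to_dict() here
    if len(docs) < page_size:               # a partial page means we are done
        break
    cursor = docs[-1]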

# Demo of extracting Firestore data into a CSV file using a paginated algorithm (prevents the 503 timeout error on large datasets)

import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
from google.cloud import firestore as gc_firestore   # used only for the isinstance() check on db
from datetime import datetime
import csv

def firestore_to_csv_paginated(db, coll_to_read, fields_to_read, csv_filename='extract.csv', max_docs_to_read=-1, write_headers=True):
    """ Extract Firestore collection data and save in CSV file
    Args:
        db: Firestore database object
        coll_to_read: name of collection to read from (like u'CollectionName')
        fields_to_read: fields to read (like ['FIELD1','FIELD2']). Will be used as CSV headers if write_headers=True
        csv_filename: CSV filename to save to
        max_docs_to_read: max # of documents to read. Defaults to -1 to read all. Checked once per page, so treat it as a soft limit
        write_headers: also write headers into the CSV file
    """

    # Check input parameters
    if (not isinstance(db, gc_firestore.Client)) or (not isinstance(coll_to_read, str)) or (not isinstance(fields_to_read, (list, tuple, set))):
        print(f'??? {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Unexpected parameters: \n\tdb = {db} \n\tcoll_to_read = {coll_to_read} \n\tfields_to_read = {fields_to_read}')
        return

    # Read Firestore collection and write CSV file in a paginated algorithm
    page_size = 1000   # Page size (max # of docs to fetch/write per batch). Tune it so each page finishes well within the default 60s deadline
    total_count = 0
    coll_ref = db.collection(coll_to_read)
    docs = []
    cursor = None
    try:
        # Open CSV file and write header if required
        print(f'>>> {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Started processing collection {coll_to_read}...')
        with open(csv_filename,'w',newline='',encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile,fieldnames=fields_to_read,extrasaction='ignore',restval='Null')
            if write_headers: 
                writer.writeheader()
                print(f'<<< {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Finished writing CSV headers: {str(fields_to_read)} \n---')
        
            # Append each page of data fetched into CSV file
            while True:
                count = 0       # Reset per-page counter of rows written (docs is reassigned below, no need to clear it)

                if cursor:      # Stream next page, starting after the cursor
                    docs = list(coll_ref.order_by('__name__').limit(page_size).start_after(cursor).stream())
                else:           # Stream first page (cursor not set yet)
                    docs = list(coll_ref.order_by('__name__').limit(page_size).stream())
            
                print(f'>>> {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Started writing CSV row {total_count+1}...')    # +1 as total_count starts at 0
                for doc in docs:
                    doc_dict = doc.to_dict()
                    
                    # Process columns (e.g. add an id column)
                    doc_dict['FIRESTORE_ID'] = doc.id   # Capture doc id itself. Comment out if not used
                    
                    # Process rows (e.g. convert all date columns to local time). Comment out if not used
                    for header in doc_dict.keys():
                        if ('DATE' in header) and (doc_dict[header] is not None) and (type(doc_dict[header]) is not str):
                            try:
                                doc_dict[header] = doc_dict[header].astimezone()
                            except Exception as e_time_conv:
                                print(f'??? {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Exception in converting timestamp of {doc.id} in {doc_dict[header]}',e_time_conv)

                    # Write rows but skip certain rows. Comment out "if" and unindent "write" and "count" lines if not used
                    if ('TO_SKIP' not in doc_dict) or ((doc_dict['TO_SKIP'] is not None) and (doc_dict['TO_SKIP'] != 'VALUE_TO_SKIP')):
                        writer.writerow(doc_dict)
                        count += 1

                # Check if finished writing last page or exceeded max limit
                total_count += count                # Increment total_count
                if len(docs) < page_size:           # Last (partial) page fetched/written, so we are done
                    break
                if (max_docs_to_read >= 0) and (total_count >= max_docs_to_read):
                    break                           # Preset max limit reached, stop early
                cursor = docs[-1]                   # Move cursor to the last snapshot of the current page

    except Exception as e_read_write:
        print(f'??? {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Exception in reading Firestore collection / writing CSV file:',e_read_write)
    else:
        print(f'<<< {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} firestore_to_csv_paginated() - Finished writing CSV file with {total_count} rows of data \n---')
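
A typical call looks like this (the service-account key path, collection name and field names are placeholders for your own):

# Initialize the Admin SDK once, then run the extraction
cred = credentials.Certificate('path/to/serviceAccountKey.json')  # placeholder path
firebase_admin.initialize_app(cred)
db = firestore.client()
firestore_to_csv_paginated(db, u'CollectionName', ['FIELD1', 'FIELD2', 'FIRESTORE_ID'])

Ordering by __name__ is what makes the cursor safe: the document name is a unique, totally ordered key, so start_after never skips or duplicates documents between pages.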