通过 FTP 逐行读取 CSV,而不将整个文件存储在内存/磁盘中

问题描述

我一直在用管道 ftplib.FTP.retrlinescsv.reader...

FTP.retrlines 重复调用包含一行的回调,而 csv.reader 需要一个迭代器,每次调用__next__() 方法时都会返回一个字符串。

如何将这两件事结合在一起,以便我可以读取和处理文件,而无需事先读取整个文件,例如将其存储在例如io.TextIOWrapper

我的问题是 FTP.retrlines 在它消耗了整个文件之前不会返回...

解决方法

我不确定是否有更好的解决方案,但您可以使用类似队列的可迭代对象将 FTP.retrlinescsv.reader 粘合在一起。由于这两个函数是同步的,您必须在不同的线程上并行运行它们。

像这样:

from queue import Queue
from ftplib import FTP
from threading import Thread
import csv
 
ftp = FTP(host)
ftp.login(username,password)

class LineQueue:
    _queue = Queue(10)

    def add(self,s):
        print(f"Queueing line {s}")
        self._queue.put(s)
        print(f"Queued line {s}")

    def done(self):
        print("Signaling Done")
        self._queue.put(False)
        print("Signaled Done")

    def __iter__(self):
        print("Reading lines")
        while True:
            print("Reading line")
            s = self._queue.get()
            if s == False:
                print("Read all lines")
                break

            print(f"Read line {s}")
            yield s

q = LineQueue()

def download():
    ftp.retrlines("RETR /path/data.csv",q.add)
    q.done()

thread = Thread(target=download)
thread.start()

print("Reading CSV")
for entry in csv.reader(q):
    print(entry)

print("Read CSV")

thread.join()
,

Martin's的解决方案相同,只是直接保存了几行代码子类化queue.Queue

from queue import Queue
from ftplib import FTP
from threading import Thread
import csv
 
ftp = FTP(**ftp_credentials)

class LineQueue(Queue):
    def __iter__(self):
        while True:
            s = self.get()
            if s is None:
                break
            yield s

    def __call__(self):
        ftp.retrlines(f"RETR {fname}",self.put)
        self.put(None)

q = LineQueue(10)

thread = Thread(target=q)
thread.start()

for entry in csv.reader(q):
    print(entry)

thread.join()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...