为什么python中的扩展列表内存即使在弹出所有元素后也不会在python中释放

问题描述

我面临着与 Python 中的列表、出队和链表有关的内存泄漏问题。这是下面的剪辑。确切的问题是,扩展列表内存在处理后没有释放。

下面是一个线程比第二个线程更快地生成数据的片段。第一个线程产生数据并休眠几分钟。意味着第二个线程处理数据并等待新数据的到来。处理完列表中的所有元素后,期望释放所有内存。但它没有发生。有什么办法可以实现这一目标......?或者如果错了请纠正我。

import threading
import asyncio
import time
import random
from external_node import setup
import gc


class Worker:

    def __init__(self):
        self.data = []
        self.batch_size = 500
        self.check_con = 0
        self.check_task = 0
        self.thread_lock = threading.Lock()
        self.counter = 0

    def push_data(self,pid):
        print(f"starting to prepare the data {time.time()} for {id} ")
        prep_data = []
        current_time = int(time.time() * 1000)
        for i in range(1,1000):
            current_time = current_time + 500
            dp = {'data_type': 'process','process_id': pid,'timestamp': current_time}
            data_point = {}
            tags = range(1,1000)
            for each_data_point in tags:
                data_point[str(each_data_point)] = random.random()
            # print(type(id))
            dp["data"] = data_point
            prep_data.append(dp)
        return prep_data

    def stream_thread(self,event_loop):
        while True:
            if self.counter < 50:
                input_data = self.push_data('3')
                self.thread_lock.acquire()
                for index in range(0,len(input_data),self.batch_size):
                    value = input_data[index: index + self.batch_size]
                    self.data.append(value)
                self.thread_lock.release()
                if self.check_task == 0:
                    asyncio.run_coroutine_threadsafe(
                        self.process_data('3'),event_loop)
                    self.check_task = 1
                    print('task submitted to loop')
                time.sleep(1)
                self.counter = self.counter + 1
            else:
                time.sleep(600)
                self.counter = 0

    async def process_data(self,ref_id):
        while True:
            if len(self.data) > 0:
                self.thread_lock.acquire()
                required_batch = self.data.pop(0)
                self.thread_lock.release()
                print(f'writing records count {len(required_batch)} and total number of records {len(self.data)}')
                time.sleep(5)
                del required_batch
                gc.collect()
            elif len(self.data) < 0 and self.counter > 50:
                self.data.clear()
            else:
                # print(f'no records found ... Wating')
                await asyncio.sleep(0.05)


if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    worker = Worker()
    stream_thread = threading.Thread(target=worker.stream_thread,args=(event_loop,))
    stream_thread.daemon = True
    stream_thread.start()

    event_loop.run_forever()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...