问题描述
我面临着与 Python 中的列表、出队和链表有关的内存泄漏问题。这是下面的剪辑。确切的问题是,扩展列表内存在处理后没有释放。
下面是一个线程比第二个线程更快地生成数据的片段。第一个线程产生数据并休眠几分钟。意味着第二个线程处理数据并等待新数据的到来。处理完列表中的所有元素后,期望释放所有内存。但它没有发生。有什么办法可以实现这一目标......?或者如果错了请纠正我。
import threading
import asyncio
import time
import random
from external_node import setup
import gc
class Worker:
def __init__(self):
self.data = []
self.batch_size = 500
self.check_con = 0
self.check_task = 0
self.thread_lock = threading.Lock()
self.counter = 0
def push_data(self,pid):
print(f"starting to prepare the data {time.time()} for {id} ")
prep_data = []
current_time = int(time.time() * 1000)
for i in range(1,1000):
current_time = current_time + 500
dp = {'data_type': 'process','process_id': pid,'timestamp': current_time}
data_point = {}
tags = range(1,1000)
for each_data_point in tags:
data_point[str(each_data_point)] = random.random()
# print(type(id))
dp["data"] = data_point
prep_data.append(dp)
return prep_data
def stream_thread(self,event_loop):
while True:
if self.counter < 50:
input_data = self.push_data('3')
self.thread_lock.acquire()
for index in range(0,len(input_data),self.batch_size):
value = input_data[index: index + self.batch_size]
self.data.append(value)
self.thread_lock.release()
if self.check_task == 0:
asyncio.run_coroutine_threadsafe(
self.process_data('3'),event_loop)
self.check_task = 1
print('task submitted to loop')
time.sleep(1)
self.counter = self.counter + 1
else:
time.sleep(600)
self.counter = 0
async def process_data(self,ref_id):
while True:
if len(self.data) > 0:
self.thread_lock.acquire()
required_batch = self.data.pop(0)
self.thread_lock.release()
print(f'writing records count {len(required_batch)} and total number of records {len(self.data)}')
time.sleep(5)
del required_batch
gc.collect()
elif len(self.data) < 0 and self.counter > 50:
self.data.clear()
else:
# print(f'no records found ... Wating')
await asyncio.sleep(0.05)
if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
worker = Worker()
stream_thread = threading.Thread(target=worker.stream_thread,args=(event_loop,))
stream_thread.daemon = True
stream_thread.start()
event_loop.run_forever()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)