0%

多进程嵌套内存泄漏问题记录

记一次多进程嵌套发现的内存泄漏问题,以及解决方案。

问题代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import concurrent.futures
from typing import Any
import gc
import sys

class Foo(object):
def __init__(self):
self.executor = ProcessPoolExecutor(max_workers=10, mp_context=multiprocessing.get_context("spawn"))

@staticmethod
def fun(arg):
raise concurrent.futures.process._RemoteTraceback("error")

def call(self, i):
x = []
try:
future = self.executor.submit(self.fun, i)
res = future.result(timeout=1)
except Exception as e:
print("-"*20)
return x

def predict():
f = Foo()
for i in range(3):
f.call(i)

if __name__ == "__main__":
with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as e:
futures = e.submit(predict)
futures.result()

肉眼观察 乍一看这段代码是没有任何问题的,在 submit 之后,获取 result ,并且用 try ... except ... 来捕获可能出现的任何情况。尤其是,当不使用 main 函数中的进程池时,直接普通的使用 predict 函数,这段代码不会引发任何问题,可以正常的退出。

但为什么改成多进程时,这段代码就无法正常退出了呢?程序无法正常退出往往是因为程序某些对象没有完全被释放干净,但这里并没有使用特殊的函数却依旧引发了该问题,所以需要借助工具来进行简单的分析。

分析

garbage collector python 的垃圾回收机制主要使用引用计数(reference counting)来跟踪和回收垃圾。大部分情况下,Python 能够在对象结束执行后,引用计数会变为 0,使得最后能够被回收。这里程序退出不了的原因是某些对象没发自动退出,所以在 predict 函数中,观察变量 f 和 f.executor 的引用计数

1
2
3
4
5
6
def predict():
f = Foo()
for i in range(3):
f.call(i)
print(sys.getrefcount(f))
print(sys.getrefcount(f.executor))

程序的输出,对象 f 的计数为 5,而 f.executor 的计数为 2。为什么是 2,因此在最外层还有一个进程数量为 2 的进程池没有执行完。而 f 等于 4 而不等于 f.executor 这就不符合预期,比正常数值多 3。这个 3 是因为循环了 3 次,每次都引发了异常。

进一步地,观察 f 中什么对象没有被清理。

1
2
3
4
5
6
7
def predict():
f = Foo()
for i in range(3):
f.call(i)
print(gc.get_referrers(f))
print(sys.getrefcount(f))
print(sys.getrefcount(f.executor))

此时输出会多一行,形如

1
[<frame at 0x10c266240, file 'XXX', line 24, code call>, <frame at 0x10c6b0240, file 'XXX', line 24, code call>, <frame at 0x10c6b0440, file 'XXX', line 24, code call>]

这表明程序在何处的对象没有被处理,可以定位到函数 call 的 return 位置。

为什么? 在获取 result 时,如果遇到异常,则中间变量 future 无法被回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def result(self, timeout=None):
"""Return the result of the call that the future represents.

Args:
timeout: The number of seconds to wait for the result if the future
isn't done. If None, then there is no limit on the wait time.

Returns:
The result of the call that the future represents.

Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
Exception: If the call raised then that exception will be raised.
"""
try:
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()

self._condition.wait(timeout)

if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
finally:
# Break a reference cycle with the exception in self._exception
self = None

解决方案

消除中间变量 future

1
2
3
4
future = self.executor.submit(self.fun, i)
res = future.result(timeout=1)
# 改为
res = self.executor.submit(self.fun, i).result(timeout=1)

手动 GC

在调用完 Foo 之后,使用 gc.collect()

主动关闭进程池

在调用完 Foo 之后,使用 f.executor.shutdown()

在 return 前 主动删除未释放的对象

如下所示,使用 future = None 也可以达到相似效果(让 Python 自动调用垃圾回收机制)

1
2
3
4
5
6
7
8
9
def call(self, i):
x = []
try:
future = self.executor.submit(self.fun, i)
res = future.result(timeout=1)
except Exception as e:
print("-"*20)
del future
return x

Q & A

Q: 能不能在 Foo 中添加 del 函数解决这个问题 A: 在 Python 3.8.16 可以,Python 3.9.6 不行。可以手动增加 del 函数添加 print 函数,观察是否有对应的输出。

Q: gc 显示为什么是在 return 这,而不是其他地方? A: frame 对象有一个属性叫作 f_lineno,它表示当前执行的代码行。当函数执行到 return 语句时,f_lineno 会更新为 return 语句所在的行号。

这是因为在函数返回值之后,控制权会返回到调用该函数的代码。此时函数的上下文(包括它的局部变量、执行状态等)会被丢弃,栈帧也会被弹出调用栈。这个过程称为函数的退出或返回。所以,return 语句所在的行是函数执行的最后一行。