Python Process Pool non-daemonic?
daemonic이 아닌 python Pool을 만들 수 있을까요?저는 수영장 안에 다른 수영장이 있는 기능을 호출할 수 있기를 원합니다.
디몬 프로세스는 프로세스를 생성할 수 없기 때문에 저는 이것을 원합니다.구체적으로 오류가 발생합니다.
AssertionError: daemonic processes are not allowed to have children
예를 들어, 다음과 같은 시나리오를 생각해 보십시오.function_a수영장을 운영하고 있습니다.function_b수영장이 있는 곳은function_c. 이 함수 체인은 실패합니다. 왜냐하면function_b는 데몬 프로세스에서 실행되고 있으며 데몬 프로세스는 프로세스를 생성할 수 없습니다.
multiprocessing.pool.Pool클래스는 작업자 프로세스를 생성합니다.__init__ 는 의 method, demonic합니다를 할 수 . 그러면 다시 설정할 수 없습니다.daemon에 귀속시키다False시작하기 전에 (그리고 그 후에는 더 이상 허용되지 않습니다).하지만 당신은 당신만의 하위 클래스를 만들 수 있습니다.multiprocesing.pool.Pool(multiprocessing.Pool포장지 함수일 뿐입니다). 그리고 자신의 것으로 대체합니다.multiprocessing.Process작업자 프로세스에 사용할 서브클래스(sub-class). 항상 daemonic이 아닌 것입니다.
다음은 이 방법에 대한 전체 예시입니다.중요한 부분은 두가지 수업입니다.NoDaemonProcess그리고.MyPool정상에 서서 전화할pool.close()그리고.pool.join()너의 위에MyPool끝에 있는 예
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == '__main__':
test()
저는 파이썬 3.7에서 비대모닉 풀을 사용해야 하는 필요성이 있었고, 수락된 답변에 게시된 코드를 수정하게 되었습니다.아래에는 비대모닉 풀을 생성하는 토막글이 있습니다.
import multiprocessing.pool
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)
현재의 구현 예로서multiprocessing광범위하게 리팩토링되어 문맥에 기반을 두고 있습니다. 우리는 다음을 제공할 필요가 있습니다.NoDaemonContext우리의 수업이 있는.NoDaemonProcess속성으로NestablePool그러면 기본 컨텍스트 대신 해당 컨텍스트를 사용합니다.
그렇기는 하지만 이 접근법에 대해 적어도 두 가지 주의해야 할 사항이 있음을 경고해야 합니다.
- 이는 여전히 다음의 구현 세부 사항에 달려 있습니다.
multiprocessing따라서 언제든지 파손될 수 있습니다. - 다음과 같은 타당한 이유가 있습니다.
multiprocessingdaem이 아닌 프로세스를 사용하는 것을 매우 어렵게 만들었고, 그 중 많은 부분이 여기에서 설명됩니다.제 의견 중 가장 설득력 있는 것은 다음과 같습니다.
하위 프로세스를 사용하여 자식 스레드가 자신의 자식에게 산란되도록 허용하는 것에 관해서는 하위 프로세스가 완료되고 돌아오기 전에 부모 스레드 또는 자식 스레드가 종료되면 좀비 '손자녀'의 작은 군대가 생성될 위험이 있습니다.
파이썬 3.8부터는 이 제한이 없습니다.문제가 전혀 없는 중첩 프로세스 풀을 가질 수 있습니다.
from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time
def pid():
return current_process().pid
def _square(i): # Runs in inner_pool
square = i ** 2
time.sleep(i / 10)
print(f'{pid()=} {i=} {square=}')
return square
def _sum_squares(i, j): # Runs in outer_pool
with Pool(max_workers=2) as inner_pool:
squares = inner_pool.map(_square, (i, j))
sum_squares = sum(squares)
time.sleep(sum_squares ** .5)
print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
return sum_squares
def main():
with Pool(max_workers=3) as outer_pool:
for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
print(f'{pid()=} {sum_squares=}')
if __name__ == "__main__":
main()
위 데모 코드는 Python 3.8로 테스트되었습니다.
의 한계ProcessPoolExecutor, 하지만, 그것은 그것이 그들이 가지고 있지 않다는 것입니다.maxtasksperchild. 이것이 필요하다면 대신 마시밀리아노의 대답을 생각해 보세요.
크레딧: jfs로 답변
멀티프로세싱 모듈은 프로세스나 스레드가 있는 풀을 사용하기 좋은 인터페이스를 갖추고 있습니다.현재 사용 사례에 따라 다음을 사용하는 것을 고려할 수 있습니다.multiprocessing.pool.ThreadPool프로세스가 아닌 스레드(내부에서 프로세스를 생성할 수 있는)가 발생하는 외부 풀에 대해 설명합니다.
GIL에 의해 제한될 수도 있지만, 저의 특정한 경우(둘 다 테스트), 외부에서 프로세스를 시작하는 시간Pool여기서 창조된 것과 같이, 해결책보다 훨씬 더 많습니다.ThreadPool.
교환하기가 정말 쉽습니다.Processes위해서Threads. 사용 방법에 대해 자세히 알아보기ThreadPool여기나 여기나 해결책.
일부 Pool to custom으로 표준 Pool을 대체하는 Python 버전에서는 다음과 같은 오류가 발생할 수 있습니다.AssertionError: group argument must be None for now.
여기서 다음과 같은 도움을 줄 수 있는 솔루션을(를)
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, val):
pass
class NoDaemonProcessPool(multiprocessing.pool.Pool):
def Process(self, *args, **kwds):
proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
proc.__class__ = NoDaemonProcess
return proc
를 사용하여 이 문제를 다루는 사람들을 본 적이 있습니다.celery의 갈림길multiprocessing대모닉 프로세스가 아이들을 산란시킬 수 있게 해주는 당구(multip처리 풀 확장)라고 불립니다.이 방법은 단순히 다음을 대체하는 것입니다.multiprocessing모듈 기준:
import billiard as multiprocessing
모듈 간에 글로벌을 가져오려다가 ProcessPool() 라인이 여러 번 평가되는 문제가 발생했습니다.
globals.py
from processing import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool
class SingletonMeta(type):
def __new__(cls, name, bases, dict):
dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
return super(SingletonMeta, cls).__new__(cls, name, bases, dict)
def __init__(cls, name, bases, dict):
super(SingletonMeta, cls).__init__(name, bases, dict)
cls.instance = None
def __call__(cls,*args,**kw):
if cls.instance is None:
cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
return cls.instance
def __deepcopy__(self, item):
return item.__class__.instance
class Globals(object):
__metaclass__ = SingletonMeta
"""
This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
The root cause is that importing this file from different modules causes this file to be reevalutated each time,
thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
"""
def __init__(self):
print "%s::__init__()" % (self.__class__.__name__)
self.shared_manager = Manager()
self.shared_process_pool = ProcessPool()
self.shared_thread_pool = ThreadPool()
self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
그런 다음 코드의 다른 곳에서 안전하게 가져오기
from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock
저는 좀 더 확장된 포장지 클래스를 작성했습니다.pathos.multiprocessing기:
참고로 사용 사례에서 성능 최적화로 비동기 다중 프로세스 맵만 필요로 하는 경우 joblib은 모든 프로세스 풀을 백그라운드에서 관리하며 다음과 같은 매우 간단한 구문을 사용할 수 있습니다.
squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
이것은 오류가 겉보기에 거짓 양성일 때의 해결책을 제시합니다.James가 언급한 것처럼 대모닉 프로세스에서 의도하지 않은 가져오기가 발생할 수 있습니다.
예를 들어, 만약 당신이 다음과 같은 간단한 코드를 가지고 있다면,WORKER_POOL작업자로부터 실수로 가져올 수 있으므로 오류가 발생할 수 있습니다.
import multiprocessing
WORKER_POOL = multiprocessing.Pool()
간단하지만 안정적인 해결 방법은 다음과 같습니다.
import multiprocessing
import multiprocessing.pool
class MyClass:
@property
def worker_pool(self) -> multiprocessing.pool.Pool:
# Ref: https://stackoverflow.com/a/63984747/
try:
return self._worker_pool # type: ignore
except AttributeError:
# pylint: disable=protected-access
self.__class__._worker_pool = multiprocessing.Pool() # type: ignore
return self.__class__._worker_pool # type: ignore
# pylint: enable=protected-access
위의 해결책에서,MyClass.worker_pool오류 없이 사용할 수 있습니다.만약 당신이 이 방법이 개선될 수 있다고 생각한다면, 저에게 알려주세요.
이미 대모닉 프로세스에 있는 경우에도 풀을 시작할 수 있는 방법은 다음과 같습니다.이것은 python 3.8.5에서 테스트되었습니다.
먼저, 다음을 정의합니다.Undaemonizecontext manager - 현재 프로세스의 데몬 상태를 일시적으로 삭제합니다.
class Undaemonize(object):
'''Context Manager to resolve AssertionError: daemonic processes are not allowed to have children
Tested in python 3.8.5'''
def __init__(self):
self.p = multiprocessing.process.current_process()
if 'daemon' in self.p._config:
self.daemon_status_set = True
else:
self.daemon_status_set = False
self.daemon_status_value = self.p._config.get('daemon')
def __enter__(self):
if self.daemon_status_set:
del self.p._config['daemon']
def __exit__(self, type, value, traceback):
if self.daemon_status_set:
self.p._config['daemon'] = self.daemon_status_value
이제 데몬 프로세스 내에서도 다음과 같이 풀을 시작할 수 있습니다.
with Undaemonize():
pool = multiprocessing.Pool(1)
pool.map(... # you can do something with the pool outside of the context manager
여기서의 다른 접근 방식은 애초에 데모닉이 아닌 풀을 만드는 것을 목표로 하지만, 이 접근 방식을 사용하면 이미 데모닉 프로세스에 있는 경우에도 풀을 시작할 수 있습니다.
Python 버전 3.7 이후로 non-daemonic ProcessPool Executer를 생성할 수 있습니다.
으로.if __name__ == "__main__":멀티프로세싱을 사용하는 동안 필요합니다.
from concurrent.futures import ProcessPoolExecutor as Pool
num_pool = 10
def main_pool(num):
print(num)
strings_write = (f'{num}-{i}' for i in range(num))
with Pool(num) as subp:
subp.map(sub_pool,strings_write)
return None
def sub_pool(x):
print(f'{x}')
return None
if __name__ == "__main__":
with Pool(num_pool) as p:
p.map(main_pool,list(range(1,num_pool+1)))
언급URL : https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
'programing' 카테고리의 다른 글
| 고급 사용자 정의 필드 - 이전에 선택한 분류법으로 사후 개체 필드 필터링 (0) | 2023.10.17 |
|---|---|
| 오류 61이 발생하여 원격 MySQL 서버에 연결할 수 없습니다. (0) | 2023.10.17 |
| 깃 푸쉬 오리진 헤드는 무엇을 의미합니까? (0) | 2023.10.17 |
| 플러그인 없이 두꺼운 상자에 내장된 워드프레스 사용 (0) | 2023.10.17 |
| PayPal IPN 응답에서 Woocommerce 기능은 무엇입니까? (0) | 2023.10.17 |