programing

다중 처리 - 파이프 대 대기열

subpage 2023. 6. 18. 16:01
반응형

다중 처리 - 파이프 대 대기열

파이썬의 멀티프로세싱 패키지에서 큐와 파이프의 근본적인 차이점은 무엇입니까?

어떤 시나리오에서 다른 시나리오보다 하나를 선택해야 합니까?언제 사용하는 것이 유리합니까?Pipe()언제 사용하는 것이 유리합니까?Queue()?

간단한 요약

CY2023년 현재, 이 답변에 설명된 기술은 상당히 구식입니다.요즘에는 또는 대신 사용할 수 있습니다.multiprocessing...

ProcessPoolExector()이 하지 않습니다.Pipe()또는Queue()작업/결과를 전달합니다.

원답

  • A에는 엔드포인트가 두 개만 있을 수 있습니다.

  • A에는 여러 생산자와 소비자가 있을 수 있습니다.

사용 시기

통신에 두 개 이상의 점이 필요한 경우 를 사용합니다.

절대적인 성능이 필요한 경우 a가 훨씬 빠릅니다.Queue()▁of▁top다▁on▁built니 위에 구축되어 있습니다.Pipe().

성능 벤치마킹

두 프로세스를 생성하고 프로세스 간에 가능한 한 빨리 메시지를 보내려고 합니다.다음은 다음을 사용하여 유사한 테스트 간 드래그 레이스의 타이밍 결과입니다.Pipe()그리고.Queue()...

참고로, 저는 보너스로 결과를 제출했습니다.

  • JoinableQueue()과 같은 경우 합니다.queue.task_done()작업에 대기열에서되지 않은 작업만 함) 이라고 하여 (으)ㄹ 수 있습니다.queue.join()일이 끝났다는 것을 압니다.

이 답변의 하단에 있는 각각의 코드는...

# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.9.2

$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.14316844940185547 seconds
Sending 100000 numbers to Pipe() took 1.3749017715454102 seconds
Sending 1000000 numbers to Pipe() took 14.252539157867432 seconds
$  python multi_queue.py
Sending 10000 numbers to Queue() took 0.17014789581298828 seconds
Sending 100000 numbers to Queue() took 1.7723784446716309 seconds
Sending 1000000 numbers to Queue() took 17.758610725402832 seconds
$ python multi_simplequeue.py
Sending 10000 numbers to SimpleQueue() took 0.14937686920166016 seconds
Sending 100000 numbers to SimpleQueue() took 1.5389132499694824 seconds
Sending 1000000 numbers to SimpleQueue() took 16.871352910995483 seconds
$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.15144729614257812 seconds
Sending 100000 numbers to JoinableQueue() took 1.567549228668213 seconds
Sending 1000000 numbers to JoinableQueue() took 16.237736225128174 seconds



# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.7.0

(py37_test) [mpenning@mudslide ~]$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.13469791412353516 seconds
Sending 100000 numbers to Pipe() took 1.5587594509124756 seconds
Sending 1000000 numbers to Pipe() took 14.467186689376831 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.1897726058959961 seconds
Sending 100000 numbers to Queue() took 1.7622203826904297 seconds
Sending 1000000 numbers to Queue() took 16.89015531539917 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.2238149642944336 seconds
Sending 100000 numbers to JoinableQueue() took 1.4744081497192383 seconds
Sending 1000000 numbers to JoinableQueue() took 15.264554023742676 seconds


# This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

요약:

  • 2에서 python 2.7에서는Pipe() 300% 더니다보다 약.Queue()그것에 대해 생각조차 하지 마.JoinableQueue()당신이 꼭 혜택을 받아야 하는 경우가 아니라면요.
  • 3에서 python 3.x는Pipe()여전히 (약 20%) 우위에 있습니다.Queue() s 사이의 , 하만사성능차이의이지차▁s▁between.Pipe()그리고.Queue()파이썬 2.7에서처럼 극적이지는 않습니다.한 다한양한▁theQueue()구현은 서로의 약 15% 이내에 있습니다.또한 제 테스트는 정수 데이터를 사용합니다.일부 사람들은 다중 처리에 사용되는 데이터 유형에서 성능 차이를 발견했다고 말했습니다.

파이썬 3.x의 핵심: YMMV... 자체 데이터 유형(예: 정수 / 문자열 / 객체)으로 자체 테스트를 실행하여 자체 관심 플랫폼사용 사례에 대한 결론을 도출하는 것을 고려합니다.

또한 python3.x 성능 테스트가 일관성이 없고 다소 차이가 있다는 점도 언급해야 합니다.각 사례에 대한 최상의 결과를 얻기 위해 몇 분에 걸쳐 여러 테스트를 수행했습니다.이러한 차이가 VMWare/가상화 환경에서 python3 테스트를 실행하는 것과 관련이 있다고 생각합니다. 하지만 가상화 진단은 추측에 불과합니다.

테스트 기법에 대한 의견에 대한 답변 ***

@JJC는 댓글에서 다음과 같이 말했습니다.

더 공정한 비교는 N개의 작업자를 실행하는 것이며, 각각의 작업자는 포인트 투 포인트 파이프를 통해 메인 스레드와 통신하는 것입니다. N개의 작업자는 모두 단일 포인트 투 멀티 포인트 대기열에서 풀링합니다.

이 은 한 한 으로, 그것이 << 래이답은한와한작다명생성니을의고습만려했과원자산의자업의명변-▁▁for▁use;다case▁baseline▁only니원▁that▁answer▁and▁this래'고습려sance>의 기본적인 사용 사례입니다. 이것이 기본적인 사용 사례입니다.Pipe()귀하의 의견에는 여러 작업자 프로세스에 대해 다른 테스트를 추가해야 합니다.은 일반적인 는일으유관만지찰에 이지만,Queue()사용 사례. 완전히 새로운 축을 따라 테스트 매트릭스를 쉽게 폭발시킬 수 있습니다(즉, 다양한 수의 작업자 프로세스로 테스트 추가).

보너스 자료 2

멀티프로세싱은 정보 흐름에 미묘한 변화를 초래하여 사용자가 일부 단축키를 알지 못하는 한 디버깅을 어렵게 만듭니다.예를 들어, 여러 조건에서 사전을 통해 색인화할 때 잘 작동하지만 특정 입력에서 자주 실패하는 스크립트가 있을 수 있습니다.

일반적으로 전체 파이썬 프로세스가 충돌할 때 오류에 대한 단서를 얻지만, 멀티프로세싱 기능이 충돌할 경우 콘솔에 원하지 않는 충돌 추적이 인쇄되지 않습니다.알려지지 않은 다중 처리 충돌을 추적하는 것은 무엇이 프로세스를 충돌시켰는지에 대한 단서 없이는 어렵습니다.

다중 처리 충돌 정보를 추적하는 가장 간단한 방법은 전체 다중 처리 기능을 다음과 같이 포장하는 것입니다.try/except 및사를 합니다.traceback.print_exc():

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

이제 충돌을 발견하면 다음과 같은 것이 나타납니다.

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

소스 코드:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in range(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

"""
multi_simplequeue.py
"""

from multiprocessing import Process, SimpleQueue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = SimpleQueue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to SimpleQueue() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))

의 한 가 추 기능의 한 Queue()주목할 가치가 있는 것은 피더 스레드입니다.이 섹션에서는 "프로세스가 처음으로 대기열에 항목을 배치할 때 버퍼에서 파이프로 객체를 전송하는 피더 스레드가 시작됩니다."라고 설명합니다.무한한 수의(또는 최대 크기) 항목을 삽입할 수 있습니다.Queue()queue.put()막는.이렇게 하면 여러 개의 항목을 한 번에 저장할 수 있습니다.Queue()프로그램이 처리할 준비가 될 때까지.

Pipe()그러나 한 연결에 전송되었지만 다른 연결에서 수신되지 않은 항목에 대한 저장 공간은 한정되어 있습니다.이 스토리지가 모두 사용된 후 다음과 같은 전화가 걸려옵니다.connection.send()전체 항목을 쓸 공간이 있을 때까지 차단합니다.이렇게 하면 다른 스레드가 파이프에서 읽을 때까지 쓰기 스레드가 지연됩니다. Connection개체를 사용하면 기본 파일 설명자에 액세스할 수 있습니다.**nix 시서는다방수있다습니를 방지할 수 .connection.send()를사여차호를 os.set_blocking() 않는 합니다.그러나 파이프 파일에 맞지 않는 항목을 하나만 보내려고 하면 문제가 발생합니다.최신 버전의 Linux에서는 파일 크기를 늘릴 수 있지만 허용되는 최대 크기는 시스템 구성에 따라 다릅니다.로 그므로절안됩니다는서의해지에 는 안 됩니다.Pipe()데이터를 버퍼링합니다.대로 connection.send다른 파이프에서 데이터를 읽을 때까지 차단할 수 있습니다.

결론적으로 데이터를 버퍼링해야 할 경우 큐가 파이프보다 더 나은 선택입니다.두 지점 사이에서만 의사소통이 필요한 경우에도 마찬가지입니다.

만약 - 나처럼 - 당신이 - 를 사용할지 고민하고 있다면multiprocessing건설)Pipe또는Queue당신의 안에서.threading공연을 위한 프로그램들, 저는 마이크 페닝턴의 대본을 비교하기 위해 각색했습니다.queue.Queue그리고.queue.SimpleQueue:

Sending 10000 numbers to mp.Pipe() took 65.051 ms
Sending 10000 numbers to mp.Queue() took 78.977 ms
Sending 10000 numbers to queue.Queue() took 14.781 ms
Sending 10000 numbers to queue.SimpleQueue() took 0.939 ms
Sending 100000 numbers to mp.Pipe() took 449.564 ms
Sending 100000 numbers to mp.Queue() took 811.938 ms
Sending 100000 numbers to queue.Queue() took 149.387 ms
Sending 100000 numbers to queue.SimpleQueue() took 9.264 ms
Sending 1000000 numbers to mp.Pipe() took 4660.451 ms
Sending 1000000 numbers to mp.Queue() took 8499.743 ms
Sending 1000000 numbers to queue.Queue() took 1490.062 ms
Sending 1000000 numbers to queue.SimpleQueue() took 91.238 ms
Sending 10000000 numbers to mp.Pipe() took 45095.935 ms
Sending 10000000 numbers to mp.Queue() took 84829.042 ms
Sending 10000000 numbers to queue.Queue() took 15179.356 ms
Sending 10000000 numbers to queue.SimpleQueue() took 917.562 ms

도 없이, 놀랄것없이, 사는하것을 합니다.queue패키지는 스레드만 있는 경우 훨씬 더 나은 결과를 산출합니다.그렇긴 하지만, 나는 얼마나 성능이 좋은지에 놀랐습니다.queue.SimpleQueue사실은.


"""
pipe_performance.py
"""
import threading as td
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_connection
import time
import typing

def reader_pipe(p_out: mp_connection.Connection) -> None:
    while True:
        msg = p_out.recv()
        if msg=='DONE':
            break

def reader_queue(p_queue: "queue.Queue[typing.Union[str, int]]") -> None:
    while True:
        msg = p_queue.get()
        if msg=='DONE':
            break

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6, 10**7]:
    # first: mp.pipe
        p_mppipe_out, p_mppipe_in = mp.Pipe()
        reader_p = td.Thread(target=reader_pipe, args=((p_mppipe_out),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mppipe_in.send(ii)
        p_mppipe_in.send('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to mp.Pipe() took {(time.time() - _start)*1e3:.3f} ms")

    # second: mp.Queue
        p_mpqueue  = mp.Queue()
        reader_p = td.Thread(target=reader_queue, args=((p_mpqueue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mpqueue.put(ii)
        p_mpqueue.put('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to mp.Queue() took {(time.time() - _start)*1e3:.3f} ms")

    # third: queue.Queue
        p_queue = queue.Queue()
        reader_p = td.Thread(target=reader_queue, args=((p_queue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_queue.put(ii)
        p_queue.put('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to queue.Queue() took {(time.time() - _start)*1e3:.3f} ms")

    # fourth: queue.SimpleQueue
        p_squeue = queue.SimpleQueue()
        reader_p = td.Thread(target=reader_queue, args=((p_squeue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_squeue.put(ii)
        p_squeue.put('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to queue.SimpleQueue() took {(time.time() - _start)*1e3:.3f} ms")

동시 사용 시.장래의 일Python에서 하위 프로세스를 실행하기 위한 ProcessPoolExecutor입니다. 다중 프로세스를 통과할 수 없습니다.인수로 큐잉합니다.이 경우 다음과 같은 오류가 표시됩니다.

RuntimeError: Queue objects should only be shared between processes through inheritance

이 경우 한 가지 해결 방법은 멀티프로세싱을 사용하는 것입니다.프로세스에 인수로 전달할 수 있는 대기열을 만드는 관리자입니다.하지만 이런 종류의 큐는 표준 멀티프로세싱보다 훨씬 느립니다.대기열. 이런 종류의 대기열에 대한 벤치마크를 찾지 못해서 제가 직접 실행해봤습니다.이 관리자 큐를 벤치마킹하기 위해 Mike Pennington의 테스트 코드를 수정했습니다.

여기 결과가 있습니다.먼저 표준 대기열 테스트를 다시 실행합니다.

Sending 10000 numbers to Queue() took 0.12702512741088867 seconds
Sending 100000 numbers to Queue() took 0.9972114562988281 seconds
Sending 1000000 numbers to Queue() took 9.9016695022583 seconds

Sending 10000 numbers to manager.Queue() took 1.0181043148040771 seconds
Sending 100000 numbers to manager.Queue() took 10.438829898834229 seconds
Sending 1000000 numbers to manager.Queue() took 102.3624701499939 seconds

결과는 다중 처리에 의해 생성된 대기열을 보여줍니다.관리자는 표준 다중 처리보다 약 10배 느립니다.줄을 서세요. 꽤 큰 차이입니다.성능에 관심이 있는 경우 이러한 대기열을 사용하지 마십시오.

소스 코드:

"""
manager_multi_queue.py
"""

from multiprocessing import Process, Queue, Manager
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    manager = Manager()
    pqueue = manager.Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to manager.Queue() took {1} seconds".format(count, (time.time() - _start)))

새 업데이트:

내 애플리케이션에서는 여러 프로세스가 동시에 대기열에 기록되고 한 프로세스가 결과를 소비합니다.이 경우 이러한 대기열의 성능은 매우 다릅니다.표준 다중 처리.여러 프로세스가 동시에 대기열에 쓸 경우 대기열이 매우 쉽게 과부하 상태가 되고 읽기 성능이 몇 배로 떨어집니다.이러한 상황에서는 훨씬 더 빠른 대안을 사용할 수 있습니다.

여기서 저는 5개의 프로세스에 의해 큐가 연속적으로 쓰여지는 동안 세 종류의 큐의 메시지 크기의 함수로서 읽기 성능을 비교합니다.세 가지 종류의 대기열이 있습니다.

  1. 멀티프로세싱대기열
  2. 멀티프로세싱부장님.대기열
  3. 다중 처리를 사용하는 사용자 지정 대기열입니다.여러 공정에서 안전하게 사용할 수 있도록 잠금 장치가 있는 파이프.

결과 그림을 보려면 여기를 클릭하십시오.

결과는 세 가지 종류의 대기열 간에 성능에 큰 차이가 있음을 보여줍니다.가장 빠른 것은 파이프를 사용하는 것이고, 다음은 관리자를 사용하여 작성된 대기열이고, 다음은 표준 다중 처리입니다.대기열. 대기열을 쓰는 동안 읽기 성능이 중요한 경우 파이프 또는 관리 대기열을 사용하는 것이 가장 좋습니다.

다음은 플롯이 포함된 이 새 테스트의 소스 코드입니다.

소스 코드:

from __future__ import annotations

"""
queue_comparison_plots.py
"""

import asyncio
import random
from dataclasses import dataclass
from itertools import groupby
from multiprocessing import Process, Queue, Manager
import time
from matplotlib import pyplot as plt
import multiprocessing as mp

class PipeQueue():
    pipe_in: mp.connection.Connection
    pipe_out: mp.connection.Connection

    def __init__(self):
        self.pipe_out, self.pipe_in = mp.Pipe(duplex=False)
        self.write_lock = mp.Lock()
        self.read_lock = mp.Lock()

    def get(self):
        with self.read_lock:
            return self.pipe_out.recv()

    def put(self, val):
        with self.write_lock:
            self.pipe_in.send(val)

@dataclass
class Result():
    queue_type: str
    data_size_bytes: int
    num_writer_processes: int
    num_reader_processes: int
    msg_read_rate: float

class PerfTracker():
    def __init__(self):
        self.running = mp.Event()
        self.count = mp.Value("i")
        self.start_time: float | None = None
        self.end_time: float | None = None

    @property
    def rate(self) -> float:
        return (self.count.value)/(self.end_time-self.start_time)

    def update(self):
        if self.running.is_set():
            with self.count.get_lock():
                self.count.value += 1

    def start(self):
        with self.count.get_lock():
            self.count.value = 0
        self.running.set()
        self.start_time = time.time()

    def end(self):
        self.running.clear()
        self.end_time = time.time()


def reader_proc(queue, perf_tracker, num_threads = 1):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(reader_proc_async(queue, perf_tracker, num_threads))


async def reader_proc_async(queue, perf_tracker, num_threads = 1):
    async def thread(queue, perf_tracker):
        while True:
            msg = queue.get()
            perf_tracker.update()

    futures = []
    for i in range(num_threads):
        futures.append(thread(queue, perf_tracker))

    await asyncio.gather(*futures)


def writer_proc(queue, data_size_bytes: int):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(writer_proc_async(queue, data_size_bytes))


async def writer_proc_async(queue, data_size_bytes: int):
    val = random.randbytes(data_size_bytes)

    while True:
        queue.put(val)


async def main():
    num_reader_procs = 1
    num_reader_threads = 1
    num_writer_procs = 5
    test_time = 5

    results = []
    for queue_type in ["Pipe + locks", "Queue using Manager", "Queue"]:
        for data_size_bytes_order_of_magnitude in range(8):
            data_size_bytes = 10 ** data_size_bytes_order_of_magnitude
            perf_tracker = PerfTracker()

            if queue_type == "Queue using Manager":
                manager = Manager()
                pqueue = manager.Queue()
            elif queue_type == "Pipe + locks":
                pqueue = PipeQueue()
            elif queue_type == "Queue":
                pqueue = Queue()
            else:
                raise NotImplementedError()

            reader_ps = []
            for i in range(num_reader_procs):
                reader_p = Process(target=reader_proc, args=(pqueue, perf_tracker, num_reader_threads))
                reader_ps.append(reader_p)


            writer_ps = []
            for i in range(num_writer_procs):
                writer_p = Process(target=writer_proc, args=(pqueue, data_size_bytes))
                writer_ps.append(writer_p)

            for writer_p in writer_ps:
                writer_p.start()

            for reader_p in reader_ps:
                reader_p.start()

            await asyncio.sleep(1)
            print("start")
            perf_tracker.start()
            await asyncio.sleep(test_time)
            perf_tracker.end()
            print(f"Finished. {queue_type} | {data_size_bytes} |  {perf_tracker.rate} msg/sec")

            results.append(
                Result(
                    queue_type = queue_type,
                    data_size_bytes = data_size_bytes,
                    num_writer_processes = num_writer_procs,
                    num_reader_processes = num_reader_procs,
                    msg_read_rate = perf_tracker.rate,
                )
            )
            for writer_p in writer_ps:
                writer_p.kill()

            for reader_p in reader_ps:
                reader_p.kill()

    print(results)

    fig, ax = plt.subplots()

    count = 0
    for queue_type, result_iterator in groupby(results, key=lambda result: result.queue_type):
        grouped_results = list(result_iterator)
        x_coords = [x.data_size_bytes for x in grouped_results]
        y_coords = [x.msg_read_rate for x in grouped_results]
        ax.plot(x_coords, y_coords, label=f"{queue_type}")
        count += 1

    ax.set_title(f"Queue read performance comparison while writing continuously", fontsize=11)
    ax.legend(loc='upper right', fontsize=10)
    ax.set_yscale("log")
    ax.set_xscale("log")
    ax.set_xlabel("Message size (bytes)")
    ax.set_ylabel("Message throughput (messages/second)")
    plt.show()

if __name__=='__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())

언급URL : https://stackoverflow.com/questions/8463008/multiprocessing-pipe-vs-queue

반응형