Programing

asyncio는 실제로 어떻게 작동합니까?

lottogame 2020. 12. 3. 07:21
반응형

asyncio는 실제로 어떻게 작동합니까?


이 질문은 내 다른 질문에 의해 동기가 부여되었습니다. cdef에서 기다리는 방법?

에 대한 많은 기사와 블로그 게시물이 웹에 asyncio있지만 모두 매우 피상적입니다. asyncio실제로 구현되는 방법 과 I / O를 비 동기화하는 이유 에 대한 정보를 찾을 수 없습니다 . 소스 코드를 읽으려고했지만 최고 등급의 C 코드가 아닌 수천 줄의 코드로 많은 부분이 보조 객체를 다루고 있지만 가장 중요한 것은 Python 구문과 번역 할 C 코드를 연결하기가 어렵다는 것입니다. 으로.

Asycnio의 자체 문서는 훨씬 덜 유용합니다. 작동 방식에 대한 정보는 없으며 사용 방법에 대한 지침 만 있으며 때로는 오해의 소지가 있거나 매우 잘못 작성되었습니다.

저는 Go의 코 루틴 구현에 익숙하며 Python이 동일한 작업을 수행하기를 바라고 있습니다. 이 경우 위에 링크 된 게시물에서 작성한 코드가 작동했을 것입니다. 그렇지 않았기 때문에 이제 이유를 알아 내려고 노력하고 있습니다. 지금까지 내 최선의 추측은 다음과 같습니다. 내가 틀린 부분을 수정하십시오.

  1. 형식의 프로 시저 정의 async def foo(): ...는 실제로를 상속하는 클래스의 메서드로 해석됩니다 coroutine.
  2. 아마도 async def실제로는 await문에 의해 여러 메서드로 분할되며 , 여기서 이러한 메서드가 호출 된 객체는 지금까지 실행을 통해 수행 한 진행 상황을 추적 할 수 있습니다.
  3. 위의 내용이 사실이라면, 본질적으로 코 루틴의 실행은 일부 글로벌 관리자 (루프?)가 코 루틴 객체의 메서드를 호출하는 것으로 귀결됩니다.
  4. 전역 관리자는 I / O 작업이 Python (오직?) 코드에 의해 수행되는시기를 어떻게 든 (어떻게?) 인식하고 현재 실행중인 메서드가 제어를 포기한 후에 실행할 보류중인 코 루틴 메서드 중 하나를 선택할 수 있습니다 ( await문에 적중) . ).

다른 말로하면, asyncio좀 더 이해할 수있는 것으로 일부 구문을 "desugaring"하려는 시도가 있습니다 .

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

내 추측이 맞다면 문제가 있습니다. 이 시나리오에서 I / O는 실제로 어떻게 발생합니까? 별도의 스레드에서? 전체 통역사가 정지되고 I / O가 통역사 외부에서 발생합니까? I / O는 정확히 무엇을 의미합니까? 내 파이썬 프로 시저가 C open()프로 시저를 호출 하고 커널에 인터럽트를 전송하여 제어를 포기한다면, 파이썬 인터프리터는이를 어떻게 알고 다른 코드를 계속 실행할 수 있는지, 커널 코드는 실제 I / O를 수행하고 원래 인터럽트를 보낸 Python 절차를 깨 웁니다. 원칙적으로 파이썬 인터프리터는 이런 일이 일어나는 것을 어떻게 알 수 있습니까?


asyncio는 어떻게 작동합니까?

이 질문에 답하기 전에 몇 가지 기본 용어를 이해해야합니다. 이미 알고있는 용어가 있으면 건너 뛰십시오.

발전기

생성기는 파이썬 함수의 실행을 일시 중단 할 수있는 객체입니다. 사용자가 선별 한 생성기는 키워드를 사용하여 구현됩니다 yield. yield키워드를 포함하는 일반 함수를 생성하여 해당 함수를 생성기로 전환합니다.

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

보시다시피 next()생성기를 호출 하면 인터프리터가 테스트의 프레임을로드하고 yielded 값을 반환합니다 . next()다시 호출 하면 프레임이 인터프리터 스택에 다시로드되도록하고 yield다른 값을 계속 입력합니다.

세 번째 next()로 호출되면 발전기가 종료되고 StopIteration던져졌습니다.

발전기와 통신

: 발전기의 덜 알려진 기능은, 당신이 그 (것)들을 두 가지 방법을 사용하여 통신 할 수 있다는 사실이다 send()throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

를 호출 gen.send()하면 값이 yield키워드 의 반환 값으로 전달됩니다 .

gen.throw()다른 한편으로, 동일한 지점에서 발생한 예외 yield가 호출 된 상태에서 생성기 내부에서 예외를 throw 할 수 있습니다 .

생성기에서 값 반환

생성기에서 값을 반환하면 값이 StopIteration예외에 포함됩니다. 나중에 예외에서 값을 복구하여 필요에 따라 사용할 수 있습니다.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

새로운 키워드 : yield from

Python 3.4에는 새로운 키워드가 추가되었습니다 : yield from. 이 키워드로 우리가 할 수있는 것은 모든 next(), send()그리고 throw()가장 안쪽의 중첩 생성기로 전달하는 것입니다. 내부 생성기가 값을 반환하면 다음의 반환 값이기도합니다 yield from.

>>> def inner():
...     print((yield 2))
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print(val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen)
2
>>> gen.send("abc")
abc
3
4

함께 모아서

yield fromPython 3.4에서 새로운 키워드를 도입했을 때 , 우리는 이제 터널처럼 생성기 내부에서 생성기를 만들 수있었습니다.이 생성기는 가장 안쪽에서 가장 바깥 쪽 생성기로 데이터를 앞뒤로 전달합니다. 이것은 제너레이터 ( 코 루틴)에 대한 새로운 의미를 낳았습니다 .

코 루틴 은 실행 중에 중지 및 재개 할 수있는 함수입니다. Python에서는 async def키워드를 사용하여 정의됩니다 . 많은 발전기처럼, 그들도 자신의 고유 한 형태로 사용 yield from되는을 await. 이전 async과는 await파이썬 3.5에 도입 된, 우리는 (함께 생성 된 동일한 방식으로 발전기에서 코 루틴을 생성 yield from하는 대신 await).

async def inner():
    return 1

async def outer():
    await inner()

__iter__()메서드 를 구현하는 모든 반복기 또는 생성기와 마찬가지로 코 루틴 은 호출 __await__()될 때마다 계속할 수 있도록 구현 합니다 await coro.

확인해야 할 Python 문서 내부에 멋진 시퀀스 다이어그램 이 있습니다.

asyncio에서는 코 루틴 함수와 별도로 작업미래라는 두 가지 중요한 개체가 있습니다 .

선물

Future는 __await__()메소드가 구현 된 객체 이며, 그 역할은 특정 상태와 결과를 유지하는 것입니다. 상태는 다음 중 하나 일 수 있습니다.

  1. PENDING-future에 결과 또는 예외 세트가 없습니다.
  2. CANCELED-미래가 다음을 사용하여 취소되었습니다. fut.cancel()
  3. FINISHED-다음을 사용하는 결과 세트 fut.set_result()또는 다음을 사용 하는 예외 세트에 의해 미래가 완료되었습니다.fut.set_exception()

당신이 짐작 한 것처럼 그 결과는 반환 될 파이썬 객체이거나 발생할 수있는 예외 일 수 있습니다.

객체의 또 다른 중요한 기능은 . 이 메서드를 사용하면 예외가 발생 했든 완료 되었든 작업이 완료되는 즉시 함수를 호출 할 수 있습니다.futureadd_done_callback()

과제

태스크 객체는 코 루틴을 감싸고 가장 안쪽 및 가장 바깥 쪽 코 루틴과 통신하는 특별한 미래입니다. 코 루틴이 퓨처 일 때마다 퓨처 await는 작업으로 다시 전달되고 (에서와 같이 yield from) 작업은이를 수신합니다.

다음으로 작업은 자신을 미래에 연결합니다. add_done_callback()미래 를 부름으로써 그렇게합니다 . 지금부터 취소되거나 예외를 전달하거나 결과적으로 Python 객체를 전달하여 미래가 완료되면 작업의 콜백이 호출되고 다시 존재하게됩니다.

Asyncio

우리가 답해야 할 마지막 질문은 IO가 어떻게 구현됩니까?

asyncio 내부에는 이벤트 루프가 있습니다. 작업의 이벤트 루프. 이벤트 루프의 역할은 작업이 준비 될 때마다 작업을 호출하고 모든 노력을 하나의 작업 기계로 조정하는 것입니다.

이벤트 루프의 IO 부분은라는 단일 중요 함수를 기반으로합니다 select. Select는 아래의 운영 체제에 의해 구현되는 차단 기능으로, 소켓에서 들어 오거나 나가는 데이터를 대기 할 수 있습니다. 데이터가 수신되면 깨어나서 데이터를 수신 한 소켓 또는 쓰기 준비가 된 소켓을 반환합니다.

asyncio를 통해 소켓을 통해 데이터를 받거나 보내려고 할 때 실제로 아래에서 일어나는 일은 소켓에 즉시 읽거나 보낼 수있는 데이터가 있는지 먼저 확인하는 것입니다. 그것의 경우 .send()버퍼가 가득, 또는 .recv()버퍼가 비어있는 소켓은에 등록 select(단순히 목록 중 하나에 추가하여 기능 rlist에 대한 recvwlist대한 send) 및 해당 기능 awaitSA는 새로 만든 future그 소켓에 연결, 오브젝트.

사용 가능한 모든 작업이 퓨처를 기다리고있을 때 이벤트 루프는 호출 select하고 기다립니다. 소켓 중 하나에 들어오는 데이터가 있거나 send버퍼가 비워지면 asyncio는 해당 소켓에 연결된 미래 객체를 확인하고 완료로 설정합니다.

이제 모든 마법이 일어납니다. 미래는 완료로 설정되고 이전에 추가 된 작업이 add_done_callback()다시 살아 .send()나고 가장 안쪽의 코 루틴 ( await체인 때문에)을 재개하는 코 루틴을 호출 하고 근처 버퍼에서 새로 수신 된 데이터를 읽습니다. 유출되었습니다.

다음과 같은 경우 다시 메소드 체인 recv():

  1. select.select 기다립니다.
  2. 데이터가있는 준비 소켓이 리턴됩니다.
  3. 소켓의 데이터는 버퍼로 이동됩니다.
  4. future.set_result() 호출됩니다.
  5. 추가 된 작업 add_done_callback()이 이제 깨어납니다.
  6. 태스크는 .send()가장 안쪽의 코 루틴으로가는 코 루틴을 호출 하고 깨 웁니다.
  7. 데이터는 버퍼에서 읽고 겸손한 사용자에게 반환됩니다.

요약하면 asyncio는 기능을 일시 중지하고 다시 시작할 수있는 생성기 기능을 사용합니다. yield from가장 안쪽의 생성기에서 가장 바깥쪽으로 데이터를 앞뒤로 전달할 수있는 기능을 사용 합니다. IO가 완료되기를 기다리는 동안 (OS select기능 을 사용하여 ) 기능 실행을 중지하기 위해이 모든 것을 사용합니다 .

그리고 무엇보다도? 한 기능이 일시 중지 된 동안 다른 기능이 실행되고 섬세한 패브릭 (asyncio)과 인터리브 될 수 있습니다.


에 대해 이야기 async/await하고하는 것은 asyncio같은 일이 아닙니다. 첫 번째는 기본적인 저수준 구조 (코 루틴)이고, 후자는 이러한 구조를 사용하는 라이브러리입니다. 반대로, 궁극적 인 답은 하나도 없습니다.

다음은 방법에 대한 일반적인 설명입니다 async/awaitasyncio-like 라이브러리 작동합니다. 즉, 위에 다른 트릭이있을 수 있지만 (...) 직접 구축하지 않는 한 중요하지 않습니다. 그러한 질문을 할 필요가 없을만큼 이미 충분히 알고 있지 않는 한 그 차이는 무시할 만합니다.

1. 너트 쉘의 코 루틴 대 서브 루틴

서브 루틴 (함수, 프로 시저, ...) 과 마찬가지로 코 루틴 (생성자, ...)은 호출 스택과 명령어 포인터의 추상화입니다. 실행중인 코드 조각의 스택이 있으며 각각은 특정 명령어에 있습니다.

의 구별 def대는 async def명확성을 위해 단지이다. 실제 차이는 returnyield. 이것에서 await또는 yield from개별 호출에서 전체 스택으로의 차이를 가져옵니다.

1.1. 서브 루틴

서브 루틴은 지역 변수를 보유하는 새로운 스택 레벨과 끝에 도달하기위한 명령어의 단일 순회를 나타냅니다. 다음과 같은 서브 루틴을 고려하십시오.

def subfoo(bar):
     qux = 3
     return qux * bar

실행하면

  1. bar및에 대한 스택 공간 할당qux
  2. 재귀 적으로 첫 번째 명령문을 실행하고 다음 명령문으로 이동
  3. 한 번에 return값을 호출 스택으로 푸시합니다.
  4. 스택 (1.) 및 명령 포인터 (2.)를 지 웁니다.

특히 4.는 서브 루틴이 항상 동일한 상태에서 시작 함을 의미합니다. 기능 자체에 대한 모든 것은 완료시 손실됩니다. 이후에 지침이 있어도 기능을 재개 할 수 없습니다 return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. 영구 서브 루틴으로서의 코 루틴

코 루틴은 서브 루틴과 비슷하지만 상태 파괴 하지 않고 종료 할 수 있습니다 . 다음과 같은 코 루틴을 고려하십시오.

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

실행하면

  1. bar및에 대한 스택 공간 할당qux
  2. 재귀 적으로 첫 번째 명령문을 실행하고 다음 명령문으로 이동
    1. 한 번에 yield해당 값을 호출 스택에 푸시 하지만 스택 및 명령어 포인터를 저장합니다.
    2. 를 호출하면 yield스택 및 명령어 포인터를 복원하고 인수를qux
  3. 한 번에 return값을 호출 스택으로 푸시합니다.
  4. 스택 (1.) 및 명령 포인터 (2.)를 지 웁니다.

2.1과 2.2가 추가되었습니다. 코 루틴은 미리 정의 된 지점에서 일시 중지 및 재개 될 수 있습니다. 이것은 다른 서브 루틴을 호출하는 동안 서브 루틴이 일시 중단되는 방식과 유사합니다. 차이점은 활성 코 루틴이 호출 스택에 엄격하게 바인딩되지 않는다는 것입니다. 대신, 일시 중단 된 코 루틴은 별도의 격리 된 스택의 일부입니다.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

즉, 중단 된 코 루틴을 스택간에 자유롭게 저장하거나 이동할 수 있습니다. 코 루틴에 액세스 할 수있는 모든 호출 스택은 코 루틴을 다시 시작할 수 있습니다.

1.3. 호출 스택 탐색

지금까지 코 루틴은 yield. 서브 루틴은 아래로 갈 수있는 최대 와 호출 스택 return(). 완전성을 위해 코 루틴은 호출 스택으로 올라가는 메커니즘도 필요합니다. 다음과 같은 코 루틴을 고려하십시오.

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

실행하면 서브 루틴처럼 스택과 명령어 포인터를 여전히 할당한다는 의미입니다. 일시 중단되면 여전히 서브 루틴을 저장하는 것과 같습니다.

그러나 yield from않습니다 모두 . 스택 및 명령 포인터를 일시 중단 wrap 하고 실행 cofoo합니다. 이 참고 wrap될 때까지 중지 상태로 남아 있습니다 cofoo완전히 마감. cofoo일시 중단되거나 무언가가 전송 될 때마다 cofoo호출 스택에 직접 연결됩니다.

1.4. 끝까지 코 루틴

설정된대로 yield from다른 중간 범위에 걸쳐 두 범위를 연결할 수 있습니다. 재귀 적으로 적용 하면 스택 맨 위 를 스택 맨 아래연결할 수 있습니다 .

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

rootcoro_b서로에 대해 알고하지 않습니다. 이것은 코 루틴을 콜백보다 훨씬 깨끗하게 만듭니다. 코 루틴은 여전히 ​​서브 루틴과 같은 1 : 1 관계에 구축됩니다. 코 루틴은 일반 호출 지점까지 전체 기존 실행 스택을 일시 중단하고 다시 시작합니다.

특히 root재개 할 코 루틴을 임의의 수로 가질 수 있습니다. 그러나 동시에 둘 이상을 재개 할 수는 없습니다. 동일한 루트의 코 루틴은 동시이지만 병렬이 아닙니다!

1.5. 파이썬 asyncawait

설명은 지금까지 생성기의 용어 yieldyield from어휘를 명시 적으로 사용했습니다 . 기본 기능은 동일합니다. 새로운 Python3.5 구문 asyncawait주로 명확성을 위해 존재한다.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async forasync with당신이 휴식 때문에 문이 필요하다 yield from/await베어 체인 forwith문을.

2. 간단한 이벤트 루프의 구조

그 자체로 코 루틴은 다른 코 루틴에 대한 제어권을 양보하는 개념이 없습니다 . 코 루틴 스택의 맨 아래에있는 호출자에게만 제어를 양보 할 수 있습니다. 이 호출자는 다른 코 루틴으로 전환하여 실행할 수 있습니다.

여러 코 루틴의이 루트 노드는 일반적으로 이벤트 루프입니다 . 일시 중단시 코 루틴은 재개하려는 이벤트생성합니다 . 결과적으로 이벤트 루프는 이러한 이벤트가 발생하기를 효율적으로 기다릴 수 있습니다. 이를 통해 다음에 실행할 코 루틴 또는 재개하기 전에 대기하는 방법을 결정할 수 있습니다.

이러한 디자인은 루프가 이해하는 미리 정의 된 이벤트 집합이 있음을 의미합니다. await마지막으로 이벤트가 await편집 될 때까지 여러 코 루틴이 서로 연결 됩니다. 이 이벤트는 제어 를 통해 이벤트 루프와 직접 통신 할 수 있습니다 yield.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

핵심은 코 루틴 서스펜션을 통해 이벤트 루프와 이벤트가 직접 통신 할 수 있다는 것입니다. 중간 코 루틴 스택은 필요하지 않습니다 어떤 루프를 실행중인 대한 지식이나 방법 이벤트 작업을.

2.1.1. 시간 이벤트

처리하기 가장 간단한 이벤트는 특정 시점에 도달하는 것입니다. 이것은 스레드 코드의 기본 블록이기도합니다. 스레드 sleep는 조건이 참이 될 때까지 반복적으로 s입니다. 그러나 일반 sleep블록 실행 자체는 다른 코 루틴이 차단되지 않기를 원합니다. 대신 현재 코 루틴 스택을 재개해야하는시기를 이벤트 루프에 알려야합니다.

2.1.2. 이벤트 정의

이벤트는 단순히 열거 형, 유형 또는 기타 ID를 통해 식별 할 수있는 값입니다. 목표 시간을 저장하는 간단한 클래스로 이것을 정의 할 수 있습니다. 이벤트 정보 저장 하는 것 외에도 await클래스에 직접 허용 할 수 있습니다.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

이 클래스 는 이벤트 저장 합니다. 실제로 이벤트를 처리하는 방법은 언급하지 않습니다.

유일한 특징은 __await__- 그것이 무엇 인 await에 대한 키워드 보인다. 실제로는 반복자이지만 일반 반복 기계에는 사용할 수 없습니다.

2.2.1. 이벤트를 기다리는 중

이제 이벤트가 생겼으니 코 루틴은 어떻게 반응할까요? 우리는 동등한 표현 할 수 있어야한다 sleep에 의해 await우리의 이벤트를 보내고. 무슨 일이 일어나고 있는지 더 잘보기 위해 절반의 시간 동안 두 번 기다립니다.

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

이 코 루틴을 직접 인스턴스화하고 실행할 수 있습니다. 생성기와 유사하게 using coroutine.sendyield결과가 나올 때까지 코 루틴 실행합니다 .

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

이것은 우리에게 두 개의 AsyncSleep이벤트를 제공 StopIteration하고 코 루틴이 완료되면 하나를 제공합니다. 유일한 지연은 time.sleep루프 에서 오는 것 입니다! 각각 AsyncSleep은 현재 시간의 오프셋 만 저장합니다.

2.2.2. 이벤트 + 수면

이 시점에서 우리는 두 가지 별도의 메커니즘을 사용할 수 있습니다.

  • AsyncSleep 코 루틴 내부에서 생성 될 수있는 이벤트
  • time.sleep 코 루틴에 영향을주지 않고 기다릴 수있는

특히,이 두 가지는 직교합니다. 둘 중 어느 것도 다른 것에 영향을 미치거나 트리거하지 않습니다. 결과적으로 .NET sleepFramework의 지연에 대처하기 위한 자체 전략을 마련 할 수 있습니다 AsyncSleep.

2.3. 순진한 이벤트 루프

코 루틴 여러 개인 경우 각 코 루틴이 깨어나고 싶은시기를 알 수 있습니다. 그런 다음 첫 번째 항목이 재개되기를 원할 때까지 기다린 다음 이후 항목을 기다릴 수 있습니다. 특히, 각 지점에서 우리는 다음 항목 에만 관심이 있습니다.

이것은 간단한 스케줄링을 만듭니다.

  1. 원하는 깨우기 시간으로 코 루틴 정렬
  2. 일어나고 싶은 첫 번째 선택
  3. 이 시점까지 기다려
  4. 이 코 루틴을 실행
  5. 1부터 반복합니다.

사소한 구현에는 고급 개념이 필요하지 않습니다. A list는 날짜별로 코 루틴을 정렬 할 수 있습니다. 기다리는 것은 규칙적 time.sleep입니다. 코 루틴을 실행하는 것은 coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

물론 이것은 개선의 여지가 충분합니다. 대기 큐에 힙을 사용하거나 이벤트에 디스패치 테이블을 사용할 수 있습니다. 에서 반환 값을 가져 와서 StopIteration코 루틴에 할당 할 수도 있습니다. 그러나 기본 원칙은 동일합니다.

2.4. 협력 대기

AsyncSleep이벤트와 run이벤트 루프는 타임 이벤트를 완벽하게 작업을 구현합니다.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

이렇게하면 5 개의 코 루틴이 각각 협력 적으로 전환되어 각각 0.1 초 동안 중단됩니다. 이벤트 루프는 동기식이지만 2.5 초가 아닌 0.5 초만에 작업을 실행합니다. 각 코 루틴은 상태를 유지하고 독립적으로 작동합니다.

3. I / O 이벤트 루프

지원하는 이벤트 루프 sleep폴링에 적합합니다 . 그러나 파일 핸들에서 I / O를 기다리는 것은보다 효율적으로 수행 할 수 있습니다. 운영 체제는 I / O를 구현하므로 어떤 핸들이 준비되었는지 알고 있습니다. 이상적으로 이벤트 루프는 명시 적 "I / O 준비"이벤트를 지원해야합니다.

3.1. select전화

Python에는 이미 OS에 읽기 I / O 핸들을 쿼리하는 인터페이스가 있습니다. 읽기 또는 쓰기 핸들과 함께 호출되면 읽기 또는 쓰기 준비 가 된 핸들을 반환합니다 .

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

예를 들어, open쓸 파일을 작성하고 준비 될 때까지 기다릴 수 있습니다.

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

선택이 반환되면 writeable열린 파일이 포함됩니다.

3.2. 기본 I / O 이벤트

AsyncSleep요청과 마찬가지로 I / O에 대한 이벤트를 정의해야합니다. 기본 select로직에서 이벤트는 읽을 수있는 객체 (예 : open파일)를 참조해야 합니다. 또한 읽을 데이터의 양을 저장합니다.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

와 마찬가지로 AsyncSleep우리 대부분은 단지 기본 시스템 호출에 필요한 데이터를 저장합니다. 이번에 __await__는 원하는 amount내용을 읽을 때까지 여러 번 재개 할 수 있습니다. 또한 우리 return는 재개하는 대신 I / O 결과를 얻습니다.

3.3. 읽기 I / O로 이벤트 루프 확대

이벤트 루프의 기초는 여전히 run이전 정의 된 것입니다. 먼저 읽기 요청을 추적해야합니다. 이것은 더 이상 정렬 된 일정이 아니며 읽기 요청 만 코 루틴에 매핑합니다.

# new
waiting_read = {}  # type: Dict[file, coroutine]

이후 select.select시간 초과 매개 변수를, 우리는 대신에 사용할 수 있습니다 time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

이것은 우리에게 읽을 수있는 모든 파일을 제공합니다. 만약 있다면 해당 코 루틴을 실행합니다. 없는 경우 현재 코 루틴이 실행될 때까지 충분히 기다렸습니다.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

마지막으로 실제로 읽기 요청을 들어야합니다.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. 합치기

위의 내용은 약간 단순화되었습니다. 우리가 항상 읽을 수 있다면 잠자는 코 루틴을 굶주 리지 않도록 약간의 전환을해야합니다. 우리는 읽을 것이 없거나 기다릴 것이없는 것을 처리해야합니다. 그러나 최종 결과는 여전히 30 LOC에 맞습니다.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. 협력 I / O

AsyncSleep, AsyncReadrun구현은 이제 수면 및 / 또는 읽기에 완벽하게 작동합니다. 에서와 마찬가지로 sleepy읽기를 테스트하는 도우미를 정의 할 수 있습니다.

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = return await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

이를 실행하면 I / O가 대기 작업과 인터리브되는 것을 볼 수 있습니다.

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. 비 블로킹 I / O

파일의 I / O가 개념을 이해하는 동안에는 다음과 같은 라이브러리에는 적합하지 않습니다 asyncio. select호출은 항상 파일대해 반환 되며 둘 다 open둘 다 무기한 차단read 될 수 있습니다 . 이것은 이벤트 루프의 모든 코 루틴을 차단합니다. 스레드 및 동기화와 같은 라이브러리 는 파일의 비 차단 I / O 및 이벤트를 위조합니다.aiofiles

그러나 소켓은 비 차단 I / O를 허용하며 고유 한 대기 시간으로 인해 훨씬 ​​더 중요합니다. 이벤트 루프에서 사용될 때, 데이터 대기 및 재 시도는 아무것도 차단하지 않고 래핑 될 수 있습니다.

4.1. 비 블로킹 I / O 이벤트

우리 AsyncRead유사하게 소켓에 대한 suspend-and-read 이벤트를 정의 할 수 있습니다. 파일을 가져 오는 대신 소켓을 가져옵니다.이 소켓은 차단되지 않아야합니다. 또한, 우리의 __await__사용 socket.recv대신에 file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

반면에 AsyncRead, __await__수행은 참으로 I / O를 비 차단. 데이터를 사용할 수 있으면 항상 읽습니다. 사용 가능한 데이터가 없으면 항상 일시 중지됩니다. 즉, 유용한 작업을 수행하는 동안에 만 이벤트 루프가 차단됩니다.

4.2. 이벤트 루프 차단 해제

이벤트 루프에 관한 한 큰 변화는 없습니다. 수신 대기 할 이벤트는으로 준비된 것으로 표시된 파일 설명자인 파일과 동일합니다 select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

이 시점에서, 그 명백해야 AsyncRead하고 AsyncRecv이벤트 같은 종류입니다. 교환 가능한 I / O 구성 요소 가있는 하나의 이벤트로 쉽게 리팩터링 할 수 있습니다 . 실제로 이벤트 루프, 코 루틴 및 이벤트 는 스케줄러, 임의의 중간 코드 및 실제 I / O를 명확하게 분리 합니다.

4.3. 논 블로킹 I / O의 추악한면

원칙적으로이 시점에서해야 할 일은 for 의 논리를 복제 read하는 것 recv입니다 AsyncRecv. 그러나 이것은 훨씬 더 추악합니다. 커널 내부에서 함수가 차단 될 때 조기 리턴을 처리해야하지만 제어권을 양보해야합니다. 예를 들어, 연결을 여는 것보다 파일을 여는 것이 훨씬 더 깁니다.

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

간단히 말해서, 남은 것은 수십 줄의 예외 처리입니다. 이벤트와 이벤트 루프는 이미이 시점에서 작동합니다.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

추가

github의 예제 코드


귀하의 coro탈당은 개념적으로 정확하지만 약간 불완전합니다.

await무조건 일시 중단되지는 않지만 차단 호출이 발생한 경우에만 중단됩니다. 통화가 차단되고 있음을 어떻게 알 수 있습니까? 이것은 대기중인 코드에 의해 결정됩니다. 예를 들어 대기 가능한 소켓 읽기 구현은 다음과 같이 정의 할 수 있습니다.

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

실제 asyncio에서 동등한 코드Future매직 값을 반환 하는 대신 a의 상태를 수정 하지만 개념은 동일합니다. 생성기와 같은 객체에 적절하게 적용되면 위의 코드를 await편집 할 수 있습니다 .

발신자 측에서 코 루틴이 다음을 포함 할 때 :

data = await read(sock, 1024)

그것은 다음과 가까운 것으로 분해됩니다.

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

People familiar with generators tend to describe the above in terms of yield from which does the suspension automatically.

The suspension chain continues all the way up to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until either a file descriptor a coroutine is interested in becomes ready for IO. (The event loop maintains a file-descriptor-to-coroutine mapping.)

In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.

In other words:

  1. Everything happens in the same thread by default.

  2. The event loop is responsible for scheduling the coroutines and waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.

For insight on coroutine-driving event loops, I recommend this talk by Dave Beazley, where he demonstrates coding an event loop from scratch in front of live audience.


It all boils down to the two main challenges that asyncio is addressing:

  • How to perform multiple I/O in a single thread?
  • How to implement cooperative multitasking?

The answer to the first point has been around for a long while and is called a select loop. In python, it is implemented in the selectors module.

The second question is related to the concept of coroutine, i.e. functions that can stop their execution and be restored later on. In python, coroutines are implemented using generators and the yield from statement. That's what is hiding behind the async/await syntax.

More resources in this answer.


EDIT: Addressing your comment about goroutines:

The closest equivalent to a goroutine in asyncio is actually not a coroutine but a task (see the difference in the documentation). In python, a coroutine (or a generator) knows nothing about the concepts of event loop or I/O. It simply is a function that can stop its execution using yield while keeping its current state, so it can be restored later on. The yield from syntax allows for chaining them in a transparent way.

Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future. This future then bubbles up to the event loop, and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restore the task by sending the future back into the coroutine chain.


EDIT: Addressing some of the questions in your post:

How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?

No, nothing happens in a thread. I/O is always managed by the event loop, mostly through file descriptors. However the registration of those file descriptors is usually hidden by high-level coroutines, making the dirty work for you.

What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?

An I/O is any blocking call. In asyncio, all the I/O operations should go through the event loop, because as you said, the event loop has no way to be aware that a blocking call is being performed in some synchronous code. That means you're not supposed to use a synchronous open within the context of a coroutine. Instead, use a dedicated library such aiofiles which provides an asynchronous version of open.

참고URL : https://stackoverflow.com/questions/49005651/how-does-asyncio-actually-work

반응형