asyncio.Queue生产者消费者实现示例

import asyncio


async def consumer(n, q):
    print('consumer {}: starting'.format(n))
    while True:
        print('consumer {}: waiting for item'.format(n))
        item = await q.get()
        print('consumer {}: has item {}'.format(n, item))
        if item is None:
            # None 是一个停止信号。
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01)
            q.task_done()
    print('consumer {}: ending'.format(n))


async def producer(q, num_workers):
    print('producer: starting')
    # 向队列中添加一些数字来模拟作业
    for i in range(num_workers * 30):
        await q.put(i)
        print('生产者: 添加任务 {} 到队列'.format(i))
        # 添加 None 到队列
    # 发出消费者退出的信号
    print('生产者: 向队列添加停止信号')
    for i in range(num_workers):
        await q.put(None)
    print('producer: waiting for queue to empty')
    await q.join()
    print('producer: ending')


async def main(loop, num_consumers):
    # 创建具有固定大小的队列,用于生产者
    # 将阻塞,直到消费者取出一些item。
    q = asyncio.Queue(maxsize=num_consumers)

    # 消费者任务调度
    consumers = [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]

    # 生产者任务调度
    prod = loop.create_task(producer(q, num_consumers))

    # 等待所有协程完成。
    await asyncio.wait(consumers + [prod])


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    event_loop.close()