V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
firejoke
V2EX  ›  Python

关于 asyncio 执行 IO 密集型操作的不解

  •  
  •   firejoke · 64 天前 · 2860 次点击
    这是一个创建于 64 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有一个读文件然后写数据库的操作,想尝试使用协程。
    使用协程的:

    async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):
        ql = len(context_qs)
        i = 0
        # 每一个 Queue 放 step 个数据就切换下一个
        step = 2
        with open(file_path, encoding="utf8") as f:
            for text in f:
                if i // step == ql:
                    i = 0
                context_q = context_qs[i // step]
                context = {}
                text = re.findall(r"\d+", text)
                if text:
                    context = {"解析然后组装成 dict"}
                    await context_q.put(context)
                    # 这里如果不 join ,会一直在这个 for 循环里不出去
                    await context_q.join()
                    i = i + 1
            else:
                await context_q.put("结束标记")
                return
    
    
    async def write_db(context_q: asyncio.Queue, model: ModelBase):
        async with AsyncSession() as session:
            while 1:
                context = await context_q.get()
                if context["结束标记"] == "end":
                    return
                info, obj = None, None
                try:
                    if context["info"]:
                        info = await session.execute(
                            select(InfoModel).filter(
                                InfoModel.attr == context["info"]
                            )
                        )
                        info = info.scalars().one_or_none()
                        if not info:
                            info = InfoModel(attr=context["info"])
                            session.add(info)
                    if context["header"]:
                        obj = await session.execute(
                            select(model).filter(
                                model.header == context["header"]
                            ).options(selectinload(getattr(model, "info")))
                        )
                        obj = obj.scalars().one_or_none()
                        if not obj:
                            obj = model(header=context["header"])
                            session.add(obj)
                    if obj or info:
                        if info not in obj.info:
                            obj.info.append(info)
                            session.add(obj)
                        await session.commit()
                except Exception as e:
                    await session.rollback()
                    raise e
                else:
                    context_q.task_done()
    
    
    async def main():
    	# 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法
        c_q_count = 3
        a_context_qs = [asyncio.Queue() for i in range(c_q_count)]
        b_context_qs = [asyncio.Queue() for i in range(c_q_count)]
        tasks = [
            asyncio.create_task(
                parse_text(Path("a.txt"), a_context_qs)
            ),
            asyncio.create_task(
                parse_text(Path("b.txt"), b_context_qs)
            ),
        ]
        for i in range(c_q_count):
            tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
            tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
        await asyncio.gather(*tasks)
    
    
    
    if __name__ == '__main__':
        asyncio.run(main(), debug=settings.DEBUG)
    
    

    不使用协程的:

    def sync_read_file():
        af = Path("a.txt").open(encoding="utf8")
        bf = Path("b.txt").open(encoding="utf8")
        with Session() as session:
            while 1:
                if af:
                    try:
                        text = af.readline()
                        context = parse_text(text)
                        sync_write_db(session, context, AModel)
                    except IOError:
                        af.close()
                        af = None
                if bf:
                    try:
                        text = bf.readline()
                        context = parse_text(text)
                        sync_write_db(session, context, BModel)
                    except IOError:
                        bf.close()
                        bf = None
                if not af and not bf:
                    return
    
    
    def sync_write_db(session, context, model):
        info, obj = None, None
        try:
            if context["info"]:
                info = session.execute(
                    select(Info).filter(
                        Info.attr == context["info"]
                    )
                )
                info = info.scalars().one_or_none()
                if not info:
                    info = Info(attr=context["info"])
                    session.add(info)
            if context["header"]:
                obj = session.execute(
                    select(model).filter(model.info == context["info"]))
                obj = obj.scalars().one_or_none()
                if not obj:
                    obj = model(info=context["info"])
                    session.add(obj)
            if obj or info:
                if info not in obj.info:
                    obj.info.append(info)
                    session.add(obj)
                session.commit()
        except Exception as e:
            session.rollback()
            raise e
    
    
    if __name__ == '__main__':
        sync_read_file()
    
    

    这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
    不知道是我协程的用法有问题?还是说有别的什么原因?

    35 条回复    2021-11-23 22:00:11 +08:00
    long2ice
        1
    long2ice  
       63 天前
    试试 aiofiles 之类的,你的文件 IO 还是同步的
    Trim21
        2
    Trim21  
       63 天前
    with open(file_path, encoding="utf8") as f:
    for text in f:

    这两行都是阻塞的
    firejoke
        3
    firejoke  
    OP
       63 天前
    @long2ice #1
    @Trim21 #2
    文件这里只是读取,然后放进队列里,这也会导致阻塞吗?
    Trim21
        4
    Trim21  
       63 天前
    @firejoke #3 同步 io 会阻塞掉整个事件循环。
    firejoke
        5
    firejoke  
    OP
       63 天前
    @Trim21 #4 所以,也会导致如果不主动用队列的 join 阻塞住,就不会跳到其他 await 的地方?
    Nitroethane
        6
    Nitroethane  
       63 天前 via iPhone
    @firejoke 如果读取文件速度比较慢,而且文件比较大的话影响应该比较明显
    firejoke
        7
    firejoke  
    OP
       63 天前
    @Nitroethane #6 每行数据小于 1kb ,而且是用的 for ,这里相当于一个生成器
    Trim21
        8
    Trim21  
       63 天前
    @firejoke #5 不是,你的代码中仅仅会阻塞在 open 和 for text in f 这两行。在等待这两行底层的同步 io 完成的时间里是不会运行其他 task 的。
    firejoke
        9
    firejoke  
    OP
       63 天前
    @Trim21 #8 我改成了 asyncfiles ,然后把队列的 join 去掉了,这次成功跳到了其他 await 的位置,确实如你所说,感谢!
    但测试发现,虽然没了 io 的阻塞,但写入速度还是没太大变化,他每读一行,切到其他 task ,和我之前没读一行,join 住,就执行流程来说,是不是没差?
    Trim21
        10
    Trim21  
       63 天前 via Android
    我没仔细看完整的代码,只是看到一开始就有同步阻塞的问题就回复了。
    locoz
        11
    locoz  
       63 天前
    目测是正则导致的阻塞...有一说一你这种情况不太适合用 asyncio ,或者说不太适合没有包上隐式多进程的 asyncio ,毕竟不是纯粹的 IO 操作。然后文件操作方面 aiofiles 实际背后也是靠线程池跑的,这一点需要注意一下,有时候可能会导致踩坑。
    documentzhangx66
        12
    documentzhangx66  
       63 天前
    先监视一下设备性能极限。
    iostat -x -m -d 1
    LeeReamond
        13
    LeeReamond  
       63 天前
    大概看了一眼楼上说的应该没问题,并非所有类型的任务都能通过异步加速,你要做好心理准备。另外 aiofiles 的实现其实很丑陋。。楼上说是线程池跑的,我有点忘记具体情况了,只记得以前读源码的印象是很丑陋。。
    Contextualist
        14
    Contextualist  
       63 天前
    看上去没有明显的问题,不过对于任何为了改进性能的重写建议还是先 profile 一下,看看瓶颈到底出在哪个调用上。

    然后异步文件 IO 不是为了提升性能(降低平均延迟)的,而是为了降低尾延迟的,参见: https://trio.readthedocs.io/en/stable/reference-io.html#background-why-is-async-file-i-o-useful-the-answer-may-surprise-you
    2i2Re2PLMaDnghL
        15
    2i2Re2PLMaDnghL  
       63 天前
    (我会尝试先把所有信息读进内存然后 timeit 数据库部分,看瓶颈是不是文件
    lesismal
        16
    lesismal  
       63 天前
    不是说给函数加上异步就是一切都异步了:
    1. 异步的函数 A
    2. A 内部调用 B C D ,B C D 有任意同步阻塞的行为,A 也一样跟着阻塞

    py 的性能痛点远不只是 asyncio 就能解决的了的,how about trying golang -_-
    firejoke
        17
    firejoke  
    OP
       63 天前
    @locoz #11 我也感觉似乎没发挥出 asyncio 的优势,每一条数据都不超过 1kb ,所以可能除了数据库操作稍微耗时长一点,其他地方等待的很少,所以和单线程的性能差不多?另外请教一下,“没有包上隐式多进程” 具体是指什么呢?
    firejoke
        18
    firejoke  
    OP
       63 天前
    @documentzhangx66 #12 设备性能应该没问题,12 核 24 线程,64G 内存,磁盘读取速度也没有跑满,IO 读写也不是特别高。
    firejoke
        19
    firejoke  
    OP
       63 天前
    @LeeReamond #13 嗯,我昨天也想了一下,如果每一步阻塞住的操作实际上都很快,那 asyncio 其实发挥不出切换等待的优势。
    locoz
        20
    locoz  
       63 天前
    @firejoke #17 建议用调试工具或者排除法看看具体是哪里拖慢了,单看代码和前面的讨论我感觉是正则部分导致的。

    前面没讲清楚,“包上隐式多进程的 asyncio”指的是把多进程和协程结合,开一堆子进程然后每个子进程一个 eventloop ,因为之前有看到过一个专门的库把这部分操作给隐式处理了,使用起来两三行搞定,不需要自己写进程管理部分。然后一些框架其实也会隐式地做这种结合处理来提高效率。
    firejoke
        21
    firejoke  
    OP
       63 天前
    @Contextualist #14 看文档的意思,是说用异步文件 IO ,在从内存读取时反倒会变慢,在从磁盘读取的时候会加快,在不同环境下其结果是不可预测的。那我如果单独用一个进程读取文件到内存,然后另一个进程从内存读取然后再操作,应该可以绕开这个问题。
    firejoke
        22
    firejoke  
    OP
       63 天前
    @locoz #20 我昨天最后也是改成用多进程了,一个进程专门读文件,然后放进队列,其他子进程从队列读,然后操作数据库,那看来我思路没跑偏。还有其他的解法吗?多进程和协程的结合,一般都是以多进程为主吗?
    Contextualist
        23
    Contextualist  
       63 天前
    又看了一下你贴出来文件的部分,你是不是就两个大文件(就是说不是大量小文件),那文件 IO 就基本不可能是你的瓶颈,你看到磁盘读取没跑满很有可能是你下游的处理速度没跟上。

    多进程和协程,感觉你自己也总结出来了。协程得用在有长时间等待系统调用 (syscall) 的地方(比如网络、子进程、定时任务)。CPU 密集的操作得用多线程或多进程,但在 Python 里有 GIL ,就只能用多进程。
    firejoke
        24
    firejoke  
    OP
       63 天前
    @Contextualist #23 是的,就是两个大文件,所以我也觉得文件 IO 不是我这里的瓶颈,协程在这个场景中没体现出他的优势,我已经改成了多进程了。
    Contextualist
        25
    Contextualist  
       63 天前
    我对数据库不熟,不过我猜对于很多数据库并发写是不会有性能提升的,用单线程就可以了,但你可能需要 batch / bulk 操作,用来一次性插入数十条、数百条数据,而不是一次插入一条。
    O5oz6z3
        26
    O5oz6z3  
       63 天前
    虽然不懂,看完楼上感觉原因之一在于 asyncio 的上限就是单线程,而单线程吞吐量不如多线程?
    firejoke
        27
    firejoke  
    OP
       63 天前
    @Contextualist #25 对欸!资源是消耗在每一条查询和写入的操作上,如果批量写,就可以降低写入频率,至于查询,我已经在查询字段上加了索引,我改一下试试。感谢~
    然后我看到你之前提到的 trio ,看他的文档像是涉及到异步操作的都有涉及,感觉非常不错啊。
    firejoke
        28
    firejoke  
    OP
       63 天前
    @O5oz6z3 #26 不是,当不存在较长的 io 等待的时候,协程和单线程没差。
    yufpga
        29
    yufpga  
       63 天前
    大概看了下, 这瓶颈显然不是在 parse_text 中的文件读,就算再怎么阻塞,读写本地文件也不至于到每秒才 400 行的程度. 而在 write_db 中, 出现好几处 await 的地方, 这些地方可都是要同步等待结果返回的呀. 一个很好容易验证的方法就是把 write_db 中的 await 用 await asyncio.sleep 替换掉, 尝试不同的 sleep 时间. 实际上上面的问题在于每一次 while 1 的循环循环是同步的, 你必须要先处理完队列中的前一条数据, 才能继续处理下一条数据. 所以处理也很简单, 把每一次的循环异步化掉.
    hustlibraco
        30
    hustlibraco  
       62 天前
    用```async for```替代```for```可以吗?
    firejoke
        31
    firejoke  
    OP
       62 天前
    @hustlibraco #30 换成异步文件读,就可以换成 async for 了。
    firejoke
        32
    firejoke  
    OP
       62 天前
    @yufpga #29 我看日志里,我同时开了好多个 task ,这个 task 的循环里 await query 或 add 或 commit ,就会跳到另一个 task 的循环里的 query 或 add 或 commit 。
    yufpga
        33
    yufpga  
       62 天前
    @firejoke 是我看差了, 我以为只有一个 queue, 而你的代码里是两个 context, 各自 3 个 queue, 也就是总共 6 个 queue, 对应 6 个 write_db 的 task. 当遇到 await 的时候, 确实是会跳转到别的 task 里面执行. 确实比较奇怪,但我仍然觉得瓶颈不大可能在 parse_text, 你可以试着记录一下队列写入数据的速率, 如果这个速率也在 400/s 左右, 那说明确实有可能是 parse_text 慢了
    ohayoo
        34
    ohayoo  
       61 天前
    @firejoke 老哥可以分享下多进程版本的代码吗?
    firejoke
        35
    firejoke  
    OP
       61 天前
    @ohayoo #34 还在调试多进程和协程的组合,后面会贴一下的。
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2418 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 14:51 · PVG 22:51 · LAX 06:51 · JFK 09:51
    ♥ Do have faith in what you're doing.