
Python coroutine tasks get stuck

    dawnzhu · 2020-05-27 09:25:23 +08:00 · 1886 views

    While using Python coroutines to download images, the coroutine task count eventually gets stuck at 97 and just keeps looping. I can't figure out where the problem is. Does anyone know what is going on? It has been bothering me for a long time.

    This is the output of one run, where it stays stuck with the task count at 80:

    queue empty.... 80
    queue empty.... 80
    .
    .
    .

    queue empty.... 80
    queue empty.... 80
    

    Here is the code:

    from lxml import etree
    import os
    import pandas as pd
    import asyncio
    import aiohttp
    from random import randint
    import cchardet
    import aiofiles
    import logging
    
    
    class sikupicture_Spider(object):
        def __init__(self):
            # self.seens_url = []
            self.loop = asyncio.get_event_loop()
            self.queue = asyncio.PriorityQueue()
            self._workers = 0  # current number of running workers
            self._max_workers = 150  # maximum number of workers
            self.overtime = {}  # {url: times,} failure count for each URL
            self.overtime_threshold = 4
            self.headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
            }
            self.list_content = []
    
        async def init_url(self):
            info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
            for ite in info.itertuples():
                await self.queue.put((randint(1, 5), getattr(ite, 'url')))
    
        async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
            _headers = self.headers
            if headers:
                _headers = headers
            try:
                async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                    status_code = resp.status
                    if status_code == 403:
                        print("url-403", url)
                        if url in self.overtime:
                            self.overtime[url] += 1
                            if self.overtime[url] > self.overtime_threshold:
                                pass
                            await self.queue.put((randint(1, 5), url))
                        else:
                            self.overtime[url] = 1
                            await self.queue.put((randint(1, 5), url))
                        status_code = 0
                        html = None
                    if binary:
                        text = await resp.read()
                        encoding = cchardet.detect(text)
                        html = text.encode(encoding, errors='ignore')
                    else:
                        html = await resp.text()
    
            except TimeoutError:
                print("url-overtime", url)
                if url in self.overtime:
                    self.overtime[url] += 1
                    if self.overtime[url] > self.overtime_threshold:
                        pass
                    await self.queue.put((randint(1, 5), url))
                else:
                    self.overtime[url] = 1
                    await self.queue.put((randint(1, 5), url))
                status_code = 0
                html = None
            return status_code, html
    
        async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
            _headers = self.headers
            if headers:
                _headers = headers
            try:
                async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                    status_code = resp.status
                    if binary:
                        html = await resp.read()
                    else:
                        html = await resp.text()
            except TimeoutError:
                print("url-overtime", img_url)
                if url in self.overtime:
                    self.overtime[url] += 1
                    if self.overtime[url] > self.overtime_threshold:
                        pass
                    else:
                        await self.queue.put((randint(1, 5), url))
                else:
                    self.overtime[url] = 1
                    await self.queue.put((randint(1, 5), url))
                status_code = 0
                html = None
            return status_code, html
    
        def parse_source(self, source):
            try:
                response_1 = etree.HTML(source)
            except Exception as err:
                logging.error(f'parse error:{err}')
                url = ""
            else:
                img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
                    response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
            return img_url
    
        async def process(self, session, url, timeout):
            status, source = await self.fetch(session, url, timeout)
            file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
            if status == 200:
                img_url = self.parse_source(source)
                img_status, img_source = await self.download_img(session, img_url, timeout, url)
                if img_status == 200:
                    async with aiofiles.open("F:\\dawnzhu\\picture\\"+file_name+".jpg", "wb") as f:
                        await f.write(img_source)
                self._workers -= 1
                print("任务完成", self._workers, "url_status", status, "img_status", img_status)
            else:
                self._workers -= 1
                print("任务完成", self._workers, "url_status", status,)
    
        async def loop_crawl(self):
            await self.init_url()
            timeout = aiohttp.ClientTimeout(total=20)
            conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
            session = aiohttp.ClientSession(connector=conn, timeout=timeout)
            while True:
                if self._workers >= self._max_workers:
                    print("work 的判断")
                    await asyncio.sleep(5)
                    continue
                if self.queue.empty():
                    print("队列是否为空....", self._workers)
                    await asyncio.sleep(5)
                    if self._workers == 0:
                        break
                    continue
                _, url = await self.queue.get()
                asyncio.ensure_future(self.process(session, url, timeout))
                self._workers += 1
                print("队列剩余数量", self.queue.qsize(), self._workers)
            await session.close()
    
        def run(self):
            try:
                self.loop.run_until_complete(self.loop_crawl())
            except KeyboardInterrupt:
                self.loop.close()
    
    if __name__ == '__main__':
        sp = sikupicture_Spider()
        sp.run()
    
    6 replies · last reply at 2020-05-27 14:15:50 +08:00
    1 · itskingname · 2020-05-27 09:37:02 +08:00
    Try changing the limit parameter in conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True) to 500.
    2 · dawnzhu (OP) · 2020-05-27 09:41:11 +08:00
    @itskingname Thanks, I'll give it a try. What would be the reason, too little concurrency?
    3 · dawnzhu (OP) · 2020-05-27 09:48:23 +08:00
    @itskingname That doesn't work.
    4 · Vegetable · 2020-05-27 09:53:42 +08:00
    fetch only catches the timeout; other exceptions can still propagate upward, and an unhandled exception in a coroutine does not terminate the program, it just prints a message like
    Task exception was never retrieved
    process does not catch exceptions at all, so once one is raised _workers is never decremented and the while loop can never exit.
    That is the only cause I can see so far. Your code is interesting, very tidy, but quite low-level in a lot of places, e.g. managing the worker count by hand instead of using a Semaphore, and using aiohttp instead of httpx.
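
    A minimal sketch of the fix suggested in #4, assuming the rest of the class stays as posted: wrap the whole body of process in try/except/finally so the worker slot is always released, even when fetch, parse_source, or the file write raises something other than a timeout (the logging message here is illustrative).

        async def process(self, session, url, timeout):
            file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
            try:
                status, source = await self.fetch(session, url, timeout)
                if status == 200:
                    img_url = self.parse_source(source)
                    img_status, img_source = await self.download_img(session, img_url, timeout, url)
                    if img_status == 200:
                        async with aiofiles.open("F:\\dawnzhu\\picture\\" + file_name + ".jpg", "wb") as f:
                            await f.write(img_source)
            except Exception as err:
                # log unexpected errors instead of letting them die silently inside the task
                logging.error(f"process error for {url}: {err}")
            finally:
                # always release the worker slot, otherwise loop_crawl spins forever
                self._workers -= 1

    An asyncio.Semaphore(self._max_workers), acquired in loop_crawl before ensure_future and released in the same finally block, would give the same accounting without maintaining _workers by hand.
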
    5 · dawnzhu (OP) · 2020-05-27 09:57:38 +08:00
    @Vegetable Got it, that must be the problem. Thanks.
    6 · ruanimal · 2020-05-27 14:15:50 +08:00
    @Vegetable Probably a C programmer by background.