一次算法读图超时引起的urllib3源码分析

号主从事深度学习算法服务开发多年,2022年二月的最后一天,出炉一个刚接触算法服务时困扰许久的“头号难题”。介于篇幅源码较多,预计耗时27分钟,各位人才看官调整好心情给个好评:点赞、评论、转发

故事上下文

算法服务处理处理流程:

输入image_url -> 读取图片image -> 图片预处理(解压缩/RGB_BGR/缩放等) -> 算法推理 -> 输出结果

问题:发现某算法A,单独测试推理<50ms,但是整个流程花费200ms~3s,明显不正常,头大!!!

首先进行读图性能测试

读图方式

读小图性能(约10k)

读大图性能(约700k)

备注

外网读图

0.0208

0.0713

内网读图

0.0119

0.0592

本地读图

0.0006

0.0283

对比分析,发现可能造成超时的原因包含两种,“网络速度”或者“网络抖动”

直接上解决方案

  • 问题1:网络环境引起的超时问题?
    • 读图组件代码固定,切换图片链接至内网读图,快速解决问题
  • 问题2:网络抖动引起的等待超时问题?
    • 快速断开,快速重试解决问题

由于此处读图选用的urllib3,后续的章节我们着重分析下urllib3的超时部分源码

urllib架构

urllib3是一个功能强大,条理清晰,用于HTTP客户端的Python库,许多Python的原生系统已经开始使用urllib3

image1

urllib源码分析

分析__init__.py就可以得出对外提供的功能

__all__ = (
    "HTTPConnectionPool",  # http模式连接池
    "HTTPHeaderDict",      # 请求头词典
    "HTTPSConnectionPool", # https模式连接池
    "PoolManager",      # 池管理类,self.poos映射类型,保存连接信息
    "ProxyManager",     # 代理池管理类,行为同PoolManager
    "HTTPResponse",     # 返回对象
    "Retry",       # 精细化控制重试和重定向 retries=Retry(3, redirect=2)
    "Timeout",     # 精细化控制超时 timeout=Timeout(connect=1.0, read=2.0)
    "add_stderr_logger",    # 修改默认日志的级别
    "connection_from_url",  # 返回HTTPConnectionPool或HTTPSConnectionPool实例
    "disable_warnings",     # 禁用warnings
    "encode_multipart_formdata", # dict 转换成 form-data
    "make_headers",              # 生成request headers 快捷函数
    "proxy_from_url",            # 返回ProxyManager对象
    "request",                   # 请求方法
)
  • RetryTimeout参数为对重试和超时逻辑的简单封装
  • HTTPResponse参数为对返回数据的Model封装

以下是urllib3 主干类层次结构

image2

分析源码的方式有很多中,其中问题导向最可靠,以下我们从Timeout进行分析urllib3源码

urllib3.exceptions.ConnectTimeoutError: (<urllib3.connection.HTTPConnection object at 0x7fc862ecb518>, 'Connection to xxx.xxx.com timed out. (connect timeout=0.0001)')

urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='xxx.xxx.com', port=80): Read timed out. (read timeout=0.0001)

ConnectTimeoutError:连接超时;ReadTimeoutError:读取超时,我们先看一个请求验证的Demo:

# -*- coding: utf-8 -*-
def image_url_demo1(image_url, timeout=3):
    import socket
    import urllib2
    try:
        data = urllib2.urlopen(image_url, timeout=timeout).read()
        return data
    except urllib2.URLError as e:
        raise e
    except socket.timeout as e:
        raise e
    except Exception as e:
        raise e

import urllib3
def image_url_demo2(image_url, timeout=urllib3.Timeout(connect=5, read=5)):
    try:
        api_http = urllib3.PoolManager()
        r = api_http.request('GET', image_url, timeout=timeout)
        return r.data
    except urllib3.exceptions.MaxRetryError as e:
        raise e
    except urllib3.exceptions.ConnectTimeoutError as e:
        raise e
    except urllib3.exceptions.ReadTimeoutError as e:
        raise e
    except Exception as e:
        raise e

def image_url_demo3(image_url, timeout=urllib3.Timeout(connect=5, read=5)):
    try:
        api_http = urllib3.PoolManager()
        # r = api_http.request('GET', image_url, timeout=timeout)
        r = api_http.request('GET', image_url, timeout=timeout, retries=False)
        return r.data
    except urllib3.exceptions.MaxRetryError as e:
        raise e
    except urllib3.exceptions.ConnectTimeoutError as e:
        raise e
    except urllib3.exceptions.ReadTimeoutError as e:
        raise e
    except Exception as e:
        raise e

if __name__ == '__main__':
    image_url = 'http://xxx.xxx.com/dxGnGYXcNDbNpERoLBxSoekayqw9E.jpg'
    # image = image_url_demo1(image_url, 1)
    # image = image_url_demo1(image_url, 0.000001)
    # print(type(image))

    # image = image_url_demo2(image_url)
    # image = image_url_demo2(image_url, urllib3.Timeout(connect=0.0001, read=1))
    # image = image_url_demo2(image_url, urllib3.Timeout(connect=1, read=0.0001))
    # print(type(image))

    image = image_url_demo3(image_url)
    # image = image_url_demo3(image_url, urllib3.Timeout(connect=0.0001, read=1))
    image = image_url_demo3(image_url, urllib3.Timeout(connect=1, read=0.0001))
    print(type(image))

由上可知urllib3的超时设置,其实就是封装了socket的超时设置,以下是socket的超时设置逻辑,包含请求超时接收超时

请求建立超时设置

  import socket
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.settimeout(5)
  sock.connect((host, port))
  # 恢复默认超时设置,设置某些情况下socket进入阻塞模式(如makefile)
  sock.settimeout(None)
  sock.connect((host, port))
  sock.sendall('xxx')
  sock.recv(1024)
  sock.close()

数据接收超时设置

  import socket
  socket.setdefaulttimeout(5)
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.connect((host, port))
  sock.sendall('xxx')
  # 连接和接收的时候都设置一次超时
  sock.settimeout(5)
  sock.recv(1024)
  sock.close()

由上urllib3主干类层次结构可知,请求的建立的源码封装在HTTPConnection中,所以,我们想查看的ConnectTimeoutError源码也在该类中,具体如下:

 // https://github.com/urllib3/urllib3/blob/main/src/urllib3/connection.py#L191
 def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.
        :return: New socket connection.
        """
        try:
            conn = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e
        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        return conn

connection.create_connection具体实现了连接的创建,具体如下:

// https://github.com/urllib3/urllib3/blob/main/src/urllib3/util/connection.py#L29
def create_connection(
    address: Tuple[str, int],
    timeout: Optional[float] = SOCKET_GLOBAL_DEFAULT_TIMEOUT,
    source_address: Optional[Tuple[str, int]] = None,
    socket_options: Optional[_TYPE_SOCKET_OPTIONS] = None,
) -> socket.socket:
   
    host, port = address
    if host.startswith("["):
        host = host.strip("[]")
    err = None

    # Using the value from allowed_gai_family() in the context of getaddrinfo lets
    # us select whether to work with IPv4 DNS records, IPv6 records, or both.
    # The original create_connection function always returns all records.
    family = allowed_gai_family()

    try:
        host.encode("idna")
    except UnicodeError:
        raise LocationParseError(f"'{host}', label empty or too long") from None

    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = None
        try:
         // 和直接使用 socket 设置方式一致
            sock = socket.socket(af, socktype, proto)

            # If provided, set socket level options before connecting.
            _set_socket_options(sock, socket_options)

            if timeout is not SOCKET_GLOBAL_DEFAULT_TIMEOUT:
                sock.settimeout(timeout)
            if source_address:
                sock.bind(source_address)
            sock.connect(sa)
            # Break explicitly a reference cycle
            err = None
            return sock

        except OSError as _:
            err = _
            if sock is not None:
                sock.close()

    if err is not None:
        try:
            raise err
        finally:
            # Break explicitly a reference cycle
            err = None
    else:
        raise OSError("getaddrinfo returns an empty list")

由上urllib3主干类层次结构可知,请求的读取的源码封装在HTTPConnectionPool中,所以,我们想查看的ReadTimeoutError源码也在该类中,具体如下:

 // https://github.com/urllib3/urllib3/blob/main/src/urllib3/connectionpool.py#L362
 def _raise_timeout(
        self,
        err: Union[BaseSSLError, OSError, SocketTimeout],
        url: str,
        timeout_value: _TYPE_TIMEOUT,
    ) -> None:
        """Is the error actually a timeout? Will raise a ReadTimeout or pass"""

        if isinstance(err, SocketTimeout):
            raise ReadTimeoutError(
                self, url, f"Read timed out. (read timeout={timeout_value})"
            ) from err

        # See the above comment about EAGAIN in Python 3.
        if hasattr(err, "errno") and err.errno in _blocking_errnos:
            raise ReadTimeoutError(
                self, url, f"Read timed out. (read timeout={timeout_value})"
            ) from err

    def _make_request(
        self,
        conn: HTTPConnection,
        method: str,
        url: str,
        timeout: _TYPE_TIMEOUT = _Default,
        chunked: bool = False,
        **httplib_request_kw: Any,
    ) -> _HttplibHTTPResponse:
        
        self.num_requests += 1

        timeout_obj = self._get_timeout(timeout)
        timeout_obj.start_connect()
        conn.timeout = timeout_obj.connect_timeout  # type: ignore[assignment]

        # Trigger any extra validation we need to do.
        // 请求连接验证过程中的超时,也属于 ReadTimeoutError
        try:
            self._validate_conn(conn)
        except (SocketTimeout, BaseSSLError) as e:
            self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
            raise

        # conn.request() calls http.client.*.request, not the method in
        # urllib3.request. It also calls makefile (recv) on the socket.
        try:
         //  请求体发送
            if chunked:
                conn.request_chunked(method, url, **httplib_request_kw)
            else:
                conn.request(method, url, **httplib_request_kw)

        # We are swallowing BrokenPipeError (errno.EPIPE) since the server is
        # legitimately able to close the connection after sending a valid response.
        # With this behaviour, the received response is still readable.
        except BrokenPipeError:
            pass
        except OSError as e:
            # MacOS/Linux
            # EPROTOTYPE is needed on macOS
            # https://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
            if e.errno != errno.EPROTOTYPE:
                raise

        # Reset the timeout for the recv() on the socket
        // 重置请求socket超时时间
        read_timeout = timeout_obj.read_timeout

        if conn.sock:
            if read_timeout == 0:
                raise ReadTimeoutError(
                    self, url, f"Read timed out. (read timeout={read_timeout})"
                )
            if read_timeout is Timeout.DEFAULT_TIMEOUT:
                conn.sock.settimeout(socket.getdefaulttimeout())
            else:  # None or a value
                conn.sock.settimeout(read_timeout)

        # Receive the response from the server
        try:
            httplib_response = conn.getresponse()
        except (BaseSSLError, OSError) as e:
            self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
            raise
            
        ...
        
        return httplib_response

urllib其他常用姿势

响应方式

所用的响应都通过HTTPResponse对象提供statusdataheaders属性。

import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'http://httpbin.org/ip')
r.status
r.data
r.headers

# 输出
200
b'{\\n  "origin": "137.59.103.52"\\n}\\n'
HTTPHeaderDict({'Date': 'Fri, 05 Nov 2021 05:38:24 GMT', 'Content-Type': 'application/json', 'Content-Length': '32', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true'})

JSON响应

JSON内容可以通过解码和反序列化来加载data请求的属性:

import json
import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'http://httpbin.org/ip')
json.loads(r.data.decode('utf-8'))

# 输出
{'origin': '137.59.103.52'}

二进制响应

data响应的属性始终设置为表示响应内容的字节字符串:

import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'http://httpbin.org/bytes/8')
r.data

# 输出
b'S\\x04e\\to\\x12NN'

注:对于更大的响应,有时可以使用stream进行接收

IO响应

有时候你想用io.TextIOWrapper或类似的对象,如直接使用HTTPResponse数据。要使这两个接口很好地结合在一起,需要使用auto_close通过将其设置为False。默认情况下,读取所有字节后关闭HTTP响应,以上设置这将禁用该行为:

import io
import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'https://www.baidu.com/', preload_content=False)
r.auto_close = False
for line in io.TextIOWrapper(r):
 print(line)

请求方式

GETHEADDELETE请求比较常规,将请求参数作为字典传递到fields参数即可,如:fields={'arg': 'value'}。下面我们主要说说POSTPUT请求。

首先,POSTPUT通过URL传参请求,需要在URL中进行手动编码参数:

import urllib3
http = urllib3.PoolManager()
from urllib.parse import urlencode
encoded_args = urlencode({'arg': 'value'})
url = 'http://httpbin.org/post?' + encoded_args
r = http.request('POST', url)
json.loads(r.data.decode('utf-8'))['args']

# 输出
{'arg': 'value'}

表单POST

表单方式,将参数作为字典传递到fields参数进行请求:

import urllib3
http = urllib3.PoolManager()
r = http.request('POST', 'http://httpbin.org/post', fields={'field': 'value'})
json.loads(r.data.decode('utf-8'))['form']

# 输出
{'field': 'value'}

注:表单方式默认以String类型进行传递

JSON POST

JSON方式,将指定编码数据作为JSON请求发送body参数和设置Content-Type参数进行请求:

import json
import urllib3
http = urllib3.PoolManager()
data = {'attribute': 'value'}
encoded_data = json.dumps(data).encode('utf-8')
r = http.request('POST','http://httpbin.org/post', body=encoded_data, headers={'Content-Type': 'application/json'})
json.loads(r.data.decode('utf-8'))['json']

# 输出
{'attribute': 'value'}

文件和二进制 POST

使用multipart/form-data编码进行二进制文件传参请求,比如上传图片或其他文件,由于这种场景已经不再适用,这块不继续讲解

参考文档

  • https://nining.website/python/spider/urllib3
  • https://github.com/python/cpython/blob/main/Lib/socket.py
  • https://github.com/python/cpython/blob/v2.7.18/Lib/urllib.py
  • https://github.com/python/cpython/blob/v2.7.18/Lib/urllib2.py
  • https://github.com/urllib3/urllib3/blob/main/src/urllib3/connectionpool.py#L362
本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
一次算法读图超时引起的urllib3源码分析
问题:发现某算法A,单独测试推理<50ms,但是整个流程花费200ms~3s,明显不正常,头大!!!
<<上一篇
下一篇>>