Featured image of post Urllib3

Urllib3

主要介绍爬虫技术中重要的组件Urllib3的一些特性和特性的测试,以及一些比较重要的使用方法和使用场景。

Urllib3

特点:

  • 连接池管理:在客户端和服务器之间复用已有连接,避免每次请求都重新建立新连接,核心是PoolManager,内部维护一个或多个ConnectionPool;
  • 线程安全:适合在多线程环境下进行并发请求;
  • 重试机制:请求失败自动重试;
  • SSL/TLS验证:建立 HTTPS 连接时,客户端校验服务器提供的数字证书是否可信,并通过 TLS 协议完成数据加密通道的协商;
  • 代理支持:在客户端(你)访问目标网站时,通过一个中间服务器(代理服务器)中转请求和响应,而不是直接访问目标网站;
  • 文件上传:支持 multipart 文件上传;
  • 编码处理:自动处理响应内容的编码问题;

核心类与方法:

  1. PoolManager:最核心类,负责管理连接池和所有请求。

    1
    2
    3
    4
    5
    6
    7
    8
    
    http = urllib3.PoolManager(
    	num_pools = 50,          #连接池数量
        maxsize = 10,            #每个连接池最大连接数
        block = True,            #连接池满时是否阻塞等待
        timeout = 30.0,	         #请求超时时间
        retries = 3,  			 #默认重试次数
        headers={'User-Agent':''}
    )
    
  2. GET请求:

    1
    2
    3
    4
    5
    
    response = http.request(
    	'GET',
        'http://',
        fields = {'arg':'value'} #查询参数
    )
    
  3. POST请求:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    #表单数据
    response = http.request(
    	'POST',
        'http://',
        fields = {'field':'value'}
    )
    
    #JSON数据
    import json
    response = http.request(
    	'POST',
        'http://',
        body = json.dumps({'key':'value'}).encode('utf-8'),
        headers = {'Content-Type':'application/json'}
    )
    
  4. PUT/DELETE请求:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    #PUT请求
    response = http.request(
    	'PUT',
        'http://',
        body = b 'data to put'
    )
    
    #DELETE请求
    response = http.request(
    	'DELETE',
        'http://'
    )
    
  5. 文件上传:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    with open('example.txt','rb') as f:
        file_data = f.read()
    
    response = http.request(
    	'POST',
        'http://',
        fields = {
            'filefield':('example.txt',file_data,'text/plain'),
            'description':'File upload example'
        }
    )
    

响应处理与重要属性

  1. 响应对象属性:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    response = http.request('GET','http://exmaple.com')
    
    #状态码
    print(response.status)
    #响应头
    print(reponse.headers)
    #响应体
    print(response.data)
    print(response.data.decode('utf-8'))#解码为字符串
    #重新定向历史
    print(response.redirect_location)
    #消耗时间
    print(response.elapsed)
    
  2. 响应内容处理:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    #JSON响应处理
    import json
    json_response = json.loads(response.data.decode('utf-8'))
    
    #流式响应处理
    response = http.request(
    	'GET',
        'http://exmaple.com/largefile',
        preload_content=False
    )
    
    try:
        for chunk in response.stream(1024):#每次读1024字节
            process_chunk(chunk)
    finally:
         response.release_conn()#释放连接   
    

高级特性与配置

  1. 重试机制:

    1
    2
    3
    4
    5
    6
    7
    8
    
    from urllib3.util.retry import Retry
    
    retry_strategy = Retry(
    	total = 3, 			#重试总次数
        backoff_factor = 1, #重试间隔增长因子
        status_forcelist = [500,502,503,504]#指定哪些状态码会触发自动重试
    )
    http = urllib3.PoolManager(retries = retry_strategy)
    
  2. 超时设置:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    #全局超时
    http = urllib3.PoolManager(timeout=2.0)
    
    #单个请求超时 
    response = http.request(
        'GET',
        'http://example.com',
        timeout=5.0 
    )
    
    #分别设置连接和读取超时 
    response = http.request(
        'GET',
        'http://example.com',
        timeout=urllib3.Timeout(connect=2.0, read=10.0)
    )
    
  3. SSL/TLS配置:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    #禁用证书验证(不推荐生产环境使用)
    http = urllib3.PoolManager(
        cert_reqs='CERT_NONE',#不校验证书有效性
        assert_hostname=False #不检查证书是否匹配域名
    )
    
    
    #自定义CA证书 
    http = urllib3.PoolManager(
        cert_reqs='CERT_REQUIRED',
        ca_certs='/path/to/certificate.pem')#自定义CA
    
    #客户端证书认证 
    http = urllib3.PoolManager(
        #证书文件路径
        cert_file='/path/to/client_cert.pem',
        #证书对应私钥文件路径
        key_file='/path/to/client_key.pem')
    
  4. 代理配置:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    
    #HTTP代理 
    http = urllib3.ProxyManager(
        #代理服务器地址
        'http://proxy.example.com:8080/',
        #身份认证信息
        proxy_headers={'Proxy-Authorization': 'Basic ...'})
    
    #SOCKS代理(需要安装PySocks)
    pip install pysocks 
    
    from urllib3.contrib.socks import SOCKSProxyManager 
    proxy = SOCKSProxyManager(
        'socks5://user:password@127.0.0.1:1080/'
    )
    

性能优化技巧

  1. 连接池调优:

    1
    2
    3
    4
    5
    6
    7
    
    #根据应用场景调整连接池参数 
    http = urllib3.PoolManager(
        num_pools=10,      # 适合大多数应用 
        maxsize=10,        # 每个连接池最大连接数 
        block=True,        # 连接池满时阻塞而非创建新连接 
        timeout=60.0       # 适当延长超时时间 
    )
    
  2. 连接重用:

    1
    2
    3
    4
    
    #使用上下文管理器确保连接正确释放 
    with http.request('GET', 'http://example.com', preload_content=False) as response:
        process_response(response)
    #连接自动返回到连接池 
    
  3. 批处理请求(使用线程池并发发起请求,并收集响应对象):

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    from concurrent.futures import ThreadPoolExecutor 
    
    urls = ['http://example.com/1',
            'http://example.com/2',
            'http://example.com/3']
    
    def fetch(url):
        return http.request('GET', url)
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch, urls))
    

6.常见的应用场景

  1. Web API调用:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    import json 
    from urllib.parse import urlencode 
    
    base_url = "https://api.example.com/v1"
    
    def get_user(user_id):
        response = http.request(
            'GET',
            f"{base_url}/users/{user_id}",
            headers={'Authorization': 'Bearer token123'}
        )
        return json.loads(response.data.decode('utf-8'))
    
    def search_users(query, limit=10):
        params = {'q': query, 'limit': limit}
        response = http.request(
            'GET',
            f"{base_url}/users/search?{urlencode(params)}"
        )
        return json.loads(response.data.decode('utf-8'))
    
  2. 网页抓取:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    from bs4 import BeautifulSoup 
    
    def scrape_website(url):
        response = http.request('GET', url)
        if response.status == 200:
            soup = BeautifulSoup(response.data, 'html.parser')
            # 提取数据...
            return {
                'title': soup.title.string,
                #遍历所有的a标签,并提取其中的href属性
                'links': [a['href'] for a in soup.find_all('a')]
            }
        return None 
    
  3. 文件下载:

    1
    2
    3
    4
    5
    6
    7
    8
    
    def download_file(url, save_path):
        with http.request('GET', url, preload_content=False) as response:
            if response.status == 200:
                with open(save_path, 'wb') as f:
                    for chunk in response.stream(1024):
                        f.write(chunk)
                return True 
        return False 
    
  4. 微服务通信:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    import json 
    
    def call_service(service_url, method, payload=None):
        headers = {
            'Content-Type': 'application/json',
            'X-Request-ID': 'unique-id-123'
        }
    
        body = json.dumps(payload).encode('utf-8') if payload else None 
    
        response = http.request(
            method.upper(),
            service_url,
            headers=headers,
            body=body 
        )
    
        if response.status >= 400:
            raise Exception(f"Service error: {response.status}")
    
        return json.loads(response.data.decode('utf-8'))
    

常见问题

  1. 异常处理:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    import urllib3.exceptions 
    
    try:
        response = http.request('GET', 'http://example.com')
    except urllib3.exceptions.HTTPError as e:
        print(f"HTTP错误: {e}")
    except urllib3.exceptions.SSLError as e:
        print(f"SSL错误: {e}")
    except urllib3.exceptions.TimeoutError as e:
        print(f"请求超时: {e}")
    except urllib3.exceptions.RequestError as e:
        print(f"请求错误: {e}")
    except Exception as e:
        print(f"其他错误: {e}")
    
  2. 调试技巧:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    #启用调试日志 
    import logging 
    logging.basicConfig(level=logging.DEBUG)
    
    #或者只启用urllib3的调试日志 
    logger = logging.getLogger('urllib3')
    logger.setLevel(logging.DEBUG)
    
    #查看连接池状态 
    print(http.connection_pool_kw)
    print(http.pools)
    

与request库的对比

  1. 更底层的控制:直接访问连接池和底层配置
  2. 更小的内存占用:没有 requests 的额外抽象层
  3. 更早的错误检测:在请求发送前就能检测到某些问题
  4. 更灵活的流处理:对大型文件或流式API更友好
  5. 大多数情况用request,精细控制或高性能用urllib3

一些问题

Q 当我向https://www.xiaohongshu.com/发起请求失败时,urllib3会自动重新定向到https://www.xiaohongshu.com/explore,这是为什么,程序为什么会知道https://www.xiaohongshu.com/explore是正确的
A 不是urllib3“知道正确地址”,重新定向是服务端的行为,当向https://www.xiaohongshu.com/发起请求后,小红书的服务器会返回一个HTTP 301或302响应,将正确的网址返回给客户端,同时urllib会默认自动跟随重定向。
Q JSON响应处理和流式响应处理的区别
A 对于JSON响应处理,response.data一次性读取全部内容,再将其解析为Python对象;流式响应处理,不会把整个响应体加载进内存。JSON响应处理适用于REST API、结构化数据;流式响应处理适用于下载大文件、音视频流等场景
Q 线程与并发的关系
A 线程是并发的一种实现方式,但并发并不限于线程,线程像“工人”,并发像“工人排班的调度方式”,一个线程处理一个任务,多个线程合作是实现
Q PoolManager和ProxyManager的区别
A ProxyManager是继承自PoolManager,专门用来处理从代理服务器发出的请求,其重写了PoolManager的部分方法,确保所有的请求都从预定的代理进行,并且它能根据目标的URL的方案和代理类型,智能的选择使用HTTP转发或HTTP CONNECT通道

一些测试代码

  1. 一些库导入

    1
    2
    3
    4
    5
    6
    7
    
    import urllib3
    import threading
    import time
    from urllib3.exceptions import EmptyPoolError
    import json
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
  2. 测试连接复用:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    
    # 创建连接池,最大连接数设置为 1(便于观察复用)
    http = urllib3.HTTPConnectionPool("httpbin.org", maxsize=1)
    
    # 第一次请求
    print("第1次请求")
    r1 = http.request("GET", "/get")
    print("响应状态码:", r1.status)
    
    # 暂停 1 秒
    time.sleep(1)
    # 第二次请求
    print("\n第2次请求")
    r2 = http.request("GET", "/get")
    print("响应状态码:", r2.status)
    
    '''
    #通过如下方式,检查底层 socket 是否复用
    print("连接池中连接数量:", len(http.pool))
    for conn in http.pool:
        print("连接对象:", conn)
    '''
    
    '''
    #测试“不复用的情况”,可以设置 retries=0,并且让服务器关闭连接
    r = http.request("GET", "/get", headers={"Connection": "close"})
    '''
    
  3. 测试限制连接数+阻塞模式:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
    # 设置连接池为 maxsize=1,block=False
    http = urllib3.HTTPConnectionPool("httpbin.org", maxsize=1, block=False)
    
    # 请求函数:保持连接占用一段时间
    def make_request(name):
        print(f"[{name}] 准备发送请求")
        try:
            r = http.request("GET", "/delay/5", preload_content=False, timeout=10.0)
            print(f"[{name}] 已发送请求,延迟读取内容")
            time.sleep(6)  # 保持连接被占用
            r.release_conn()  # 主动释放连接
            print(f"[{name}] 释放连接")
        except EmptyPoolError as e:
            print(f"[{name}] 连接池耗尽异常: {e}")
        except Exception as e:
            print(f"[{name}] 其他异常: {e}")
    
    # 第一个线程:占住唯一连接
    t1 = threading.Thread(target=make_request, args=("线程1",))
    # 第二个线程:争抢连接
    t2 = threading.Thread(target=make_request, args=("线程2",))
    
    t1.start()
    time.sleep(0.2)  # 保证线程1先占住连接
    t2.start()
    
    t1.join()
    t2.join()
    '''
    连接池已满且不允许阻塞新的请求时,新的连接请求将会被丢弃。这意味着连接池无法处理所有的请求,可能会导致一些请求失败或被延迟处理。
    所以在触发连接池已满的情况下不会报错且能正常运行,但在DEBUG日志下会有警告。
    '''
    
  4. 验证连接池(num_pool)作用:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    'num_pool控制的是最多允许缓存多少个不同主机的连接池,每一个host:port组合会对应一个connectionpool实例,当请求的host数超过num_pools时,旧的连接池将会被清理(LRU策略)'
    # 创建连接池管理器,最多2个主机连接池
    http = urllib3.PoolManager(num_pools=2, maxsize=1)
    
    hosts = [
        "http://httpbin.org/get",
        "https://www.xiaohongshu.com/explore",
        "http://baidu.com"  # 任意有效网址也可
    ]
    
    def fetch(url):
        try:
            r = http.request("GET", url)
            print(f"访问 {url} 状态码: {r.status}")
        except Exception as e:
            print(f"访问 {url} 出错: {e}")
    
    # 访问3个不同host,超出num_pools限制
    for i in range(2):
        for url in hosts:
            fetch(url)
            time.sleep(0.5)
    '''
    可以观察到每次发送请求前都得先建立连接,这是因为只能允许存在两个连接池,当继续往里添加连接池时,会把旧的清理,使得再次连接时需要重新建立连接
    '''
    
恍如昨日,嗤笑今朝
使用 Hugo 构建
主题 StackJimmy 设计