Urllib3
特点:
- 连接池管理:在客户端和服务器之间复用已有连接,避免每次请求都重新建立新连接,核心是PoolManager,内部维护一个或多个ConnectionPool;
- 线程安全:适合在多线程环境下进行并发请求;
- 重试机制:请求失败自动重试;
- SSL/TLS验证:建立 HTTPS 连接时,客户端校验服务器提供的数字证书是否可信,并通过 TLS 协议完成数据加密通道的协商;
- 代理支持:在客户端(你)访问目标网站时,通过一个中间服务器(代理服务器)中转请求和响应,而不是直接访问目标网站;
- 文件上传:支持 multipart 文件上传;
- 编码处理:自动处理响应内容的编码问题;
核心类与方法:
-
PoolManager:最核心类,负责管理连接池和所有请求。
1 2 3 4 5 6 7 8http = urllib3.PoolManager( num_pools = 50, #连接池数量 maxsize = 10, #每个连接池最大连接数 block = True, #连接池满时是否阻塞等待 timeout = 30.0, #请求超时时间 retries = 3, #默认重试次数 headers={'User-Agent':''} ) -
GET请求:
1 2 3 4 5response = http.request( 'GET', 'http://', fields = {'arg':'value'} #查询参数 ) -
POST请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15#表单数据 response = http.request( 'POST', 'http://', fields = {'field':'value'} ) #JSON数据 import json response = http.request( 'POST', 'http://', body = json.dumps({'key':'value'}).encode('utf-8'), headers = {'Content-Type':'application/json'} ) -
PUT/DELETE请求:
1 2 3 4 5 6 7 8 9 10 11 12#PUT请求 response = http.request( 'PUT', 'http://', body = b 'data to put' ) #DELETE请求 response = http.request( 'DELETE', 'http://' ) -
文件上传:
1 2 3 4 5 6 7 8 9 10 11with open('example.txt','rb') as f: file_data = f.read() response = http.request( 'POST', 'http://', fields = { 'filefield':('example.txt',file_data,'text/plain'), 'description':'File upload example' } )
响应处理与重要属性
-
响应对象属性:
1 2 3 4 5 6 7 8 9 10 11 12 13response = http.request('GET','http://exmaple.com') #状态码 print(response.status) #响应头 print(reponse.headers) #响应体 print(response.data) print(response.data.decode('utf-8'))#解码为字符串 #重新定向历史 print(response.redirect_location) #消耗时间 print(response.elapsed) -
响应内容处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16#JSON响应处理 import json json_response = json.loads(response.data.decode('utf-8')) #流式响应处理 response = http.request( 'GET', 'http://exmaple.com/largefile', preload_content=False ) try: for chunk in response.stream(1024):#每次读1024字节 process_chunk(chunk) finally: response.release_conn()#释放连接
高级特性与配置
-
重试机制:
1 2 3 4 5 6 7 8from urllib3.util.retry import Retry retry_strategy = Retry( total = 3, #重试总次数 backoff_factor = 1, #重试间隔增长因子 status_forcelist = [500,502,503,504]#指定哪些状态码会触发自动重试 ) http = urllib3.PoolManager(retries = retry_strategy) -
超时设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16#全局超时 http = urllib3.PoolManager(timeout=2.0) #单个请求超时 response = http.request( 'GET', 'http://example.com', timeout=5.0 ) #分别设置连接和读取超时 response = http.request( 'GET', 'http://example.com', timeout=urllib3.Timeout(connect=2.0, read=10.0) ) -
SSL/TLS配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18#禁用证书验证(不推荐生产环境使用) http = urllib3.PoolManager( cert_reqs='CERT_NONE',#不校验证书有效性 assert_hostname=False #不检查证书是否匹配域名 ) #自定义CA证书 http = urllib3.PoolManager( cert_reqs='CERT_REQUIRED', ca_certs='/path/to/certificate.pem')#自定义CA #客户端证书认证 http = urllib3.PoolManager( #证书文件路径 cert_file='/path/to/client_cert.pem', #证书对应私钥文件路径 key_file='/path/to/client_key.pem') -
代理配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15#HTTP代理 http = urllib3.ProxyManager( #代理服务器地址 'http://proxy.example.com:8080/', #身份认证信息 proxy_headers={'Proxy-Authorization': 'Basic ...'}) #SOCKS代理(需要安装PySocks) pip install pysocks from urllib3.contrib.socks import SOCKSProxyManager proxy = SOCKSProxyManager( 'socks5://user:password@127.0.0.1:1080/' )
性能优化技巧
-
连接池调优:
1 2 3 4 5 6 7#根据应用场景调整连接池参数 http = urllib3.PoolManager( num_pools=10, # 适合大多数应用 maxsize=10, # 每个连接池最大连接数 block=True, # 连接池满时阻塞而非创建新连接 timeout=60.0 # 适当延长超时时间 ) -
连接重用:
1 2 3 4#使用上下文管理器确保连接正确释放 with http.request('GET', 'http://example.com', preload_content=False) as response: process_response(response) #连接自动返回到连接池 -
批处理请求(使用线程池并发发起请求,并收集响应对象):
1 2 3 4 5 6 7 8 9 10 11from concurrent.futures import ThreadPoolExecutor urls = ['http://example.com/1', 'http://example.com/2', 'http://example.com/3'] def fetch(url): return http.request('GET', url) with ThreadPoolExecutor(max_workers=5) as executor: results = list(executor.map(fetch, urls))
6.常见的应用场景
-
Web API调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20import json from urllib.parse import urlencode base_url = "https://api.example.com/v1" def get_user(user_id): response = http.request( 'GET', f"{base_url}/users/{user_id}", headers={'Authorization': 'Bearer token123'} ) return json.loads(response.data.decode('utf-8')) def search_users(query, limit=10): params = {'q': query, 'limit': limit} response = http.request( 'GET', f"{base_url}/users/search?{urlencode(params)}" ) return json.loads(response.data.decode('utf-8')) -
网页抓取:
1 2 3 4 5 6 7 8 9 10 11 12 13from bs4 import BeautifulSoup def scrape_website(url): response = http.request('GET', url) if response.status == 200: soup = BeautifulSoup(response.data, 'html.parser') # 提取数据... return { 'title': soup.title.string, #遍历所有的a标签,并提取其中的href属性 'links': [a['href'] for a in soup.find_all('a')] } return None -
文件下载:
1 2 3 4 5 6 7 8def download_file(url, save_path): with http.request('GET', url, preload_content=False) as response: if response.status == 200: with open(save_path, 'wb') as f: for chunk in response.stream(1024): f.write(chunk) return True return False -
微服务通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21import json def call_service(service_url, method, payload=None): headers = { 'Content-Type': 'application/json', 'X-Request-ID': 'unique-id-123' } body = json.dumps(payload).encode('utf-8') if payload else None response = http.request( method.upper(), service_url, headers=headers, body=body ) if response.status >= 400: raise Exception(f"Service error: {response.status}") return json.loads(response.data.decode('utf-8'))
常见问题
-
异常处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14import urllib3.exceptions try: response = http.request('GET', 'http://example.com') except urllib3.exceptions.HTTPError as e: print(f"HTTP错误: {e}") except urllib3.exceptions.SSLError as e: print(f"SSL错误: {e}") except urllib3.exceptions.TimeoutError as e: print(f"请求超时: {e}") except urllib3.exceptions.RequestError as e: print(f"请求错误: {e}") except Exception as e: print(f"其他错误: {e}") -
调试技巧:
1 2 3 4 5 6 7 8 9 10 11#启用调试日志 import logging logging.basicConfig(level=logging.DEBUG) #或者只启用urllib3的调试日志 logger = logging.getLogger('urllib3') logger.setLevel(logging.DEBUG) #查看连接池状态 print(http.connection_pool_kw) print(http.pools)
与request库的对比
- 更底层的控制:直接访问连接池和底层配置
- 更小的内存占用:没有 requests 的额外抽象层
- 更早的错误检测:在请求发送前就能检测到某些问题
- 更灵活的流处理:对大型文件或流式API更友好
- 大多数情况用request,精细控制或高性能用urllib3
一些问题
| Q | 当我向https://www.xiaohongshu.com/发起请求失败时,urllib3会自动重新定向到https://www.xiaohongshu.com/explore,这是为什么,程序为什么会知道https://www.xiaohongshu.com/explore是正确的 |
|---|---|
| A | 不是urllib3“知道正确地址”,重新定向是服务端的行为,当向https://www.xiaohongshu.com/发起请求后,小红书的服务器会返回一个HTTP 301或302响应,将正确的网址返回给客户端,同时urllib会默认自动跟随重定向。 |
| Q | JSON响应处理和流式响应处理的区别 |
|---|---|
| A | 对于JSON响应处理,response.data一次性读取全部内容,再将其解析为Python对象;流式响应处理,不会把整个响应体加载进内存。JSON响应处理适用于REST API、结构化数据;流式响应处理适用于下载大文件、音视频流等场景 |
| Q | 线程与并发的关系 |
|---|---|
| A | 线程是并发的一种实现方式,但并发并不限于线程,线程像“工人”,并发像“工人排班的调度方式”,一个线程处理一个任务,多个线程合作是实现 |
| Q | PoolManager和ProxyManager的区别 |
|---|---|
| A | ProxyManager是继承自PoolManager,专门用来处理从代理服务器发出的请求,其重写了PoolManager的部分方法,确保所有的请求都从预定的代理进行,并且它能根据目标的URL的方案和代理类型,智能的选择使用HTTP转发或HTTP CONNECT通道 |
一些测试代码
-
一些库导入
1 2 3 4 5 6 7import urllib3 import threading import time from urllib3.exceptions import EmptyPoolError import json import logging logging.basicConfig(level=logging.DEBUG) -
测试连接复用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26# 创建连接池,最大连接数设置为 1(便于观察复用) http = urllib3.HTTPConnectionPool("httpbin.org", maxsize=1) # 第一次请求 print("第1次请求") r1 = http.request("GET", "/get") print("响应状态码:", r1.status) # 暂停 1 秒 time.sleep(1) # 第二次请求 print("\n第2次请求") r2 = http.request("GET", "/get") print("响应状态码:", r2.status) ''' #通过如下方式,检查底层 socket 是否复用 print("连接池中连接数量:", len(http.pool)) for conn in http.pool: print("连接对象:", conn) ''' ''' #测试“不复用的情况”,可以设置 retries=0,并且让服务器关闭连接 r = http.request("GET", "/get", headers={"Connection": "close"}) ''' -
测试限制连接数+阻塞模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32# 设置连接池为 maxsize=1,block=False http = urllib3.HTTPConnectionPool("httpbin.org", maxsize=1, block=False) # 请求函数:保持连接占用一段时间 def make_request(name): print(f"[{name}] 准备发送请求") try: r = http.request("GET", "/delay/5", preload_content=False, timeout=10.0) print(f"[{name}] 已发送请求,延迟读取内容") time.sleep(6) # 保持连接被占用 r.release_conn() # 主动释放连接 print(f"[{name}] 释放连接") except EmptyPoolError as e: print(f"[{name}] 连接池耗尽异常: {e}") except Exception as e: print(f"[{name}] 其他异常: {e}") # 第一个线程:占住唯一连接 t1 = threading.Thread(target=make_request, args=("线程1",)) # 第二个线程:争抢连接 t2 = threading.Thread(target=make_request, args=("线程2",)) t1.start() time.sleep(0.2) # 保证线程1先占住连接 t2.start() t1.join() t2.join() ''' 连接池已满且不允许阻塞新的请求时,新的连接请求将会被丢弃。这意味着连接池无法处理所有的请求,可能会导致一些请求失败或被延迟处理。 所以在触发连接池已满的情况下不会报错且能正常运行,但在DEBUG日志下会有警告。 ''' -
验证连接池(num_pool)作用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25'num_pool控制的是最多允许缓存多少个不同主机的连接池,每一个host:port组合会对应一个connectionpool实例,当请求的host数超过num_pools时,旧的连接池将会被清理(LRU策略)' # 创建连接池管理器,最多2个主机连接池 http = urllib3.PoolManager(num_pools=2, maxsize=1) hosts = [ "http://httpbin.org/get", "https://www.xiaohongshu.com/explore", "http://baidu.com" # 任意有效网址也可 ] def fetch(url): try: r = http.request("GET", url) print(f"访问 {url} 状态码: {r.status}") except Exception as e: print(f"访问 {url} 出错: {e}") # 访问3个不同host,超出num_pools限制 for i in range(2): for url in hosts: fetch(url) time.sleep(0.5) ''' 可以观察到每次发送请求前都得先建立连接,这是因为只能允许存在两个连接池,当继续往里添加连接池时,会把旧的清理,使得再次连接时需要重新建立连接 '''