程序不是越快越好。限速调用外部 API、轮询等待资源就绪、模拟用户操作节奏——这些场景都需要让代码"停一停"。Python 提供了不止一种暂停方式,选错工具会让整个进程卡死,选对了则既不浪费 CPU 又不拖慢其他任务。
下面从最基础的 time.sleep() 开始,逐步升级到装饰器、多线程和 asyncio,每种方式附带可直接运行的代码。
最直白的暂停:time.sleep()
time.sleep(seconds) 让当前线程挂起指定秒数,精度取决于操作系统调度(通常在毫秒级):
import time
print("开始")
time.sleep(2.5) # 暂停 2.5 秒
print("继续")
几个容易踩的坑:
- 参数只接受正数,传负数会抛
ValueError。 - 信号中断:在 Linux 上,
sleep可能被信号提前打断,返回值不为零时说明没睡够;Windows 上则不可中断。 - 精度有限:
time.sleep(0.001)实际延迟往往远大于 1 ms,别用它做精确计时。
典型用途:批量请求 API 时做简单限速。
import time
import requests
API_URL = "https://api.example.com/data"
for page in range(1, 11):
resp = requests.get(f"{API_URL}?page={page}")
print(f"page {page}: {resp.status_code}")
time.sleep(1) # 每次请求间隔 1 秒,避免触发限频
用装饰器把延迟封装起来
当延迟是某种"策略"而非临时补丁时,把它抽成装饰器更干净——比如给所有重试函数加上固定等待:
import time
from functools import wraps
def delay(seconds):
"""在函数执行前暂停指定秒数"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
time.sleep(seconds)
return func(*args, **kwargs)
return wrapper
return decorator
@delay(2)
def fetch_user(uid):
print(f"获取用户 {uid} ...")
fetch_user(42) # 先等 2 秒,再执行函数体
装饰器的好处:延迟逻辑和业务逻辑解耦,改等待时间只改装饰器参数,不用翻进函数体。
还可以更进一步,写一个重试 + 退避装饰器,每次失败后等待时间翻倍:
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries:
raise
wait = base_delay * (2 ** attempt)
print(f"第 {attempt + 1} 次失败,{wait}s 后重试: {e}")
time.sleep(wait)
return wrapper
return decorator
@retry_with_backoff(max_retries=3, base_delay=1)
def call_flaky_api():
import random
if random.random() < 0.7:
raise ConnectionError("服务不可用")
return "成功"
print(call_flaky_api())
多线程里的 sleep:别冻住整个程序
time.sleep() 只冻住调用它的那个线程。在多线程程序中,一个线程睡觉,其他线程照跑——这正是做定时任务或进度动画的基础:
import time
import threading
def countdown(name, seconds):
for i in range(seconds, 0, -1):
print(f"[{name}] 剩余 {i}s")
time.sleep(1)
print(f"[{name}] 完成!")
# 两个倒计时并行运行
t1 = threading.Thread(target=countdown, args=("任务A", 5))
t2 = threading.Thread(target=countdown, args=("任务B", 3))
t1.start()
t2.start()
t1.join()
t2.join()
print("全部结束")
输出会交错出现,说明两个线程各自 sleep、各自推进,互不阻塞。
注意:如果主线程需要随时终止子线程,不要用 time.sleep() 做长等待——它不响应线程停止信号。改用 Event.wait(timeout):
import threading
stop_event = threading.Event()
def worker():
while not stop_event.is_set():
print("工作中...")
stop_event.wait(2) # 等待 2 秒,但可被 event.set() 立即唤醒
t = threading.Thread(target=worker)
t.start()
import time
time.sleep(6) # 主线程 6 秒后下令停止
stop_event.set()
t.join()
print("优雅退出")
asyncio 的 sleep:非阻塞等待的正确做法
在异步代码里用 time.sleep() 是致命错误——它会冻住整个事件循环,所有协程一起卡。正确做法是 asyncio.sleep(),它只让当前协程让出控制权,其他协程继续跑:
import asyncio
async def fetch(name, delay):
print(f"{name} 开始请求,预计 {delay}s")
await asyncio.sleep(delay) # 非阻塞等待
print(f"{name} 返回结果")
return f"{name}:data"
async def main():
# 三个请求并发执行,总耗时 ≈ max(2, 1, 3) = 3s,而非 2+1+3=6s
results = await asyncio.gather(
fetch("A", 2),
fetch("B", 1),
fetch("C", 3),
)
print(results)
asyncio.run(main())
对比一下:如果把上面 await asyncio.sleep(delay) 换成 time.sleep(delay),三个任务会串行执行,总耗时变成 6 秒,而且期间事件循环完全无响应。
asyncio.sleep(0) 也有妙用——它让当前协程主动让步一次,给其他协程执行机会,类似"让出 CPU 时间片":
async def heavy_task():
for i in range(100000):
# 每处理一批就让步一次,避免长时间独占事件循环
if i % 1000 == 0:
await asyncio.sleep(0)
print("重任务完成")
选型清单
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 简单限速、一次性等待 | time.sleep() |
代码最少,够用就行 |
| 重试/退避策略 | 装饰器封装 time.sleep() |
逻辑复用,改参数不动业务 |
| 多线程定时任务或可中断等待 | Event.wait(timeout) |
可被外部信号唤醒,不浪费线程 |
| 异步并发请求、协程间协作 | asyncio.sleep() |
不阻塞事件循环,并发才有效 |
| 需要精确短延迟(<1ms) | 不用 sleep,改用 time.perf_counter() 轮询 |
OS 调度精度不够,sleep 不可靠 |
一句话总结:同步代码用 time.sleep(),异步代码用 asyncio.sleep(),可中断等待用 Event,策略性延迟用装饰器。选错工具的代价远大于多写几行代码的代价——把上面示例复制到本地跑一遍,感受阻塞与非阻塞的差异,下次就不会选错。