Concurrencia en Python: threading, asyncio y multiprocessing

📅 Actualizado en marzo 2026 📊 Nivel: Intermedio-Avanzado ⏱️ 22 min de lectura

Python tiene fama de ser lento y de no poder aprovechar múltiples núcleos. Esa fama es parcialmente merecida y profundamente malentendida. Lento comparado con C, sí — pero un servidor FastAPI que maneja diez mil conexiones simultáneas con asyncio no es lento. El GIL impide paralelismo CPU en un proceso, sí — pero multiprocessing lo resuelve con elegancia. Saber cuándo y cómo usar cada modelo de concurrencia es la diferencia entre código que escala y código que se atasca.

En esta lección aprendemos los tres modelos de concurrencia que ofrece Python: threading para I/O concurrente con la API del sistema operativo, asyncio para I/O masivo con un event loop eficiente, y multiprocessing para paralelismo CPU real. Veremos el GIL sin mitos, async/await desde los fundamentos, y concurrent.futures como API unificada que elige automáticamente el modelo correcto. Al final, sabrás exactamente qué herramienta usar para cada problema.

Medallón de Zenón de Elea, filósofo griego presocrático famoso por sus paradojas del movimiento
Οὐδέν κινεῖται
«Nada se mueve»
Zenón de Elea · Filósofo griego · 490 a.C. – 430 a.C.
Zenón formuló sus célebres paradojas para demostrar que el movimiento es una ilusión: Aquiles nunca alcanza a la tortuga porque para llegar a donde estaba la tortuga, primero debe recorrer la mitad del camino, y antes la mitad de esa mitad, en una serie infinita que nunca termina. El argumento es obviamente falso en la realidad, pero matemáticamente desconcertante durante dos mil años. El GIL de Python es la paradoja de Zenón del paralelismo: en un proceso Python con threads, parece que múltiples hilos se ejecutan simultáneamente —y en cierto sentido lo hacen, intercalándose miles de veces por segundo— pero en realidad solo uno ejecuta bytecode Python a la vez. El movimiento paralelo es una ilusión cuidadosamente orquestada. Como Zenón nos enseñó, la ilusión puede ser perfectamente funcional: para I/O concurrente, esa ilusión es exactamente lo que necesitas. Para paralelismo CPU real, necesitas múltiples procesos — múltiples Aquiles, no múltiples ilusiones de uno.

Concurrencia vs paralelismo: la distinción clave

Concurrencia significa gestionar múltiples tareas a la vez, aunque no se ejecuten literalmente al mismo tiempo. Paralelismo significa ejecutar múltiples tareas simultáneamente, en varios núcleos de CPU a la vez. La diferencia no es semántica: determina qué herramienta usar.

# ── La analogía del chef ──────────────────────────────────────
# Un chef solo (un hilo) preparando tres platos:
#
# SECUENCIAL: termina el primero, luego el segundo, luego el tercero
#   pasta (20min) → ensalada (5min) → postre (10min) = 35 min
#
# CONCURRENTE: mientras el agua hierve, corta la ensalada
#   pasta en fuego, mientras: ensalada → postre → vigilar pasta = 22 min
#   UN solo chef, pero aprovecha los tiempos de ESPERA (I/O)
#
# PARALELO: tres chefs, cada uno un plato
#   pasta || ensalada || postre = 20 min (el más lento)
#   Requiere MÚLTIPLES ejecutores reales

# ── En Python: ────────────────────────────────────────────────
# I/O-bound (peticiones HTTP, BD, archivos):
#   → Mucho tiempo ESPERANDO respuestas externas
#   → Solución: threading o asyncio (concurrencia)
#
# CPU-bound (cálculos numéricos, procesamiento de imágenes):
#   → Mucho tiempo CALCULANDO en CPU
#   → Solución: multiprocessing (paralelismo real)
#
# La pregunta diagnóstica:
#   ¿Mi programa pasa más tiempo ESPERANDO o CALCULANDO?
#   Esperando → threading/asyncio   Calculando → multiprocessing

import time

def tarea_io_simulada(n: int) -> str:
    """Simula una tarea I/O: la mayor parte del tiempo es espera."""
    time.sleep(0.5)   # simula latencia de red / BD
    return f"resultado_{n}"

def tarea_cpu(n: int) -> int:
    """Simula tarea CPU: cálculo intensivo."""
    return sum(i*i for i in range(n))   # sin I/O, solo cálculo

# Secuencial (baseline)
t0 = time.perf_counter()
resultados = [tarea_io_simulada(i) for i in range(10)]
print(f"Secuencial: {time.perf_counter()-t0:.2f}s")   # ~5.0s

# Con threading/asyncio sería ~0.5s (todas esperando en paralelo)
# Con multiprocessing no mejoraría (el cuello es el sleep, no la CPU)
📋 I/O-bound vs CPU-bound — el diagnóstico correcto importa. Aplicar threading a código CPU-bound no solo no mejora: puede empeorar el rendimiento por el overhead del cambio de contexto entre threads y la contención del GIL. Aplicar multiprocessing a código I/O-bound desperdicia memoria y tiempo de arranque de procesos. El diagnóstico correcto es la primera decisión.

El GIL: por qué los threads no son lo que parecen

El GIL (Global Interpreter Lock) es un mutex que protege el estado interno del intérprete CPython. Garantiza que solo un thread ejecute bytecode Python a la vez, lo que simplifica la gestión de memoria (el contador de referencias para el garbage collector no necesita ser thread-safe a nivel de cada objeto). La consecuencia: dos threads Python no pueden ejecutar cómputo Python simultáneamente en el mismo proceso.

import threading
import time

# ── Demostración del GIL: threads CPU-bound no ayudan ────────
contador = 0

def incrementar(n: int):
    global contador
    for _ in range(n):
        contador += 1   # operación NO atómica: read-modify-write

# SECUENCIAL
t0 = time.perf_counter()
incrementar(5_000_000)
incrementar(5_000_000)
tiempo_sec = time.perf_counter() - t0
print(f"Secuencial: {tiempo_sec:.3f}s, contador={contador:,}")

# CON 2 THREADS (CPU-bound)
contador = 0
t0 = time.perf_counter()
t1 = threading.Thread(target=incrementar, args=(5_000_000,))
t2 = threading.Thread(target=incrementar, args=(5_000_000,))
t1.start(); t2.start()
t1.join();  t2.join()
tiempo_thr = time.perf_counter() - t0
print(f"Threading:  {tiempo_thr:.3f}s, contador={contador:,}")
# ← Threading no es más rápido (GIL) y el contador puede estar MAL
#   (race condition: ambos threads leen el mismo valor y lo sobreescriben)


# ── Cuándo el GIL se libera automáticamente ───────────────────
# 1. Operaciones de I/O: time.sleep(), socket.recv(), file.read()...
# 2. Extensiones C que llaman Py_BEGIN_ALLOW_THREADS:
#    NumPy (operaciones vectoriales), OpenCV, zlib, hashlib...
# 3. Cada ~5ms (sys.getswitchinterval()), el intérprete fuerza un switch

import sys
print(f"Switch interval: {sys.getswitchinterval()*1000:.0f}ms")   # 5ms

# ── Por tanto, threading SÍ es útil para: ────────────────────
# - Descargas HTTP simultáneas (requests lanza I/O → GIL liberado)
# - Consultas a BD en paralelo
# - Llamadas a APIs externas
# - Lectura de múltiples archivos

# ── Y NO es útil para: ───────────────────────────────────────
# - Cálculos matemáticos puros en Python
# - Procesamiento de texto intensivo
# - Compresión/descompresión en Python puro
# → Para eso: multiprocessing
Diagrama del GIL de Python, comparación threading vs asyncio vs multiprocessing, y líneas de tiempo de ejecución de cada modelo
El mapa completo de la concurrencia en Python: cómo el GIL controla los threads, las líneas de tiempo de cada modelo (threading intercalado, asyncio cooperativo, multiprocessing paralelo real), y cuándo cada herramienta es la elección correcta. Infografía: Ciberaula.

threading: múltiples hilos para I/O

El módulo threading es la herramienta adecuada cuando necesitas ejecutar tareas I/O-bound de forma concurrente usando código síncrono existente. Cada thread es un hilo del sistema operativo: ligero en comparación con un proceso, pero más pesado que una coroutine de asyncio.

import threading
import time
import queue

# ── Thread básico ─────────────────────────────────────────────
def descargar(url: str, resultados: list):
    time.sleep(0.3)   # simula latencia de red
    resultados.append(f"✓ {url}")

urls = [f"https://api.ejemplo.com/dato/{i}" for i in range(8)]
resultados = []
threads = []

t0 = time.perf_counter()
for url in urls:
    t = threading.Thread(target=descargar, args=(url, resultados))
    t.start()
    threads.append(t)

for t in threads:
    t.join()   # esperar a que termine cada thread

print(f"Descargados {len(resultados)} en {time.perf_counter()-t0:.2f}s")
# ~0.3s en lugar de ~2.4s secuencial


# ── Lock: proteger recursos compartidos ───────────────────────
class Contador:
    def __init__(self):
        self._valor = 0
        self._lock  = threading.Lock()

    def incrementar(self):
        with self._lock:       # solo un thread a la vez
            self._valor += 1   # operación protegida

    @property
    def valor(self):
        return self._valor

contador = Contador()
threads  = [threading.Thread(target=contador.incrementar) for _ in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
print(contador.valor)   # siempre 1000, sin race conditions


# ── Queue: comunicación thread-safe entre threads ─────────────
# El patrón productor-consumidor es el más robusto con threads

def productor(cola: queue.Queue, items: list):
    for item in items:
        time.sleep(0.1)            # simula generación lenta
        cola.put(item)
    cola.put(None)                 # señal de fin (sentinel)

def consumidor(cola: queue.Queue, resultados: list):
    while True:
        item = cola.get()
        if item is None:
            break
        resultados.append(item * 2)
        cola.task_done()

cola       = queue.Queue(maxsize=5)   # máximo 5 items en la cola
resultados = []
datos      = list(range(10))

prod = threading.Thread(target=productor,  args=(cola, datos))
cons = threading.Thread(target=consumidor, args=(cola, resultados))
prod.start(); cons.start()
prod.join();  cons.join()
print(resultados)   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


# ── Event: sincronizar threads ────────────────────────────────
datos_listos = threading.Event()

def preparar_datos():
    time.sleep(0.5)
    print("Datos preparados")
    datos_listos.set()   # señal

def procesar_cuando_listos():
    datos_listos.wait()   # bloquea hasta que se llame a set()
    print("Procesando datos")

t1 = threading.Thread(target=preparar_datos)
t2 = threading.Thread(target=procesar_cuando_listos)
t1.start(); t2.start()
t1.join();  t2.join()


# ── daemon threads: threads de fondo que mueren con el proceso ─
def monitor():
    while True:
        time.sleep(1)
        print("Sistema OK")

monitor_t = threading.Thread(target=monitor, daemon=True)
monitor_t.start()
# El proceso termina normalmente aunque monitor_t siga corriendo
Sala de control de tráfico aéreo con múltiples pantallas de radar y controladores coordinando vuelos simultáneamente
Un centro de control de tráfico aéreo es asyncio en la vida real: hay un solo controlador jefe (el event loop) que coordina decenas de vuelos (coroutines) simultáneamente. El controlador no pilota ningún avión — solo gestiona quién puede hablar y cuándo. Cuando un avión está en trayecto estable (operación I/O en progreso), el controlador atiende a otro. No hay paralelismo real: hay una gestión inteligente de la espera que convierte la atención secuencial en coordinación eficiente. Fuente: Pexels (licencia libre).

asyncio: un hilo, muchas tareas

asyncio implementa concurrencia cooperativa con un event loop: un bucle infinito que gestiona coroutines, decidiendo cuál ejecutar en cada momento. Las coroutines ceden el control voluntariamente con await cuando están esperando I/O. El resultado: un solo thread puede manejar miles de conexiones simultáneas con overhead mínimo.

import asyncio
import time

# ── Coroutine básica: función async ───────────────────────────
async def saludar(nombre: str, delay: float):
    await asyncio.sleep(delay)   # cede control al event loop durante delay
    print(f"Hola, {nombre}!")
    return f"saludado_{nombre}"

# Ejecutar una coroutine desde código síncrono
resultado = asyncio.run(saludar("Ana", 0.1))
print(resultado)   # saludado_Ana


# ── asyncio.gather: ejecutar múltiples coroutines a la vez ────
async def main_gather():
    t0 = time.perf_counter()

    # Las tres coroutines se ejecutan CONCURRENTEMENTE
    resultados = await asyncio.gather(
        saludar("Ana",    0.3),
        saludar("Carlos", 0.3),
        saludar("Beatriz",0.3),
    )

    print(f"Tiempo: {time.perf_counter()-t0:.2f}s")   # ~0.3s, no ~0.9s
    print(resultados)   # ['saludado_Ana', 'saludado_Carlos', 'saludado_Beatriz']

asyncio.run(main_gather())


# ── asyncio.create_task: lanzar sin esperar inmediatamente ────
async def tarea_larga(nombre: str, duracion: float):
    print(f"{nombre} → inicio")
    await asyncio.sleep(duracion)
    print(f"{nombre} → fin")
    return nombre

async def main_tasks():
    # Crear tareas (se lanzan de inmediato en el background)
    task1 = asyncio.create_task(tarea_larga("T1", 0.5))
    task2 = asyncio.create_task(tarea_larga("T2", 0.3))
    task3 = asyncio.create_task(tarea_larga("T3", 0.4))

    # Hacer otra cosa mientras corren...
    await asyncio.sleep(0.1)
    print("Haciendo otras cosas mientras las tareas corren")

    # Ahora esperar todas
    resultados = await asyncio.gather(task1, task2, task3)
    print(resultados)   # ['T1', 'T2', 'T3'] — en orden de creación, no de fin

asyncio.run(main_tasks())


# ── Timeout y cancelación ─────────────────────────────────────
async def operacion_lenta():
    await asyncio.sleep(10)
    return "listo"

async def con_timeout():
    try:
        resultado = await asyncio.wait_for(operacion_lenta(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Timeout: la operación tardó más de 2s")
        resultado = None
    return resultado

asyncio.run(con_timeout())


# ── asyncio.Queue: productor-consumidor asíncrono ─────────────
async def productor_async(cola: asyncio.Queue, n: int):
    for i in range(n):
        await asyncio.sleep(0.05)   # simula generación async
        await cola.put(i)
        print(f"Producido: {i}")
    await cola.put(None)   # sentinel

async def consumidor_async(cola: asyncio.Queue, resultados: list):
    while True:
        item = await cola.get()
        if item is None:
            break
        resultados.append(item * 2)
        print(f"Consumido: {item} → {item*2}")
        cola.task_done()

async def main_queue():
    cola       = asyncio.Queue(maxsize=3)
    resultados = []
    await asyncio.gather(
        productor_async(cola, 5),
        consumidor_async(cola, resultados),
    )
    print(f"Resultados: {resultados}")

asyncio.run(main_queue())

async/await en la práctica

Usar async/await con librerías reales cambia la forma en que se escribe el código de red. Las librerías async nativas (aiohttp, httpx, asyncpg, aiofiles) liberan el event loop durante sus operaciones, permitiendo que otras coroutines avancen mientras se espera la respuesta.

import asyncio
import aiohttp   # pip install aiohttp

# ── Múltiples peticiones HTTP concurrentes ────────────────────
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetches una URL y devuelve el JSON de respuesta."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def fetch_todos(urls: list[str]) -> list[dict]:
    """Descarga todas las URLs de forma concurrente."""
    async with aiohttp.ClientSession() as session:
        tareas = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tareas, return_exceptions=True)

# Comparación: 10 peticiones a una API pública
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]

async def main():
    t0 = asyncio.get_event_loop().time()
    resultados = await fetch_todos(urls)
    elapsed = asyncio.get_event_loop().time() - t0

    exitosos = [r for r in resultados if not isinstance(r, Exception)]
    print(f"{len(exitosos)}/10 OK en {elapsed:.2f}s")
    # ~0.5s para 10 peticiones en paralelo vs ~5s secuencial

asyncio.run(main())


# ── Semáforo: limitar la concurrencia ─────────────────────────
# Sin límite, 1000 coroutines simultáneas pueden saturar el servidor
async def fetch_con_semaforo(
    session: aiohttp.ClientSession,
    url: str,
    semaforo: asyncio.Semaphore
) -> dict:
    async with semaforo:   # máximo N peticiones simultáneas
        return await fetch(session, url)

async def fetch_con_limite(urls: list[str], max_concurrentes: int = 10):
    semaforo = asyncio.Semaphore(max_concurrentes)
    async with aiohttp.ClientSession() as session:
        tareas = [fetch_con_semaforo(session, url, semaforo) for url in urls]
        return await asyncio.gather(*tareas, return_exceptions=True)


# ── asyncio con httpx (alternativa moderna) ───────────────────
# httpx tiene API síncrona Y asíncrona — migración mínima desde requests
import httpx

async def fetch_httpx(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=10) as client:
        tareas = [client.get(url) for url in urls]
        respuestas = await asyncio.gather(*tareas)
        return [r.json() for r in respuestas if r.status_code == 200]


# ── Integrar código síncrono en asyncio ───────────────────────
# Si necesitas llamar a una función síncrona bloqueante desde async:
import time

def funcion_sincrona_lenta(n: int) -> int:
    time.sleep(2)   # bloquearía el event loop si se llama directamente
    return n * n

async def main_con_sincrono():
    loop = asyncio.get_event_loop()
    # run_in_executor lanza la función síncrona en un ThreadPoolExecutor
    # sin bloquear el event loop
    resultado = await loop.run_in_executor(None, funcion_sincrona_lenta, 5)
    print(resultado)   # 25

asyncio.run(main_con_sincrono())
💡 La regla de la contaminación async. Una función async def solo puede llamarse con await desde otra función async def. Esto significa que la asincronía se "propaga hacia arriba" en el call stack — si una función necesita ser async, sus llamadores también. Por eso los frameworks modernos como FastAPI hacen que todo el stack sea async desde el principio, en lugar de mezclar síncrono y asíncrono.

multiprocessing: paralelismo real

El módulo multiprocessing lanza múltiples procesos del sistema operativo, cada uno con su propio intérprete Python y su propio GIL. No hay GIL compartido, no hay contención: el paralelismo es real. El precio es el overhead de arranque de procesos y la necesidad de serializar (pickle) los datos que se pasan entre ellos.

import multiprocessing as mp
import time
import os

# ── Process básico ─────────────────────────────────────────────
def calcular_cuadrados(inicio: int, fin: int, resultado: mp.Queue):
    suma = sum(i*i for i in range(inicio, fin))
    resultado.put(suma)
    print(f"Proceso {os.getpid()}: suma parcial = {suma:,}")

if __name__ == "__main__":   # IMPORTANTE: siempre proteger con __name__ == "__main__"
    resultado_q = mp.Queue()
    n = 10_000_000

    # Secuencial
    t0 = time.perf_counter()
    total_sec = sum(i*i for i in range(n))
    print(f"Secuencial: {time.perf_counter()-t0:.2f}s")

    # Paralelo con 4 procesos
    t0 = time.perf_counter()
    chunk   = n // 4
    procesos = [
        mp.Process(target=calcular_cuadrados,
                   args=(i*chunk, (i+1)*chunk, resultado_q))
        for i in range(4)
    ]
    for p in procesos: p.start()
    for p in procesos: p.join()

    total_par = sum(resultado_q.get() for _ in range(4))
    print(f"Paralelo 4p: {time.perf_counter()-t0:.2f}s")
    assert total_sec == total_par


# ── Pool: gestión automática de workers ──────────────────────
def tarea_cpu(n: int) -> int:
    return sum(i*i for i in range(n))

if __name__ == "__main__":
    datos = [1_000_000] * 8   # 8 tareas CPU-intensivas

    # Pool.map: distribución automática entre workers
    with mp.Pool(processes=mp.cpu_count()) as pool:
        t0 = time.perf_counter()
        resultados = pool.map(tarea_cpu, datos)
        print(f"Pool.map: {time.perf_counter()-t0:.2f}s")
        print(f"Total workers: {mp.cpu_count()}")

    # Pool.starmap: para funciones con múltiples argumentos
    with mp.Pool() as pool:
        pares = [(i, i+1) for i in range(8)]
        sumas = pool.starmap(lambda a, b: a + b, pares)


# ── Memoria compartida: Value y Array ────────────────────────
if __name__ == "__main__":
    contador   = mp.Value('i', 0)   # 'i' = integer
    array_comp = mp.Array('d', [0.0] * 10)   # 'd' = double

    def incrementar_shared(lock_obj, contador_obj, n: int):
        for _ in range(n):
            with lock_obj:
                contador_obj.value += 1

    lock = mp.Lock()
    procs = [mp.Process(target=incrementar_shared,
                        args=(lock, contador, 100)) for _ in range(10)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(f"Contador final: {contador.value}")   # 1000
Vías de tren paralelas extendiéndose hacia el horizonte en una estación de ferrocarril, múltiples trenes en vías separadas
Las vías de tren paralelas ilustran multiprocessing: cada tren es un proceso independiente que circula por su propia vía, sin interferir con los demás y sin compartir raíles. Pueden avanzar todos al mismo tiempo, verdaderamente en paralelo. No hay un único controlador que les dé turno — cada locomotora tiene su propio motor. Así funciona multiprocessing: procesos completamente independientes, cada uno con su propio intérprete Python y su propia memoria, moviéndose en paralelo real sobre los núcleos disponibles. Fuente: Pexels (licencia libre).

concurrent.futures: la API unificada

concurrent.futures proporciona una interfaz de alto nivel para ejecutar callables de forma concurrente, con una API idéntica para threads y procesos. Es la herramienta recomendada para la mayoría de casos prácticos: más simple que gestionar threads o procesos manualmente, con soporte nativo para futuros, callbacks y manejo de errores.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import requests   # pip install requests

# ── ThreadPoolExecutor: I/O concurrente ──────────────────────
def descargar_url(url: str) -> dict:
    """Función síncrona — ThreadPoolExecutor la paraleliza."""
    resp = requests.get(url, timeout=10)
    return {"url": url, "status": resp.status_code, "bytes": len(resp.content)}

urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]

# submit() + as_completed: procesar resultados a medida que llegan
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as executor:
    futuros = {executor.submit(descargar_url, url): url for url in urls}

    for futuro in as_completed(futuros):
        url = futuros[futuro]
        try:
            resultado = futuro.result()
            print(f"✓ {resultado['url']}: {resultado['bytes']} bytes")
        except Exception as e:
            print(f"✗ {url}: {e}")

print(f"10 URLs en {time.perf_counter()-t0:.2f}s")


# ── map(): más simple cuando no necesitas gestión individual ──
with ThreadPoolExecutor(max_workers=10) as executor:
    resultados = list(executor.map(descargar_url, urls, timeout=30))
    # executor.map preserva el orden de las URLs originales
    for r in resultados:
        print(f"{r['url']}: {r['status']}")


# ── ProcessPoolExecutor: CPU-bound ───────────────────────────
import math

def es_primo_lento(n: int) -> bool:
    """Versión deliberadamente lenta para demostrar paralelismo."""
    if n < 2: return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0: return False
    return True

candidatos = list(range(10**6, 10**6 + 200))   # números grandes

# Secuencial
t0 = time.perf_counter()
primos_sec = [n for n in candidatos if es_primo_lento(n)]
print(f"Secuencial: {time.perf_counter()-t0:.3f}s, {len(primos_sec)} primos")

# Paralelo con ProcessPoolExecutor
t0 = time.perf_counter()
with ProcessPoolExecutor() as executor:   # usa cpu_count() workers por defecto
    resultados = list(executor.map(es_primo_lento, candidatos))
    primos_par = [n for n, es in zip(candidatos, resultados) if es]
print(f"Paralelo:   {time.perf_counter()-t0:.3f}s, {len(primos_par)} primos")


# ── Future: objeto para resultado asíncrono ───────────────────
with ThreadPoolExecutor(max_workers=3) as executor:
    futuro1 = executor.submit(time.sleep, 0.5)   # devuelve Future
    futuro2 = executor.submit(sum, range(1000))

    # Comprobar estado sin bloquear
    print(f"futuro1 completado: {futuro1.done()}")   # False (aún corriendo)
    print(f"futuro2 resultado: {futuro2.result()}")   # 499500 (bloquea si necesario)

    futuro1.result()   # espera a que termine
    print(f"futuro1 completado: {futuro1.done()}")   # True
Ficha de referencia rápida de concurrencia en Python: threading, asyncio, multiprocessing y concurrent.futures con sus APIs principales
La referencia completa de concurrencia en Python en una página: las APIs principales de threading, asyncio, multiprocessing y concurrent.futures, con patrones comunes como Queue, Lock, Semaphore y las decisiones clave de diseño. Ficha de referencia: Ciberaula.

¿Cuándo usar qué? Guía de decisión

La elección entre threading, asyncio y multiprocessing no es una preferencia estética: depende del tipo de tarea, el volumen de concurrencia y las librerías que uses. Esta guía cubre el 95% de los casos.

# ── Árbol de decisión ─────────────────────────────────────────
#
# ¿El código pasa más tiempo esperando (I/O) o calculando (CPU)?
#
# I/O-bound:
#   ¿Las librerías que uso tienen versión async (aiohttp, asyncpg...)?
#     SÍ → asyncio + async/await
#     NO → ThreadPoolExecutor (envuelve código síncrono)
#   ¿Necesito más de ~500 operaciones simultáneas?
#     SÍ → asyncio (un thread, bajo overhead)
#     NO → ThreadPoolExecutor (suficiente para la mayoría de casos)
#
# CPU-bound:
#   ¿El cuello es una librería C que libera el GIL (NumPy, OpenCV)?
#     SÍ → threading puede ayudar
#     NO → ProcessPoolExecutor / multiprocessing.Pool
#
# ¿Es un script simple de un solo uso?
#   → concurrent.futures (API uniforme, más simple)
#
# ¿Es parte de un servicio web?
#   → FastAPI/Starlette (asyncio gestionado por el framework)


# ── Ejemplos de la vida real ──────────────────────────────────
# 1. Scraper web (100 URLs) → ThreadPoolExecutor(max_workers=20)
# 2. API que hace N consultas a BD por request → asyncio + asyncpg
# 3. Procesamiento de imágenes (OpenCV) → ProcessPoolExecutor
# 4. Servidor FastAPI → asyncio (el framework ya lo gestiona)
# 5. Script de análisis de datos → multiprocessing.Pool para transformaciones


# ── Resumen de propiedades clave ──────────────────────────────
#
# │ Herramienta       │ Modelo         │ Ideal para          │ Overhead │
# │───────────────────│────────────────│─────────────────────│──────────│
# │ threading         │ Preemptivo OS  │ I/O, código sync    │ Medio    │
# │ asyncio           │ Cooperativo    │ I/O masivo, async   │ Bajo     │
# │ multiprocessing   │ Paralelo real  │ CPU-bound           │ Alto     │
# │ concurrent.futures│ Unificado      │ Uso general         │ Varía    │


# ── Patrones para no reinventar la rueda ──────────────────────
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Patrón híbrido: asyncio + ThreadPoolExecutor para código síncrono
async def procesar_con_threads(items: list, funcion_sincrona):
    """Ejecuta código síncrono bloqueante desde asyncio."""
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=10) as pool:
        tareas = [
            loop.run_in_executor(pool, funcion_sincrona, item)
            for item in items
        ]
        return await asyncio.gather(*tareas)

# Uso: llamar a requests.get() desde asyncio sin bloquear el event loop
async def main():
    import requests
    resultados = await procesar_con_threads(
        [f"https://httpbin.org/delay/0" for _ in range(5)],
        lambda url: requests.get(url).status_code
    )
    print(resultados)   # [200, 200, 200, 200, 200]

asyncio.run(main())

❓ Preguntas frecuentes

❓ Preguntas frecuentes sobre Concurrencia en Python: threading, asyncio y multiprocessing

Las dudas más comunes respondidas de forma clara y directa.

No, pero limita un caso específico: el paralelismo CPU en un solo proceso. El GIL (Global Interpreter Lock) es un mutex que protege el estado interno del intérprete CPython y permite que solo un hilo ejecute bytecode Python a la vez. Esto significa que dos threads Python no pueden ejecutar código Python simultáneamente en el mismo proceso, aunque tengas 16 cores disponibles. Sin embargo, el GIL se libera automáticamente durante operaciones de I/O (lectura de red, archivos, bases de datos) y durante extensiones C que lo liberan explícitamente (NumPy, OpenCV). Para I/O concurrente, threading y asyncio funcionan perfectamente. Para paralelismo CPU real, multiprocessing usa múltiples procesos (cada uno con su propio GIL) y soluciona el problema. Python 3.13 introduce el "free-threaded mode" que elimina el GIL como opción experimental, aunque con penalizaciones de rendimiento en uso no concurrente.
La regla general: asyncio para I/O masivo con alta concurrencia, threading para tareas que mezclan I/O con código síncrono o librerías que no soportan async. asyncio puede manejar decenas de miles de conexiones concurrentes con un solo hilo porque el event loop cede control voluntariamente entre tareas (cooperative multitasking). Threading usa el sistema operativo para planificar la ejecución, tiene overhead mayor por cambio de contexto, y puede tener condiciones de carrera si no se protegen recursos compartidos. En la práctica: si usas httpx, aiohttp, asyncpg u otras librerías async-first, usa asyncio. Si usas requests, psycopg2 u otras librerías síncronas y quieres paralelizarlas sin reescribirlas, usa ThreadPoolExecutor. FastAPI y todos los frameworks modernos están basados en asyncio.
Una condición de carrera ocurre cuando el resultado de un programa depende del orden no determinista en que se ejecutan múltiples threads. El ejemplo clásico: dos threads leen un contador (valor=5), ambos lo incrementan en memoria (valor=6), y ambos escriben de vuelta — el resultado es 6 en lugar de 7. En threading se evita con threading.Lock(): solo un thread puede adquirir el lock a la vez, el resto espera. En asyncio no existe el problema en la mayoría de casos porque solo una coroutine se ejecuta a la vez (cooperative multitasking) — la ejecución solo cede en los puntos await. Con multiprocessing, los procesos no comparten memoria por defecto, pero si se usan recursos compartidos explícitos (Value, Array, Manager), también hay que protegerlos. La forma más segura es diseñar el código para evitar estado compartido mutable desde el principio: pasar datos entre workers por colas (Queue, asyncio.Queue) en lugar de variables compartidas.
asyncio.run() (Python 3.7+) es la API de alto nivel recomendada para la mayoría de usos. Crea un nuevo event loop, ejecuta la coroutine hasta completarla, cierra el loop correctamente y llama a los finalizers pendientes. Es la forma correcta de usar asyncio desde código síncrono (punto de entrada de un script). event_loop.run_until_complete() es la API de bajo nivel, útil cuando ya tienes un loop en marcha y necesitas controlarlo manualmente, o en entornos donde el loop ya está gestionado por un framework (Jupyter, FastAPI, Django async). En notebooks Jupyter hay un loop ya corriendo, por lo que asyncio.run() lanza un error — allí se usa await directamente o loop.run_until_complete(). La regla: usa asyncio.run() en scripts y puntos de entrada; usa await directamente en código dentro de funciones async.
Para ThreadPoolExecutor: el límite práctico depende del tipo de I/O. Para I/O de red (requests HTTP), valores de 10-50 son razonables; más no mejora porque la CPU pasa a ser el cuello de botella en gestionar los threads. Python usa min(32, os.cpu_count() + 4) como default. Para operaciones muy lentas (30s+ por tarea), más threads pueden ayudar. Para ProcessPoolExecutor: el número óptimo de workers es os.cpu_count() para tareas CPU-intensivas — más procesos que cores no mejoran el rendimiento y añaden overhead de comunicación. Para tareas con mezcla de CPU e I/O, valores de cpu_count * 2 pueden ser eficientes. La herramienta definitiva es medirlo: itera sobre distintos valores con timeit y el perfil de tu carga de trabajo real.
Valora este artículo

💬 Foro de discusión

¿Tienes dudas sobre Concurrencia en Python: threading, asyncio y multiprocessing? Comparte tu pregunta con la comunidad.

¿Tienes cuenta? o comenta como invitado ↓

Todavía no hay mensajes. ¡Sé el primero en participar!