La recherche en IA te passionne ?
Les papers et avancées qui comptent, expliqués simplement, chaque soir. Gratuit.
Inclus dès l'inscription : notre sélection des meilleurs guides & comparatifs IA.
Choisis ton rythme
Gratuit · Pas de spam · Désabonnement en 1 clic
Async / Await : La Clé des Applications LLM Performantes
Dans le développement d'applications basées sur des modèles de langage de grande taille (LLM), une réalité s'impose : le code passe une grande partie de son temps à attendre. Que ce soit pour recevoir une réponse d'un LLM, pour interagir avec une API d'embedding ou pour accéder à une base de données vectorielle, l'attente est omniprésente. L'utilisation de la programmation asynchrone, via les mots-clés async et await, permet de transformer cette attente en opportunité. Sans ces outils, une application ne peut servir qu'un utilisateur à la fois. En revanche, avec async, elle peut gérer des milliers d'utilisateurs simultanément, et ce, sur un seul fil d'exécution.
Comprendre le Fonctionnement de l'Expression Await
Lorsqu'une expression await est rencontrée dans un programme Python, la coroutine en cours est mise en pause, et le contrôle est rendu à la boucle d'événements. Cette boucle examine alors les tâches prêtes à être exécutées, avance dans leur traitement, puis retourne à la coroutine initiale une fois que le résultat attendu est disponible. Ce processus ne nécessite ni threads ni changements de contexte au niveau du système d'exploitation, mais repose sur un multitâche coopératif pur.
import asyncio
import [anthropic](/dossier/anthropic)
client = anthropic.AsyncAnthropic()
async def ask_claude(prompt: str, label: str) -> str:
message = await client.messages.create(
model="[claude](/outil/claude)-opus-4-5",
max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
return f"[{label}]{message.content[0].text}"
async def main():
questions = [
("What is a transformer architecture?", "A"),
("Explain RAG in one paragraph.", "B"),
("What is chain-of-thought prompting?", "C"),
("Describe the attention mechanism briefly.", "D"),
("What is a vector database used for?", "E"),
]
results = await asyncio.gather(*[ask_claude(q, l) for q, l in questions])
for r in results:
print(r)
asyncio.run(main())
⚡ L'Impact Concret dans le Monde Réel
Prenons un exemple : effectuer des appels séquentiels à un LLM pour 100 documents, chacun prenant 3 secondes, totalise environ 5 minutes. En utilisant asyncio.gather(), ces appels peuvent être exécutés simultanément, réduisant le temps total à environ 3 à 5 secondes. Cela représente un gain de performance de 60 fois, sans nécessiter de matériel supplémentaire.
Gestion des Tâches : Lancer et Collecter Plus Tard
Avec asyncio.create_task(), une coroutine peut être planifiée immédiatement sans attendre son achèvement. Cette fonctionnalité est idéale pour les pipelines RAG, où il est possible de récupérer simultanément des données d'un magasin vectoriel et d'une recherche web.
async def rag_pipeline(query: str) -> str:
task_vector = asyncio.create_task(search_vector_db(query))
task_web = asyncio.create_task(search_web(query))
system_prompt = "You are a helpful research assistant"
vector_hits, web_hits = await task_vector, await task_web
context = build_context(vector_hits, web_hits)
return await call_llm(system_prompt, context, query)
Streaming de Tokens en Temps Réel avec des Générateurs Asynchrones
Pour offrir une expérience utilisateur fluide, semblable à celle de ChatGPT, où les tokens apparaissent au fur et à mesure de leur génération, les générateurs asynchrones sont indispensables. Plutôt que d'attendre une réponse complète, chaque token est renvoyé dès qu'il est disponible et transmis immédiatement au client.
import anthropic
client = anthropic.AsyncAnthropic()
async def stream_response(prompt: str):
async with client.messages.stream(
model="claude-opus-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
yield text
async def handle_request(prompt: str):
full_text = ""
async for token in stream_response(prompt):
print(token, end="", flush=True)
full_text += token
print()
return full_text
asyncio.run(handle_request("Explain diffusion models simply."))
Verrous : Assurer la Cohérence de l'État Partagé
Bien que asyncio fonctionne sur un seul thread, des conditions de course peuvent survenir. Par exemple, si deux coroutines accèdent et modifient un compteur partagé sans verrouillage, cela peut entraîner des résultats incorrects. L'utilisation de asyncio.Lock garantit qu'une seule coroutine peut accéder à la section critique à la fois.
import asyncio
from collections import defaultdict
request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()
async def tracked_embed(text: str, model: str) -> list[float]:
async with lock:
request_counts[model] += 1
if request_counts[model] > 1000:
raise RuntimeError(f"Daily limit hit for {model}")
return await call_embedding_api(text, model)
Multithreading : Quand l'Asynchrone n'est pas une Option
Certaines bibliothèques Python, comme requests ou certains pilotes de base de données, ne supportent pas l'asynchrone. Dans ces cas, le multithreading devient une solution viable pour ne pas sacrifier les performances.
Le GIL : Comprendre ses Limitations et ses Opportunités
Le Global Interpreter Lock (GIL) est un verrou dans CPython qui empêche plusieurs threads d'exécuter du bytecode Python simultanément. Bien que cela puisse sembler rendre le multithreading inutile, le GIL est libéré pendant les opérations d'E/S et lors de l'exécution de code C, comme avec NumPy ou PyTorch. Ainsi, pour les opérations de calcul intensif en Python pur, le multiprocessing est recommandé, car les boucles CPU en Python pur ne libèrent jamais le GIL.
from concurrent.futures import ThreadPoolExecutor, as_completed
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-V2")
def embed_text(text: str, idx: int) -> tuple:
embedding = model.encode(text)
return idx, embedding.tolist()
texts = [f"Document chunk {i}" for i in range(50)]
with ThreadPoolExecutor(max_workers=8) as pool:
futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
results = {}
for future in as_completed(futures):
idx, embedding = future.result()
results[idx] = embedding
print(f"Embedded {len(results)} chunks")
Outils de Synchronisation : Une Boîte à Outils Complète
import threading
import time
model_ready = threading.Event()
api_sem = threading.Semaphore(5) # max 5 inférences concurrentes
def load_model():
print("Loading model weights...")
time.sleep(3) # simuler le chargement d'un modèle de 7B paramètres
model_ready.set() # débloque tous les threads en attente
print("Model ready!")
def inference_worker(worker_id: int):
model_ready.wait() # bloque ici jusqu'à ce que le modèle soit chargé
with api_sem: # au maximum 5 [...]
