La recherche en IA te passionne ?
Les papers et avancées qui comptent, expliqués simplement, chaque soir. Gratuit.
Inclus dès l'inscription : notre sélection des meilleurs guides & comparatifs IA.
Choisis ton rythme
Gratuit · Pas de spam · Désabonnement en 1 clic
Async / Await : La Colonne Vertébrale des Applications LLM
Dans le développement d'applications basées sur les modèles de langage de grande taille (LLM), une réalité s'impose : le code passe une grande partie de son temps à attendre. Que ce soit pour obtenir une réponse d'un LLM, pour interagir avec une API d'embedding ou pour accéder à une base de données vectorielle, l'attente est omniprésente. L'absence de la programmation asynchrone, ou async, limite une application à servir un utilisateur à la fois. En revanche, l'intégration d'async permet de gérer simultanément des milliers de requêtes avec un seul thread.
Le Fonctionnement de l'Instruction Await
Lorsqu'une expression await est rencontrée dans un code Python, la coroutine en cours est mise en pause et le contrôle est rendu à la boucle d'événements. Cette boucle examine les tâches prêtes à être exécutées, avance dans leur traitement, puis revient à la coroutine initiale une fois le résultat disponible. Ce processus se déroule sans recours aux threads ou aux changements de contexte du système d'exploitation, illustrant un multitâche coopératif pur.
import asyncio
import [anthropic](/dossier/anthropic)
client = anthropic.AsyncAnthropic()
async def ask_claude(prompt: str, label: str) -> str:
message = await client.messages.create(
model="[claude](/outil/claude)-opus-4-5",
max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
return f"[{label}]{message.content[0].text}"
async def main():
questions = [
("What is a [transformer](/glossaire/transformer) architecture?", "A"),
("Explain RAG in one paragraph.", "B"),
("What is chain-of-thought prompting?", "C"),
("Describe the attention mechanism briefly.", "D"),
("What is a vector database used for?", "E"),
]
results = await asyncio.gather(
*[ask_claude(q, l) for q, l in questions]
)
for r in results:
print(r)
asyncio.run(main())
⚡ Impact Pratique
Dans une situation où des appels LLM doivent être effectués de manière séquentielle pour 100 documents avec un temps de réponse de 3 secondes chacun, le processus prendrait 5 minutes. Cependant, en utilisant asyncio.gather(), ces appels s'exécutent en parallèle, réduisant le temps total à environ 3 à 5 secondes, soit un gain de performance de 60 fois sans nécessiter de matériel supplémentaire.
Lancer et Oublier avec les Tâches
La fonction asyncio.create_task() permet de planifier une coroutine immédiatement, sans attendre son achèvement. Cela est particulièrement utile pour les pipelines RAG, où des données doivent être récupérées simultanément à partir d'une base de données vectorielle et d'une recherche web.
async def rag_pipeline(query: str) -> str:
task_vector = asyncio.create_task(search_vector_db(query))
task_web = asyncio.create_task(search_web(query))
system_prompt = "You are a helpful research assistant"
vector_hits, web_hits = await task_vector, await task_web
context = build_context(vector_hits, web_hits)
return await call_llm(system_prompt, context, query)
Streaming de Tokens en Temps Réel
Pour des applications comme ChatGPT, où les tokens doivent apparaître au fur et à mesure de leur génération, les générateurs asynchrones sont essentiels. Plutôt que d'attendre une réponse complète, chaque token est renvoyé dès qu'il est disponible.
import anthropic
client = anthropic.AsyncAnthropic()
async def stream_response(prompt: str):
async with client.messages.stream(
model="claude-opus-4.5",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
yield text
async def handle_request(prompt: str):
full_text = ""
async for token in stream_response(prompt):
print(token, end="", flush=True)
full_text += token
print()
return full_text
asyncio.run(handle_request("Explain diffusion models simply."))
Verrous pour Protéger l'État Partagé
Bien que asyncio fonctionne sur un seul thread, des problèmes de concurrence peuvent survenir. Par exemple, si deux coroutines accèdent et modifient un compteur partagé sans protection, les résultats peuvent être incorrects. L'utilisation de asyncio.Lock garantit qu'une seule coroutine accède à la section critique à la fois.
import asyncio
from collections import defaultdict
request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()
async def tracked_embed(text: str, model: str) -> list[float]:
async with lock:
request_counts[model] += 1
if request_counts[model] > 1000:
raise RuntimeError(f"Daily limit hit for {model}")
return await call_embedding_api(text, model)
Multithreading : Quand l'Async n'est Pas Possible
Certaines bibliothèques Python, telles que requests ou certains pilotes de base de données, ne supportent pas l'asynchronisme. Dans ces cas, le multithreading devient une solution viable pour ne pas sacrifier la performance.
Comprendre le GIL
Le Global Interpreter Lock (GIL) est un mécanisme de verrouillage dans CPython qui empêche l'exécution simultanée de plusieurs threads sur le bytecode Python. Bien que cela puisse sembler rendre le multithreading inutile, le GIL est en fait libéré lors de certaines opérations, notamment les opérations d'E/S, les extensions C, et les calculs lourds en Python pur. Cependant, pour les opérations CPU intensives, le multiprocessing est souvent plus approprié.
from concurrent.futures import ThreadPoolExecutor, as_completed
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-V2")
def embed_text(text: str, idx: int) -> tuple:
embedding = model.encode(text)
return idx, embedding.tolist()
texts = [f"Document chunk {i}" for i in range(50)]
with ThreadPoolExecutor(max_workers=8) as pool:
futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
results = {}
for future in as_completed(futures):
idx, embedding = future.result()
results[idx] = embedding
print(f"Embedded {len(results)} chunks")
Primitives de Synchronisation
Les primitives de synchronisation telles que les événements et les sémaphores sont essentielles pour gérer l'accès aux ressources partagées entre les threads. Par exemple, un threading.Event peut être utilisé pour signaler que le chargement d'un modèle est terminé, tandis qu'un threading.Semaphore limite le nombre de threads pouvant exécuter une tâche simultanément.
import threading
import time
model_ready = threading.Event()
api_sem = threading.Semaphore(5) # max 5 inférences concurrentes
def load_model():
print("Loading model weights...")
time.sleep(3) # simuler le chargement d'un modèle de 7B paramètres
model_ready.set() # débloque tous les threads en attente
print("Model ready!")
def inference_worker(worker_id: int):
model_ready.wait() # bloque ici jusqu'à ce que le modèle soit chargé
with api_sem: # au maximum 5
# exécution de l'inférence


