Brief IA : Python : Concepts Clés pour Développeurs d'IA Générative
🔬 Recherche

Python : Concepts Clés pour Développeurs d'IA Générative

Brief IA
Tom Levy·5 min·0 vues

L'utilisation d'async/await en Python permet de gérer des milliers d'utilisateurs simultanément sur une application LLM. Les générateurs asynchrones facilitent le streaming de tokens en temps réel, optimisant les interactions utilisateur. Le multithreading reste crucial pour les bibliothèques Python non-asynchrones, malgré les limitations du GIL.

En bref
1L'utilisation d'async/await en Python permet de gérer des milliers d'utilisateurs simultanément sur une application LLM.
2Les générateurs asynchrones facilitent le streaming de tokens en temps réel, optimisant les interactions utilisateur.
3Le multithreading reste crucial pour les bibliothèques Python non-asynchrones, malgré les limitations du GIL.
💡Pourquoi c'est importantMaîtriser ces concepts Python est essentiel pour optimiser les performances et l'efficacité des applications d'IA générative.
Le brief IA que lisent les pros

La recherche en IA te passionne ?

Les papers et avancées qui comptent, expliqués simplement, chaque soir. Gratuit.

Inclus dès l'inscription : notre sélection des meilleurs guides & comparatifs IA.

Choisis ton rythme

Gratuit · Pas de spam · Désabonnement en 1 clic

📄
L'analyse en français

Async / Await : La Clé des Applications LLM Performantes

Dans le développement d'applications basées sur des modèles de langage de grande taille (LLM), une réalité s'impose : le code passe une grande partie de son temps à attendre. Que ce soit pour recevoir une réponse d'un LLM, pour interagir avec une API d'embedding ou pour accéder à une base de données vectorielle, l'attente est omniprésente. L'utilisation de la programmation asynchrone, via les mots-clés async et await, permet de transformer cette attente en opportunité. Sans ces outils, une application ne peut servir qu'un utilisateur à la fois. En revanche, avec async, elle peut gérer des milliers d'utilisateurs simultanément, et ce, sur un seul fil d'exécution.

Comprendre le Fonctionnement de l'Expression Await

Lorsqu'une expression await est rencontrée dans un programme Python, la coroutine en cours est mise en pause, et le contrôle est rendu à la boucle d'événements. Cette boucle examine alors les tâches prêtes à être exécutées, avance dans leur traitement, puis retourne à la coroutine initiale une fois que le résultat attendu est disponible. Ce processus ne nécessite ni threads ni changements de contexte au niveau du système d'exploitation, mais repose sur un multitâche coopératif pur.

import asyncio
import [anthropic](/dossier/anthropic)

client = anthropic.AsyncAnthropic()

async def ask_claude(prompt: str, label: str) -> str:
    message = await client.messages.create(
        model="[claude](/outil/claude)-opus-4-5",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )
    return f"[{label}]{message.content[0].text}"

async def main():
    questions = [
        ("What is a transformer architecture?", "A"),
        ("Explain RAG in one paragraph.", "B"),
        ("What is chain-of-thought prompting?", "C"),
        ("Describe the attention mechanism briefly.", "D"),
        ("What is a vector database used for?", "E"),
    ]
    
    results = await asyncio.gather(*[ask_claude(q, l) for q, l in questions])
    for r in results:
        print(r)

asyncio.run(main())

⚡ L'Impact Concret dans le Monde Réel

Prenons un exemple : effectuer des appels séquentiels à un LLM pour 100 documents, chacun prenant 3 secondes, totalise environ 5 minutes. En utilisant asyncio.gather(), ces appels peuvent être exécutés simultanément, réduisant le temps total à environ 3 à 5 secondes. Cela représente un gain de performance de 60 fois, sans nécessiter de matériel supplémentaire.

Gestion des Tâches : Lancer et Collecter Plus Tard

Avec asyncio.create_task(), une coroutine peut être planifiée immédiatement sans attendre son achèvement. Cette fonctionnalité est idéale pour les pipelines RAG, où il est possible de récupérer simultanément des données d'un magasin vectoriel et d'une recherche web.

async def rag_pipeline(query: str) -> str:
    task_vector = asyncio.create_task(search_vector_db(query))
    task_web = asyncio.create_task(search_web(query))
    
    system_prompt = "You are a helpful research assistant"
    
    vector_hits, web_hits = await task_vector, await task_web
    context = build_context(vector_hits, web_hits)
    
    return await call_llm(system_prompt, context, query)

Streaming de Tokens en Temps Réel avec des Générateurs Asynchrones

Pour offrir une expérience utilisateur fluide, semblable à celle de ChatGPT, où les tokens apparaissent au fur et à mesure de leur génération, les générateurs asynchrones sont indispensables. Plutôt que d'attendre une réponse complète, chaque token est renvoyé dès qu'il est disponible et transmis immédiatement au client.

import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str):
    async with client.messages.stream(
        model="claude-opus-4.5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

async def handle_request(prompt: str):
    full_text = ""
    async for token in stream_response(prompt):
        print(token, end="", flush=True)
        full_text += token
    print()
    return full_text

asyncio.run(handle_request("Explain diffusion models simply."))

Verrous : Assurer la Cohérence de l'État Partagé

Bien que asyncio fonctionne sur un seul thread, des conditions de course peuvent survenir. Par exemple, si deux coroutines accèdent et modifient un compteur partagé sans verrouillage, cela peut entraîner des résultats incorrects. L'utilisation de asyncio.Lock garantit qu'une seule coroutine peut accéder à la section critique à la fois.

import asyncio
from collections import defaultdict

request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()

async def tracked_embed(text: str, model: str) -> list[float]:
    async with lock:
        request_counts[model] += 1
        if request_counts[model] > 1000:
            raise RuntimeError(f"Daily limit hit for {model}")
        return await call_embedding_api(text, model)

Multithreading : Quand l'Asynchrone n'est pas une Option

Certaines bibliothèques Python, comme requests ou certains pilotes de base de données, ne supportent pas l'asynchrone. Dans ces cas, le multithreading devient une solution viable pour ne pas sacrifier les performances.

Le GIL : Comprendre ses Limitations et ses Opportunités

Le Global Interpreter Lock (GIL) est un verrou dans CPython qui empêche plusieurs threads d'exécuter du bytecode Python simultanément. Bien que cela puisse sembler rendre le multithreading inutile, le GIL est libéré pendant les opérations d'E/S et lors de l'exécution de code C, comme avec NumPy ou PyTorch. Ainsi, pour les opérations de calcul intensif en Python pur, le multiprocessing est recommandé, car les boucles CPU en Python pur ne libèrent jamais le GIL.

from concurrent.futures import ThreadPoolExecutor, as_completed
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-V2")

def embed_text(text: str, idx: int) -> tuple:
    embedding = model.encode(text)
    return idx, embedding.tolist()

texts = [f"Document chunk {i}" for i in range(50)]
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
    results = {}
    for future in as_completed(futures):
        idx, embedding = future.result()
        results[idx] = embedding

print(f"Embedded {len(results)} chunks")

Outils de Synchronisation : Une Boîte à Outils Complète

import threading
import time

model_ready = threading.Event()
api_sem = threading.Semaphore(5)  # max 5 inférences concurrentes

def load_model():
    print("Loading model weights...")
    time.sleep(3)  # simuler le chargement d'un modèle de 7B paramètres
    model_ready.set()  # débloque tous les threads en attente
    print("Model ready!")

def inference_worker(worker_id: int):
    model_ready.wait()  # bloque ici jusqu'à ce que le modèle soit chargé
    with api_sem:  # au maximum 5 [...]
Commentaires