Brief IA : Python : Les Concepts Clés pour Maîtriser l'IA Générative en 2023
🔬 Recherche

Python : Les Concepts Clés pour Maîtriser l'IA Générative en 2023

Brief IA
Tom Levy·5 min·1 vues

L'utilisation d'async/await en Python permet de gérer efficacement des milliers de requêtes simultanées, ce qui est essentiel pour les applications d'IA générative. Le multithreading reste pertinent pour les bibliothèques non-asynchrones, malgré les limitations du GIL, tandis que les générateurs asynchrones facilitent le streaming de tokens en temps réel, crucial pour des applications comme ChatGPT.

En bref
1L'utilisation d'async/await en Python permet de gérer efficacement des milliers de requêtes simultanées sans threads supplémentaires.
2Le multithreading reste pertinent pour les bibliothèques Python non-asynchrones, malgré les limitations du GIL.
3Les générateurs asynchrones facilitent le streaming de tokens en temps réel, crucial pour des applications comme ChatGPT.
💡Pourquoi c'est importantLa maîtrise de ces concepts Python est essentielle pour optimiser les performances des applications d'IA générative, cruciales dans un contexte de forte demande technologique.
Le brief IA que lisent les pros

La recherche en IA te passionne ?

Les papers et avancées qui comptent, expliqués simplement, chaque soir. Gratuit.

Inclus dès l'inscription : notre sélection des meilleurs guides & comparatifs IA.

Choisis ton rythme

Gratuit · Pas de spam · Désabonnement en 1 clic

📄
L'analyse en français

Async / Await : La Colonne Vertébrale des Applications LLM

Dans le développement d'applications basées sur les modèles de langage de grande taille (LLM), une réalité s'impose : le code passe une grande partie de son temps à attendre. Que ce soit pour obtenir une réponse d'un LLM, pour interagir avec une API d'embedding ou pour accéder à une base de données vectorielle, l'attente est omniprésente. L'absence de la programmation asynchrone, ou async, limite une application à servir un utilisateur à la fois. En revanche, l'intégration d'async permet de gérer simultanément des milliers de requêtes avec un seul thread.

Le Fonctionnement de l'Instruction Await

Lorsqu'une expression await est rencontrée dans un code Python, la coroutine en cours est mise en pause et le contrôle est rendu à la boucle d'événements. Cette boucle examine les tâches prêtes à être exécutées, avance dans leur traitement, puis revient à la coroutine initiale une fois le résultat disponible. Ce processus se déroule sans recours aux threads ou aux changements de contexte du système d'exploitation, illustrant un multitâche coopératif pur.

import asyncio
import [anthropic](/dossier/anthropic)

client = anthropic.AsyncAnthropic()

async def ask_claude(prompt: str, label: str) -> str:
    message = await client.messages.create(
        model="[claude](/outil/claude)-opus-4-5",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )
    return f"[{label}]{message.content[0].text}"

async def main():
    questions = [
        ("What is a [transformer](/glossaire/transformer) architecture?", "A"),
        ("Explain RAG in one paragraph.", "B"),
        ("What is chain-of-thought prompting?", "C"),
        ("Describe the attention mechanism briefly.", "D"),
        ("What is a vector database used for?", "E"),
    ]
    
    results = await asyncio.gather(
        *[ask_claude(q, l) for q, l in questions]
    )
    
    for r in results:
        print(r)

asyncio.run(main())

⚡ Impact Pratique

Dans une situation où des appels LLM doivent être effectués de manière séquentielle pour 100 documents avec un temps de réponse de 3 secondes chacun, le processus prendrait 5 minutes. Cependant, en utilisant asyncio.gather(), ces appels s'exécutent en parallèle, réduisant le temps total à environ 3 à 5 secondes, soit un gain de performance de 60 fois sans nécessiter de matériel supplémentaire.

Lancer et Oublier avec les Tâches

La fonction asyncio.create_task() permet de planifier une coroutine immédiatement, sans attendre son achèvement. Cela est particulièrement utile pour les pipelines RAG, où des données doivent être récupérées simultanément à partir d'une base de données vectorielle et d'une recherche web.

async def rag_pipeline(query: str) -> str:
    task_vector = asyncio.create_task(search_vector_db(query))
    task_web = asyncio.create_task(search_web(query))
    
    system_prompt = "You are a helpful research assistant"
    
    vector_hits, web_hits = await task_vector, await task_web
    context = build_context(vector_hits, web_hits)
    
    return await call_llm(system_prompt, context, query)

Streaming de Tokens en Temps Réel

Pour des applications comme ChatGPT, où les tokens doivent apparaître au fur et à mesure de leur génération, les générateurs asynchrones sont essentiels. Plutôt que d'attendre une réponse complète, chaque token est renvoyé dès qu'il est disponible.

import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str):
    async with client.messages.stream(
        model="claude-opus-4.5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

async def handle_request(prompt: str):
    full_text = ""
    async for token in stream_response(prompt):
        print(token, end="", flush=True)
        full_text += token
    print()
    return full_text

asyncio.run(handle_request("Explain diffusion models simply."))

Verrous pour Protéger l'État Partagé

Bien que asyncio fonctionne sur un seul thread, des problèmes de concurrence peuvent survenir. Par exemple, si deux coroutines accèdent et modifient un compteur partagé sans protection, les résultats peuvent être incorrects. L'utilisation de asyncio.Lock garantit qu'une seule coroutine accède à la section critique à la fois.

import asyncio
from collections import defaultdict

request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()

async def tracked_embed(text: str, model: str) -> list[float]:
    async with lock:
        request_counts[model] += 1
        if request_counts[model] > 1000:
            raise RuntimeError(f"Daily limit hit for {model}")
        return await call_embedding_api(text, model)

Multithreading : Quand l'Async n'est Pas Possible

Certaines bibliothèques Python, telles que requests ou certains pilotes de base de données, ne supportent pas l'asynchronisme. Dans ces cas, le multithreading devient une solution viable pour ne pas sacrifier la performance.

Comprendre le GIL

Le Global Interpreter Lock (GIL) est un mécanisme de verrouillage dans CPython qui empêche l'exécution simultanée de plusieurs threads sur le bytecode Python. Bien que cela puisse sembler rendre le multithreading inutile, le GIL est en fait libéré lors de certaines opérations, notamment les opérations d'E/S, les extensions C, et les calculs lourds en Python pur. Cependant, pour les opérations CPU intensives, le multiprocessing est souvent plus approprié.

from concurrent.futures import ThreadPoolExecutor, as_completed
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-V2")

def embed_text(text: str, idx: int) -> tuple:
    embedding = model.encode(text)
    return idx, embedding.tolist()

texts = [f"Document chunk {i}" for i in range(50)]

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
    results = {}
    
    for future in as_completed(futures):
        idx, embedding = future.result()
        results[idx] = embedding

print(f"Embedded {len(results)} chunks")

Primitives de Synchronisation

Les primitives de synchronisation telles que les événements et les sémaphores sont essentielles pour gérer l'accès aux ressources partagées entre les threads. Par exemple, un threading.Event peut être utilisé pour signaler que le chargement d'un modèle est terminé, tandis qu'un threading.Semaphore limite le nombre de threads pouvant exécuter une tâche simultanément.

import threading
import time

model_ready = threading.Event()
api_sem = threading.Semaphore(5)  # max 5 inférences concurrentes

def load_model():
    print("Loading model weights...")
    time.sleep(3)  # simuler le chargement d'un modèle de 7B paramètres
    model_ready.set()  # débloque tous les threads en attente
    print("Model ready!")

def inference_worker(worker_id: int):
    model_ready.wait()  # bloque ici jusqu'à ce que le modèle soit chargé
    with api_sem:  # au maximum 5
        # exécution de l'inférence

Suivez Brief IA

L'actu IA du jour, aussi dans votre fil.

Commentaires