Python: Key Concepts for Generative AI Developers

Async / Await: The Key to High-Performance LLM Applications
In applications built on large language models (LLMs), code spends a significant amount of its time waiting: for a response from an LLM, for an embedding API, or for a vector database. Asynchronous programming with the async and await keywords puts that waiting time to use. A synchronous, single-threaded application handles one request at a time, whereas an async application can keep thousands of requests in flight concurrently on a single thread of execution.
Understanding How the Await Expression Works
When an await expression is encountered in a Python program, the current coroutine is paused, and control is returned to the event loop. This loop then examines the tasks ready to be executed, processes them, and returns to the initial coroutine once the expected result is available. This process does not require threads or context switches at the operating system level; it relies purely on cooperative multitasking.
import asyncio

import anthropic

# By default the client reads the API key from the ANTHROPIC_API_KEY environment variable
client = anthropic.AsyncAnthropic()


async def ask_claude(prompt: str, label: str) -> str:
    # await suspends this coroutine until the API responds,
    # letting the event loop run the other requests in the meantime
    message = await client.messages.create(
        model="claude-opus-4-5",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    return f"[{label}] {message.content[0].text}"


async def main():
    questions = [
        ("What is a transformer architecture?", "A"),
        ("Explain RAG in one paragraph.", "B"),
        ("What is chain-of-thought prompting?", "C"),
        ("Describe the attention mechanism briefly.", "D"),
        ("What is a vector database used for?", "E"),
    ]
    # gather runs all five requests concurrently and returns results in input order
    results = await asyncio.gather(
        *[ask_claude(question, label) for question, label in questions]
    )
    for result in results:
        print(result)


asyncio.run(main())
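The suspension behaviour described above can be observed without an API key. The following is a minimal sketch in which asyncio.sleep() stands in for the network call; the worker function and the events list are illustrative names, not part of any library.

```python
import asyncio

events: list[str] = []  # records the order in which the coroutines actually run


async def worker(name: str, delay: float) -> str:
    events.append(f"{name} start")
    # await suspends this coroutine and hands control back to the event loop
    await asyncio.sleep(delay)
    events.append(f"{name} done")
    return name


async def main() -> list[str]:
    # A is scheduled first but sleeps longer, so B finishes first;
    # gather still returns the results in input order
    return await asyncio.gather(worker("A", 0.1), worker("B", 0.05))


results = asyncio.run(main())
print(events)
print(results)
```

Both workers start before either finishes, which shows that the first await gave control back to the loop rather than blocking the thread.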
⚡ The Concrete Impact in the Real World
Consider 100 documents, each needing an LLM call that takes 3 seconds. Run sequentially, the calls take about 300 seconds, or 5 minutes. Launched together with asyncio.gather(), they run concurrently and finish in roughly the time of the slowest call, about 3 to 5 seconds, provided the API's rate limits allow that many requests in flight. That is a speedup of roughly 60 to 100 times with no additional hardware.
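The arithmetic can be checked at a smaller scale. This is a sketch under stated assumptions: fake_llm_call is a hypothetical stand-in that sleeps for 50 ms instead of calling a real API, and the semaphore limit of 5 is an arbitrary figure chosen to mimic a provider's concurrency cap.

```python
import asyncio
import time


async def fake_llm_call(doc_id: int, latency: float = 0.05) -> str:
    # Hypothetical stand-in for a network-bound LLM request
    await asyncio.sleep(latency)
    return f"summary-{doc_id}"


async def run_sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        await fake_llm_call(i)  # each call waits for the previous one
    return time.perf_counter() - start


async def run_concurrent(n: int, limit: int = 5) -> float:
    sem = asyncio.Semaphore(limit)  # at most `limit` calls in flight at once

    async def bounded(i: int) -> str:
        async with sem:
            return await fake_llm_call(i)

    start = time.perf_counter()
    await asyncio.gather(*(bounded(i) for i in range(n)))
    return time.perf_counter() - start


async def compare(n: int = 10) -> tuple[float, float]:
    return await run_sequential(n), await run_concurrent(n)


sequential_s, concurrent_s = asyncio.run(compare())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

With a cap in place the speedup is bounded by the limit rather than by the number of documents, which is usually the realistic case against a rate-limited API.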
Task Management: Launch and Collect Later
With asyncio.create_task(), a coroutine can be scheduled immediately without waiting for its completion. This feature is ideal for RAG pipelines, where it is possible to simultaneously retrieve data from a vector store and perform a web search.
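# search_vector_db, search_web, build_context and call_llm are assumed to be
# async/sync helpers defined elsewhere in the application.
async def rag_pipeline(query: str) -> str:
    # Both searches start running as soon as the tasks are created
    task_vector = asyncio.create_task(search_vector_db(query))
    task_web = asyncio.create_task(search_web(query))

    # Other work can happen here while the searches are in flight
    system_prompt = "You are a helpful research assistant"

    # Collect both results; the tasks have been running concurrently since creation
    vector_hits, web_hits = await task_vector, await task_web
    context = build_context(vector_hits, web_hits)
    return await call_llm(system_prompt, context, query)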
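A runnable variant of the launch-then-collect pattern is sketched below. The two search functions are stubs that sleep for 200 ms, an assumption made so the example runs without a vector store or a search API; asyncio.gather() is used here to collect the tasks, which is one common alternative to awaiting them one by one.

```python
import asyncio
import time


async def search_vector_db(query: str) -> list[str]:
    await asyncio.sleep(0.2)  # simulated vector-store latency
    return [f"vector hit for {query!r}"]


async def search_web(query: str) -> list[str]:
    await asyncio.sleep(0.2)  # simulated web-search latency
    return [f"web hit for {query!r}"]


async def retrieve(query: str) -> tuple[list[str], float]:
    start = time.perf_counter()
    # create_task schedules both coroutines on the event loop right away
    task_vector = asyncio.create_task(search_vector_db(query))
    task_web = asyncio.create_task(search_web(query))
    # gather waits for both; an exception in either task propagates here
    vector_hits, web_hits = await asyncio.gather(task_vector, task_web)
    return vector_hits + web_hits, time.perf_counter() - start


hits, elapsed = asyncio.run(retrieve("rag"))
print(hits, f"{elapsed:.2f}s")
```

The elapsed time is close to the slower of the two searches, not their sum, because both tasks were already running before the first await.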
Real-Time Token Streaming with Asynchronous Generators
To provide a smooth user experience, similar to that of ChatGPT, where tokens appear as they are generated, asynchronous generators are essential. Instead of waiting for a complete response, each token is returned as soon as it is available and immediately sent to the client.
import asyncio

import anthropic

client = anthropic.AsyncAnthropic()


async def stream_response(prompt: str):
    # Async generator: yields each text chunk as soon as the API sends it
    async with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text


async def handle_request(prompt: str):
    full_text = ""
    async for token in stream_response(prompt):
        print(token, end="", flush=True)  # display the chunk immediately
        full_text += token
    print()
    return full_text


asyncio.run(handle_request("Explain diffusion models simply."))
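The generator mechanics can be tried offline. In this sketch fake_token_stream is a hypothetical stand-in for the API stream: it splits a fixed string on spaces and yields one piece every 10 ms.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_token_stream(text: str, delay: float = 0.01) -> AsyncIterator[str]:
    # Hypothetical token source: one whitespace-separated piece per iteration
    for token in text.split(" "):
        await asyncio.sleep(delay)  # simulated generation latency
        yield token + " "


async def collect(answer: str) -> tuple[list[str], str]:
    chunks: list[str] = []
    # async for resumes the generator each time the consumer is ready for more
    async for token in fake_token_stream(answer):
        chunks.append(token)
    return chunks, "".join(chunks).strip()


chunks, full_text = asyncio.run(collect("tokens arrive one by one"))
print(chunks)
print(full_text)
```

The consumer sees each chunk as it is produced, so a web handler could forward chunks to the client inside the same loop instead of appending them to a list.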
Locks: Ensuring Consistency of Shared State
Although asyncio runs on a single thread, race conditions can still occur whenever a coroutine awaits between reading and writing shared state. For example, if two coroutines read a shared counter, await an API call, and then write back an updated value, one of the updates is lost. An asyncio.Lock ensures that only one coroutine at a time executes the critical section.
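import asyncio
from collections import defaultdict

request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()


# call_embedding_api is assumed to be an async helper defined elsewhere.
async def tracked_embed(text: str, model: str) -> list[float]:
    # Only the counter update is guarded; the lock is released before the API
    # call so that requests are not serialized
    async with lock:
        request_counts[model] += 1
        if request_counts[model] > 1000:
            raise RuntimeError(f"Daily limit hit for {model}")
    return await call_embedding_api(text, model)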
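To see why the lock matters, the sketch below forces a suspension between the read and the write of a shared counter. The function names and the state dictionary are illustrative; asyncio.sleep(0) is used only as an explicit yield point standing in for a real awaited call.

```python
import asyncio


async def unsafe_increment(state: dict[str, int]) -> None:
    current = state["count"]
    await asyncio.sleep(0)  # yield point: other coroutines run and read the same value
    state["count"] = current + 1


async def safe_increment(state: dict[str, int], lock: asyncio.Lock) -> None:
    async with lock:  # the read-await-write sequence is now atomic
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1


async def run(n: int = 50) -> tuple[int, int]:
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["count"], safe["count"]


unsafe_total, safe_total = asyncio.run(run())
print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

Without the lock, updates are lost because every coroutine reads the counter before any of them writes it back. With the lock, all 50 increments are counted.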
Multithreading: When Asynchronous is Not an Option
Some Python libraries, like requests or certain database drivers, do not support asynchronous operations. In these cases, multithreading becomes a viable solution to avoid sacrificing performance.
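One way to combine the two worlds is asyncio.to_thread(), available since Python 3.9, which runs a blocking function in a worker thread and lets the event loop await its result. In this sketch blocking_fetch is a hypothetical stand-in for a synchronous library call such as requests.get(); it sleeps for 200 ms instead of touching the network.

```python
import asyncio
import time


def blocking_fetch(doc_id: int) -> str:
    # Hypothetical synchronous call; time.sleep blocks the calling thread
    time.sleep(0.2)
    return f"doc-{doc_id}"


async def fetch_all(n: int) -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each blocking call runs in its own worker thread, so the event loop stays free
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, i) for i in range(n))
    )
    return list(results), time.perf_counter() - start


docs, elapsed = asyncio.run(fetch_all(5))
print(docs, f"{elapsed:.2f}s")
```

The five calls overlap because each thread spends its time blocked on a wait rather than executing Python bytecode, so the total is close to one call's latency rather than five.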
The GIL: Understanding Its Limitations and Opportunities
The Global Interpreter Lock (GIL) is a lock in CPython that prevents multiple threads from executing Python bytecode at the same time. This may seem to make multithreading useless, but the GIL is released during I/O operations and by many C extensions while they run native code, as NumPy and PyTorch do for heavy computations. Threads therefore work well for I/O-bound work and for calls into such libraries. For CPU-intensive operations in pure Python, multiprocessing is recommended instead: threads running pure Python loops only take turns holding the GIL, so they never execute in parallel.
from concurrent.futures import ThreadPoolExecutor, as_completed

from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")


def embed_text(text: str, idx: int) -> tuple[int, list[float]]:
    # encode() spends most of its time in native code
    embedding = model.encode(text)
    return idx, embedding.tolist()


texts = [f"Document chunk {i}" for i in range(50)]

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
    results = {}
    # as_completed yields futures in completion order, so results are keyed by index
    for future in as_completed(futures):
        idx, embedding = future.result()
        results[idx] = embedding

print(f"Embedded {len(results)} chunks")
Synchronization Tools: A Complete Toolbox
import threading
import time

model_ready = threading.Event()
api_sem = threading.Semaphore(5)  # max 5 concurrent inferences


def load_model():
    print("Loading model weights...")
    time.sleep(3)  # simulate loading a 7B parameter model
    model_ready.set()  # unlock all waiting threads
    print("Model ready!")


def inference_worker(worker_id: int):
    model_ready.wait()  # block here until the model is loaded
    with api_sem:  # at most 5 [...]