Python: Key Concepts to Master Generative AI in 2023

Async / Await: The Backbone of LLM Applications
When building applications on large language models (LLMs), one reality stands out: the code spends much of its time waiting, whether for an LLM response, an embedding API, or a vector database query. Without asynchronous programming (async), each worker thread or process handles one request at a time and sits idle during every network call. With async, a single thread can keep thousands of requests in flight concurrently.
How the Await Instruction Works
When an await expression is encountered in Python code, the current coroutine is paused, and control is returned to the event loop. This loop examines tasks that are ready to be executed, processes them, and then returns to the initial coroutine once the result is available. This process occurs without resorting to threads or operating system context switches, illustrating pure cooperative multitasking.
import asyncio
import anthropic

# By default the client reads the API key from the ANTHROPIC_API_KEY environment variable
client = anthropic.AsyncAnthropic()

async def ask_claude(prompt: str, label: str) -> str:
    # await suspends this coroutine until the API responds
    message = await client.messages.create(
        model="claude-opus-4-5",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )
    return f"[{label}] {message.content[0].text}"

async def main():
    questions = [
        ("What is a transformer architecture?", "A"),
        ("Explain RAG in one paragraph.", "B"),
        ("What is chain-of-thought prompting?", "C"),
        ("Describe the attention mechanism briefly.", "D"),
        ("What is a vector database used for?", "E"),
    ]
    # gather() runs all five requests concurrently and returns results in input order
    results = await asyncio.gather(
        *[ask_claude(q, l) for q, l in questions]
    )
    for r in results:
        print(r)

asyncio.run(main())
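The pause-and-resume behavior described above can be observed without an API key. The following is a minimal sketch in which `asyncio.sleep` stands in for a network call; the names `worker` and `demo` are illustrative and not part of any library.

```python
import asyncio

async def worker(name: str, delay: float, log: list[str]) -> None:
    log.append(f"{name} start")
    # await hands control back to the event loop until the sleep finishes
    await asyncio.sleep(delay)
    log.append(f"{name} done")

async def demo() -> list[str]:
    log: list[str] = []
    # Both coroutines run on one thread and interleave at their await points
    await asyncio.gather(worker("slow", 0.2, log), worker("fast", 0.1, log))
    return log

print(asyncio.run(demo()))
# ['slow start', 'fast start', 'fast done', 'slow done']
```

The "fast" worker starts while the "slow" one is suspended and finishes first, even though no second thread exists.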
⚡ Practical Impact
If 100 documents each need an LLM call that takes 3 seconds, running the calls sequentially takes 300 seconds, or 5 minutes. With asyncio.gather(), the calls run concurrently, so the total drops to roughly 3 to 5 seconds: a speedup of about 60 to 100 times with no additional hardware, provided the API's rate limits allow that many requests in flight.
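The arithmetic can be checked at a smaller scale with simulated latency. This sketch assumes a hypothetical `fake_llm_call` that sleeps for 50 ms instead of calling a real API, and it caps concurrency with an `asyncio.Semaphore`, since real providers usually limit the number of simultaneous requests. The limit of 10 is an arbitrary illustration.

```python
import asyncio
import time

async def fake_llm_call(doc_id: int, latency: float = 0.05) -> str:
    # Stand-in for a real LLM request (assumption: 50 ms per call)
    await asyncio.sleep(latency)
    return f"summary-{doc_id}"

async def run_sequential(n: int) -> list[str]:
    # Each call finishes before the next one starts
    return [await fake_llm_call(i) for i in range(n)]

async def run_concurrent(n: int, max_in_flight: int = 10) -> list[str]:
    sem = asyncio.Semaphore(max_in_flight)

    async def bounded(i: int) -> str:
        async with sem:  # at most max_in_flight calls run at once
            return await fake_llm_call(i)

    # gather() returns results in the order the awaitables were passed
    return await asyncio.gather(*(bounded(i) for i in range(n)))

def timed(coro) -> tuple[list[str], float]:
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

seq_result, seq_time = timed(run_sequential(20))
con_result, con_time = timed(run_concurrent(20))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With 20 calls and 10 in flight, the concurrent version needs about two rounds of latency instead of twenty, while returning the same results in the same order.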
Fire-and-Forget with Tasks
The function asyncio.create_task() allows for scheduling a coroutine immediately, without waiting for its completion. This is particularly useful for RAG pipelines, where data needs to be retrieved simultaneously from a vector database and a web search.
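import asyncio

# search_vector_db, search_web, build_context and call_llm are assumed to be
# async/sync helpers defined elsewhere in the application.
async def rag_pipeline(query: str) -> str:
    # Both searches start running as soon as the tasks are created
    task_vector = asyncio.create_task(search_vector_db(query))
    task_web = asyncio.create_task(search_web(query))
    system_prompt = "You are a helpful research assistant"
    # Wait for both results; the searches have been running concurrently
    vector_hits, web_hits = await task_vector, await task_web
    context = build_context(vector_hits, web_hits)
    return await call_llm(system_prompt, context, query)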
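The pipeline above awaits both tasks. When a task's result is never awaited (true fire-and-forget), the asyncio documentation recommends keeping a reference to the task so it is not garbage-collected before it finishes. Here is a minimal sketch of that pattern; `record_usage`, `answer`, and `audit_log` are hypothetical names used only for illustration.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()
audit_log: list[str] = []

async def record_usage(query: str) -> None:
    await asyncio.sleep(0.05)  # simulated write to a metrics store
    audit_log.append(query)

async def answer(query: str) -> str:
    task = asyncio.create_task(record_usage(query))
    background_tasks.add(task)                        # keep a strong reference
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return f"answer to {query!r}"  # returned before the log entry is written

async def main() -> None:
    reply = await answer("what is RAG?")
    print(reply, "| logged yet?", bool(audit_log))
    # Drain pending background work before the event loop shuts down
    await asyncio.gather(*background_tasks)
    print("logged after drain:", audit_log)

asyncio.run(main())
```

Draining the set before exit matters because `asyncio.run()` cancels any tasks still pending when `main()` returns.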
Real-Time Token Streaming
For chat applications like ChatGPT, where text should appear as it is generated, asynchronous generators are the natural fit. Instead of waiting for the complete response, the generator yields each chunk of text as soon as it arrives.
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str):
    # Async generator: yields text chunks as the API produces them
    async with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

async def handle_request(prompt: str):
    full_text = ""
    async for token in stream_response(prompt):
        print(token, end="", flush=True)  # show each chunk immediately
        full_text += token
    print()
    return full_text

asyncio.run(handle_request("Explain diffusion models simply."))
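The same producer/consumer shape can be tried offline. In this sketch, `fake_token_stream` is a hypothetical stand-in for the API stream: it yields one word at a time with a small delay, and `collect` consumes it the way a request handler would.

```python
import asyncio
from collections.abc import AsyncIterator

async def fake_token_stream(text: str, delay: float = 0.01) -> AsyncIterator[str]:
    # Simulated model output: one word per "token", with network-like delay
    for word in text.split():
        await asyncio.sleep(delay)
        yield word + " "

async def collect(stream: AsyncIterator[str]) -> str:
    parts: list[str] = []
    async for token in stream:
        print(token, end="", flush=True)  # display as soon as it arrives
        parts.append(token)
    print()
    return "".join(parts).strip()

result = asyncio.run(collect(fake_token_stream("diffusion models denoise step by step")))
```

Collecting chunks in a list and joining once at the end avoids repeated string concatenation on long responses, though for short replies the difference is negligible.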
Locks to Protect Shared State
Although asyncio runs on a single thread, race conditions can still occur, because a coroutine can be suspended at any await. If a coroutine reads a shared counter, awaits something, and then writes the counter back, another coroutine may have modified it in between, and one of the updates is lost. An asyncio.Lock ensures that only one coroutine is inside the critical section at a time.
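import asyncio
from collections import defaultdict

request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()

# call_embedding_api is assumed to be an async helper defined elsewhere.
async def tracked_embed(text: str, model: str) -> list[float]:
    # Update and check the counter while holding the lock
    async with lock:
        request_counts[model] += 1
        if request_counts[model] > 1000:
            raise RuntimeError(f"Daily limit hit for {model}")
    # The lock is released before the slow network call
    return await call_embedding_api(text, model)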
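To see why the lock matters, the sketch below puts an await between the read and the write of a shared counter, with and without an `asyncio.Lock`. The function names are illustrative, and `asyncio.sleep(0)` simply yields to the event loop the way a real I/O call would.

```python
import asyncio

async def unsafe_increment(state: dict[str, int]) -> None:
    current = state["count"]
    await asyncio.sleep(0)  # other coroutines run here and read the same value
    state["count"] = current + 1

async def safe_increment(state: dict[str, int], lock: asyncio.Lock) -> None:
    async with lock:  # read, await, and write happen as one critical section
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1

async def run(n: int = 100) -> tuple[int, int]:
    unsafe = {"count": 0}
    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["count"], safe["count"]

unsafe_total, safe_total = asyncio.run(run())
print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

The unlocked version loses updates because many coroutines read the counter before any of them writes it back; the locked version counts every increment.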
Multithreading: When Async is Not Possible
Some Python libraries, such as requests or certain database drivers, do not support asynchrony. In these cases, multithreading becomes a viable solution to avoid sacrificing performance.
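One way to combine the two worlds is `asyncio.to_thread()` (Python 3.9+), which runs a blocking function in a worker thread and lets the event loop await it. In this sketch, `blocking_fetch` is a hypothetical stand-in for a synchronous call such as an HTTP request or a database query.

```python
import asyncio
import time

def blocking_fetch(doc_id: int) -> str:
    # Stand-in for a synchronous library call; time.sleep blocks the thread
    time.sleep(0.1)
    return f"doc-{doc_id}"

async def fetch_all(n: int) -> list[str]:
    # Each blocking call runs in the default thread pool, so the event loop
    # stays free and the calls overlap
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, i) for i in range(n))
    )

start = time.perf_counter()
docs = asyncio.run(fetch_all(5))
elapsed = time.perf_counter() - start
print(docs, f"{elapsed:.2f}s")
```

Five calls of 0.1 s each complete in roughly the time of one, because the threads sleep concurrently rather than one after another.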
Understanding the GIL
The Global Interpreter Lock (GIL) is a mutex in CPython that allows only one thread at a time to execute Python bytecode. This may seem to make multithreading useless, but the GIL is released during blocking I/O and by many C extensions (NumPy and PyTorch, for example) while they run native code. It is not released during pure Python computation, so for CPU-bound Python code, multiprocessing is usually more appropriate.
from concurrent.futures import ThreadPoolExecutor, as_completed
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_text(text: str, idx: int) -> tuple[int, list[float]]:
    # encode() runs mostly in native code, where the GIL can be released
    embedding = model.encode(text)
    return idx, embedding.tolist()

texts = [f"Document chunk {i}" for i in range(50)]

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
    results = {}
    # as_completed yields futures in completion order, so results are keyed by index
    for future in as_completed(futures):
        idx, embedding = future.result()
        results[idx] = embedding

print(f"Embedded {len(results)} chunks")
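For the CPU-bound case mentioned above, a process pool sidesteps the GIL because each worker process has its own interpreter. The following is a minimal sketch using `ProcessPoolExecutor`; `count_primes` is a deliberately naive, hypothetical workload chosen only because it is pure Python computation.

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit: int) -> int:
    # Pure Python trial division: holds the GIL for its entire run
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

def count_primes_parallel(limits: list[int], workers: int = 4) -> list[int]:
    # Each limit is processed in a separate process; map() keeps input order
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(count_primes, limits))

if __name__ == "__main__":
    # The guard is required on platforms that spawn worker processes
    print(count_primes_parallel([10_000, 20_000, 30_000, 40_000]))
```

Processes cost more to start and must pickle their arguments and results, so this approach pays off only when each task does substantial computation.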
Synchronization Primitives
Synchronization primitives such as events and semaphores are essential for managing access to shared resources between threads. For example, a threading.Event can be used to signal that a model has finished loading, while a threading.Semaphore limits the number of threads that can execute a task simultaneously.
import threading
import time

model_ready = threading.Event()
api_sem = threading.Semaphore(5)  # max 5 concurrent inferences

def load_model():
    print("Loading model weights...")
    time.sleep(3)      # simulate loading a 7B parameter model
    model_ready.set()  # unblocks all waiting threads
    print("Model ready!")

def inference_worker(worker_id: int):
    model_ready.wait()  # blocks here until the model is loaded
    with api_sem:       # at most 5 workers inside this block at once
        # perform inference (placeholder: a short sleep stands in for the real call)
        time.sleep(0.5)
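A complete run of this pattern can be sketched as follows. The sleep durations, the limit of 2, and the `peak` counter are assumptions added to make the semaphore's effect observable; they are not part of the original example.

```python
import threading
import time

model_ready = threading.Event()
api_sem = threading.Semaphore(2)  # illustrative limit: 2 concurrent inferences
state_lock = threading.Lock()     # protects the counters below
active = 0
peak = 0
finished: list[int] = []

def load_model() -> None:
    time.sleep(0.1)    # simulated model loading
    model_ready.set()  # wakes every thread blocked in wait()

def inference_worker(worker_id: int) -> None:
    global active, peak
    model_ready.wait()  # no worker proceeds before the model is ready
    with api_sem:       # at most 2 workers pass this point at once
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulated inference
        with state_lock:
            active -= 1
            finished.append(worker_id)

threads = [threading.Thread(target=inference_worker, args=(i,)) for i in range(6)]
threads.append(threading.Thread(target=load_model))
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"finished={sorted(finished)}, peak concurrency={peak}")
```

All six workers complete, but the recorded peak never exceeds the semaphore's limit.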