48.7 Streams: async Readers and Writers
Streams in asyncio provide a high-level API for working with network connections using async/await syntax. They offer an abstraction over the lower-level transport and protocol interfaces, making it easier to implement network clients and servers without dealing with callbacks directly. The streams API is built on top of the protocol layer and provides reader/writer objects that manage the underlying I/O operations.
StreamReader and StreamWriter Fundamentals
The core of asyncio streams consists of two primary classes: StreamReader and StreamWriter. The StreamReader handles incoming data from the socket, while the StreamWriter manages outgoing data and connection control. You rarely create them yourself. asyncio.open_connection() returns a (reader, writer) pair for a client connection, and asyncio.start_server() passes a new pair to its callback for every connection it accepts.
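The StreamReader uses an internal buffer that accumulates data as it arrives, so your coroutines can consume it at their own pace. If the buffer grows beyond twice the reader's limit (the limit defaults to 64 KiB), asyncio pauses reading from the socket until your code catches up. The reading methods differ in how long they wait: read(n) returns as soon as any data is buffered, even if that is fewer than n bytes; readline() waits for a newline; and readexactly(n) waits until exactly n bytes are available. Each of them suspends only the calling coroutine, which makes stream handling much more intuitive than working with raw callbacks.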
```python
import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    writer.write(message.encode())
    await writer.drain()  # Wait until the write buffer has drained

    data = await reader.read(100)
    print(f'Received: {data.decode()}')

    writer.close()
    await writer.wait_closed()  # Ensure connection is closed

asyncio.run(tcp_echo_client('Hello World!'))
```
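The difference between these reading methods is easiest to see without a network in the way. The following sketch is an illustration, not production code: it constructs a StreamReader directly (the asyncio documentation recommends obtaining readers from open_connection() or start_server() instead) and pushes bytes in with feed_data() and feed_eof(), which asyncio's protocol layer normally calls for you. The function name demo_read_semantics is hypothetical.

```python
import asyncio

async def demo_read_semantics():
    # Normally open_connection()/start_server() create the reader and the
    # transport feeds it; feeding bytes by hand here simulates network input.
    reader = asyncio.StreamReader()

    reader.feed_data(b"hello")
    # read(n) returns as soon as some data is buffered: 5 bytes, not 100.
    partial = await reader.read(100)

    reader.feed_data(b"line one\nABCDEFGH")
    line = await reader.readline()       # up to and including b"\n"
    exact = await reader.readexactly(4)  # exactly 4 bytes: b"ABCD"

    reader.feed_eof()
    leftover = b""
    try:
        await reader.readexactly(10)     # only 4 bytes remain before EOF
    except asyncio.IncompleteReadError as exc:
        leftover = exc.partial           # the bytes that did arrive
    return partial, line, exact, leftover

print(asyncio.run(demo_read_semantics()))
# (b'hello', b'line one\n', b'ABCD', b'EFGH')
```

Note that read(100) returned after only five bytes. When a protocol specifies a fixed length, readexactly() is the better fit.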
Establishing Connections and Servers
Creating client connections is straightforward with asyncio.open_connection(), which returns a reader/writer pair. For servers, asyncio.start_server() accepts incoming connections and calls a callback with the reader/writer objects for each client. The writer object is particularly important as it manages the write buffer and provides methods like drain() to ensure proper flow control.
```python
import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"Received {message} from {addr}")

    writer.write(data)
    await writer.drain()  # Apply backpressure before closing
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())
```
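Because both halves of the API produce the same reader/writer pair, a client and a server can be exercised together in one process. This minimal sketch assumes a hypothetical handle_upper handler that replies with one upper-cased line, and binds to port 0 so the operating system chooses a free port instead of 8888; server.sockets reveals which port it picked.

```python
import asyncio

async def handle_upper(reader, writer):
    # Hypothetical handler: reply with the received line upper-cased.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def round_trip(message: bytes) -> bytes:
    # Port 0 asks the OS for any free port, so the sketch never collides
    # with a server already listening on 8888.
    server = await asyncio.start_server(handle_upper, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    async with server:
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(message)
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply

print(asyncio.run(round_trip(b"ping\n")))  # b'PING\n'
```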
Flow Control with drain() and write()
A critical aspect of stream writers is flow control. write() is not a coroutine: it tries to send the data to the socket immediately, and whatever the socket cannot accept right away is queued in the transport's write buffer. You can therefore call write() many times in a row, but if the peer reads more slowly than you write, that buffer keeps growing. await writer.drain() is the backpressure point. When the buffer is above its high-water mark, drain() suspends the calling coroutine (without blocking the event loop) until the buffer falls below the low-water mark; otherwise it returns immediately. A completed drain() does not mean the peer has received the data, only that the local buffer is back to an acceptable size.
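```python
import asyncio

async def send_large_data(writer: asyncio.StreamWriter, large_data: bytes) -> None:
    chunk_size = 1024
    for i in range(0, len(large_data), chunk_size):
        chunk = large_data[i:i + chunk_size]
        writer.write(chunk)
        # The buffer size is queried on the underlying transport
        if writer.transport.get_write_buffer_size() > 64 * 1024:  # 64 KB threshold
            await writer.drain()  # Wait until the buffer falls below the low-water mark
    await writer.drain()  # Final drain before returning
```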
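To watch backpressure end to end, the sketch below streams a payload to a local server that simply counts bytes. The handler name count_bytes, the 8 KB write size, and the lowered limits passed to the transport's set_write_buffer_limits() are arbitrary choices for illustration. Whether drain() actually suspends on a given run depends on how quickly the kernel and the server absorb data, so the example checks only that every byte arrives.

```python
import asyncio

async def count_bytes(reader, writer):
    # Hypothetical server: count everything the client sends until EOF,
    # then report the total back on one line.
    total = 0
    while True:
        chunk = await reader.read(4096)
        if not chunk:  # b"" means the client called write_eof()
            break
        total += len(chunk)
    writer.write(f"{total}\n".encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def send_with_backpressure(payload: bytes) -> int:
    server = await asyncio.start_server(count_bytes, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        # Small limits make drain() start suspending sooner; values are arbitrary.
        writer.transport.set_write_buffer_limits(high=16 * 1024, low=4 * 1024)
        for i in range(0, len(payload), 8192):
            writer.write(payload[i:i + 8192])
            await writer.drain()  # returns at once unless writing has been paused
        writer.write_eof()  # half-close: we can still read the server's reply
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return int(reply.decode())

print(asyncio.run(send_with_backpressure(b"x" * 1_000_000)))  # 1000000
```

write_eof() half-closes the connection, which is how the server's read() learns that the payload is complete while the client can still read the reply.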
Reading Strategies and Patterns
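The StreamReader offers several methods for different reading scenarios. read(n) returns up to n bytes as soon as any data is available (read() with no argument reads until EOF), readline() reads up to and including the next newline, and readexactly(n) reads exactly n bytes, raising IncompleteReadError if EOF arrives first. For protocols with other framing, readuntil(separator) reads up to and including an arbitrary separator. Unlike readline(), it raises IncompleteReadError at EOF instead of returning a partial line, and LimitOverrunError if the data before the separator exceeds the reader's limit.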
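```python
import asyncio

async def read_lines(reader):
    try:
        while True:
            line = await reader.readuntil(separator=b'\n')
            print(f'Received: {line.decode().strip()}')
    except asyncio.IncompleteReadError as e:
        # EOF: e.partial holds any trailing bytes that lacked a newline
        if e.partial:
            print(f'Unterminated data at EOF: {e.partial!r}')
        print('Connection closed')
    except asyncio.LimitOverrunError:
        print('Line exceeded the reader limit')
```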
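readuntil() works the same way for delimiters other than a newline. The following sketch assumes a made-up protocol whose records end with a blank line (b"\r\n\r\n") and gives the reader a deliberately small limit of 64 bytes so that LimitOverrunError is easy to trigger. As in any hand-fed example, calling feed_data() on a directly constructed StreamReader stands in for a real connection; parse_records and SEP are hypothetical names.

```python
import asyncio

SEP = b"\r\n\r\n"  # hypothetical record terminator, chosen for illustration

async def parse_records(data: bytes, limit: int = 64) -> list[bytes]:
    # Simulate a connection by feeding bytes straight into a reader;
    # in real code the reader would come from open_connection()/start_server().
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(data)
    reader.feed_eof()
    records = []
    while True:
        try:
            record = await reader.readuntil(SEP)
        except asyncio.IncompleteReadError as exc:
            # EOF before another separator; exc.partial holds any leftover bytes
            if exc.partial:
                raise ValueError(f"truncated record: {exc.partial!r}") from exc
            return records
        records.append(record[:-len(SEP)])  # readuntil() keeps the separator

print(asyncio.run(parse_records(b"alpha\r\n\r\nbeta\r\n\r\n")))  # [b'alpha', b'beta']

try:
    asyncio.run(parse_records(b"x" * 100 + SEP))  # record longer than the 64-byte limit
except asyncio.LimitOverrunError as exc:
    print("record too long; separator found at offset", exc.consumed)  # offset 100
```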
Proper Resource Management
Streams must be properly closed to avoid resource leaks. Call writer.close() and then await writer.wait_closed() to make sure the underlying connection is fully shut down. StreamWriter does not implement the async context manager protocol, so async with writer: fails with a TypeError; a try/finally block gives the same guarantee and keeps your code robust against unexpected exceptions.
```python
import asyncio

async def send_and_receive(host, port):
    try:
        reader, writer = await asyncio.open_connection(host, port)
    except OSError as e:  # covers ConnectionRefusedError, DNS failures, etc.
        print(f"Connection failed: {e}")
        return None
    try:
        writer.write(b'Hello')
        await writer.drain()
        return await reader.read(100)
    finally:
        # Runs on success and on any exception, so the connection is always released
        writer.close()
        await writer.wait_closed()
```
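A reusable async context manager for a connection can be built with contextlib.asynccontextmanager. The open_stream helper below is hypothetical, not part of asyncio: it wraps open_connection() and runs the close()/wait_closed() pair in a finally clause. A throwaway echo server on port 0 is included so the sketch runs on its own.

```python
import asyncio
import contextlib

@contextlib.asynccontextmanager
async def open_stream(host, port):
    # Hypothetical helper: opens a connection and guarantees the
    # close()/wait_closed() pair runs when the async with block exits.
    reader, writer = await asyncio.open_connection(host, port)
    try:
        yield reader, writer
    finally:
        writer.close()
        await writer.wait_closed()

async def echo_once(reader, writer):
    # Tiny echo server used only to exercise the helper.
    writer.write(await reader.read(100))
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def demo():
    server = await asyncio.start_server(echo_once, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        async with open_stream("127.0.0.1", port) as (reader, writer):
            writer.write(b"Hello")
            await writer.drain()
            reply = await reader.read(100)
        closed = writer.is_closing()  # True: the helper closed the writer on exit
    return reply, closed

print(asyncio.run(demo()))  # (b'Hello', True)
```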
Common Pitfalls and Best Practices
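One common mistake is calling write() repeatedly without ever awaiting drain(), which lets the write buffer grow without bound when the peer is slow. Another pitfall is not handling the various reading exceptions: IncompleteReadError when EOF arrives before readexactly() or readuntil() is satisfied, LimitOverrunError when readuntil() cannot find its delimiter within the reader's limit (readline() reports the same condition as ValueError), and connection errors such as ConnectionResetError.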
Always set timeouts on stream operations, using asyncio.wait_for() or, on Python 3.11 and later, the asyncio.timeout() context manager, so that a stalled connection cannot hang your coroutine indefinitely. For production code, implement proper error handling and reconnection logic, since network connections are inherently unreliable.
```python
import asyncio

async def robust_communication(reader, writer, message, timeout=10):
    try:
        writer.write(message.encode())
        await asyncio.wait_for(writer.drain(), timeout=timeout)
        data = await asyncio.wait_for(reader.read(100), timeout=timeout)
        return data.decode()
    except asyncio.TimeoutError:
        # A stalled peer: give up on this connection and release it
        print("Operation timed out")
        writer.close()
        await writer.wait_closed()
        raise
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        raise
```