Асинхронные потоки и примитивы синхронизации
Asyncio Streams — сетевые потоки
Streams API предоставляет высокоуровневый интерфейс для асинхронного сетевого ввода-вывода. Два основных объекта: StreamReader для чтения и StreamWriter для записи.
import asyncio
async def tcp_echo_client(message: str) -> str:
"""Send a message to echo server and get response."""
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# Send data
writer.write(message.encode())
await writer.drain() # Ensure data is sent
# Read response
data = await reader.read(1024)
response = data.decode()
# Close connection
writer.close()
await writer.wait_closed()
return response
async def main() -> None:
response = await tcp_echo_client("Привет, сервер!")
print(f"Ответ: {response}")
# asyncio.run(main())
TCP-сервер на Streams
import asyncio
async def handle_client(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
"""Handle a single client connection."""
addr = writer.get_extra_info("peername")
print(f"Новое подключение: {addr}")
while True:
data = await reader.read(1024)
if not data:
break
message = data.decode()
print(f"[{addr}] Получено: {message}")
# Echo back with modification
response = f"Эхо: {message}"
writer.write(response.encode())
await writer.drain()
print(f"Отключение: {addr}")
writer.close()
await writer.wait_closed()
async def main() -> None:
server = await asyncio.start_server(
handle_client,
"127.0.0.1",
8888,
)
addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
print(f"Сервер запущен: {addrs}")
async with server:
await server.serve_forever()
# asyncio.run(main())
asyncio.Queue — очередь задач
asyncio.Queue — асинхронная очередь для передачи данных между корутинами. Это основа паттерна producer-consumer.
import asyncio
import random
async def producer(queue: asyncio.Queue[int], name: str) -> None:
"""Generate items and put them in the queue."""
for i in range(5):
item = random.randint(1, 100)
await queue.put(item)
print(f"[{name}] Произведён: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# Signal that producer is done
await queue.put(-1)
async def consumer(queue: asyncio.Queue[int], name: str) -> int:
"""Process items from the queue."""
total = 0
while True:
item = await queue.get()
if item == -1:
queue.task_done()
break
# Simulate processing
await asyncio.sleep(0.2)
total += item
print(f" [{name}] Обработан: {item}")
queue.task_done()
return total
async def main() -> None:
queue: asyncio.Queue[int] = asyncio.Queue(maxsize=10)
# Start producers and consumers
producers = [
asyncio.create_task(producer(queue, f"P{i}"))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, f"C{i}"))
for i in range(3)
]
# Wait for producers to finish
await asyncio.gather(*producers)
# Signal remaining consumers to stop
for _ in range(len(consumers) - len(producers)):
await queue.put(-1)
# Wait for consumers
results = await asyncio.gather(*consumers)
print(f"Итого обработано: {sum(results)}")
asyncio.run(main())
Типы очередей
import asyncio
async def demo_queues() -> None:
# Standard FIFO queue
fifo: asyncio.Queue[str] = asyncio.Queue()
await fifo.put("первый")
await fifo.put("второй")
print(await fifo.get()) # первый
# LIFO queue (stack)
lifo: asyncio.LifoQueue[str] = asyncio.LifoQueue()
await lifo.put("первый")
await lifo.put("второй")
print(await lifo.get()) # второй
# Priority queue
pq: asyncio.PriorityQueue[tuple[int, str]] = asyncio.PriorityQueue()
await pq.put((3, "низкий приоритет"))
await pq.put((1, "высокий приоритет"))
await pq.put((2, "средний приоритет"))
print(await pq.get()) # (1, 'высокий приоритет')
asyncio.run(demo_queues())
Примитивы синхронизации
Хотя asyncio работает в одном потоке, конкурентные корутины могут конфликтовать при доступе к общим ресурсам. Примитивы синхронизации решают эту проблему.
Lock — блокировка
import asyncio
class BankAccount:
"""Thread-safe bank account using asyncio Lock."""
def __init__(self, balance: float) -> None:
self._balance = balance
self._lock = asyncio.Lock()
@property
def balance(self) -> float:
return self._balance
async def transfer(self, amount: float) -> None:
"""Transfer money with lock to prevent race conditions."""
async with self._lock:
current = self._balance
# Simulate processing delay
await asyncio.sleep(0.01)
self._balance = current + amount
action = "пополнение" if amount > 0 else "списание"
print(f"{action}: {abs(amount):.2f}, баланс: {self._balance:.2f}")
async def main() -> None:
account = BankAccount(1000.0)
# Without lock, concurrent operations could corrupt balance
tasks = [
account.transfer(100),
account.transfer(-50),
account.transfer(200),
account.transfer(-75),
]
await asyncio.gather(*tasks)
print(f"Итоговый баланс: {account.balance:.2f}")
asyncio.run(main())
Semaphore — ограничение конкурентности
import asyncio
async def fetch_url(
url: str,
semaphore: asyncio.Semaphore,
) -> str:
"""Fetch URL with concurrency limit."""
async with semaphore:
print(f"Загружаю: {url}")
await asyncio.sleep(1) # Simulate network request
return f"OK: {url}"
async def main() -> None:
# Allow max 3 concurrent requests
semaphore = asyncio.Semaphore(3)
urls = [f"https://api.example.com/page/{i}" for i in range(10)]
results = await asyncio.gather(
*[fetch_url(url, semaphore) for url in urls]
)
print(f"Загружено {len(results)} страниц")
asyncio.run(main())
Event — сигнализация между корутинами
import asyncio
async def waiter(event: asyncio.Event, name: str) -> None:
"""Wait for the event to be set."""
print(f"[{name}] Жду сигнала...")
await event.wait()
print(f"[{name}] Сигнал получен!")
async def setter(event: asyncio.Event) -> None:
"""Set the event after a delay."""
print("Подготовка данных...")
await asyncio.sleep(2)
print("Данные готовы! Отправляю сигнал.")
event.set()
async def main() -> None:
event = asyncio.Event()
await asyncio.gather(
waiter(event, "A"),
waiter(event, "B"),
waiter(event, "C"),
setter(event),
)
asyncio.run(main())
Асинхронные итераторы и генераторы
async for — асинхронная итерация
Асинхронный итератор реализует методы __aiter__() и __anext__():
import asyncio
class AsyncCounter:
"""Async iterator that counts with delays."""
def __init__(self, start: int, stop: int) -> None:
self.current = start
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self) -> int:
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.3)
value = self.current
self.current += 1
return value
async def main() -> None:
async for number in AsyncCounter(1, 6):
print(f"Число: {number}")
asyncio.run(main())
async def с yield — асинхронные генераторы
Более простой способ создать асинхронный итератор:
import asyncio
async def async_range(start: int, stop: int, delay: float = 0.5):
"""Async generator that yields numbers with delay."""
for i in range(start, stop):
await asyncio.sleep(delay)
yield i
async def read_chunks(data: str, chunk_size: int = 10):
"""Async generator simulating chunked reading."""
for i in range(0, len(data), chunk_size):
await asyncio.sleep(0.1) # Simulate I/O
yield data[i:i + chunk_size]
async def main() -> None:
# Async for with generator
async for num in async_range(1, 5):
print(f"Получено: {num}")
# Async comprehension
values = [v async for v in async_range(10, 15, 0.1)]
print(f"Значения: {values}")
# Chunked reading
text = "Это длинная строка для демонстрации чтения по частям"
async for chunk in read_chunks(text, 15):
print(f"Чанк: '{chunk}'")
asyncio.run(main())
async with — асинхронные контекстные менеджеры
import asyncio
from contextlib import asynccontextmanager
class AsyncDBConnection:
"""Async context manager for database connection."""
def __init__(self, dsn: str) -> None:
self.dsn = dsn
self.connected = False
async def __aenter__(self):
print(f"Подключаюсь к {self.dsn}...")
await asyncio.sleep(0.5) # Simulate connection
self.connected = True
print("Подключено!")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Закрываю соединение...")
await asyncio.sleep(0.1) # Simulate cleanup
self.connected = False
print("Соединение закрыто.")
return False # Do not suppress exceptions
async def query(self, sql: str) -> list[dict]:
"""Execute a query."""
if not self.connected:
raise RuntimeError("Нет подключения к БД")
await asyncio.sleep(0.3)
return [{"id": 1, "name": "test"}]
async def main() -> None:
async with AsyncDBConnection("postgres://localhost/mydb") as db:
result = await db.query("SELECT * FROM users")
print(f"Результат: {result}")
asyncio.run(main())
asynccontextmanager — через декоратор
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
@asynccontextmanager
async def managed_resource(name: str) -> AsyncIterator[dict]:
"""Async context manager using decorator."""
print(f"Захват ресурса: {name}")
resource = {"name": name, "active": True}
try:
yield resource
finally:
resource["active"] = False
print(f"Освобождение ресурса: {name}")
async def main() -> None:
async with managed_resource("файл") as res:
print(f"Использую: {res}")
asyncio.run(main())
Практический пример: пул воркеров с очередью
import asyncio
import random
from dataclasses import dataclass, field
from time import monotonic
@dataclass
class Job:
"""A unit of work to process."""
id: int
data: str
priority: int = 0
@dataclass
class WorkerPool:
"""Pool of async workers processing a shared queue."""
num_workers: int
queue: asyncio.PriorityQueue = field(default_factory=asyncio.PriorityQueue)
results: list[str] = field(default_factory=list)
async def submit(self, job: Job) -> None:
"""Submit a job to the pool."""
await self.queue.put((job.priority, job.id, job))
async def _worker(self, name: str) -> None:
"""Worker that processes jobs from the queue."""
while True:
priority, _, job = await self.queue.get()
try:
delay = random.uniform(0.1, 0.5)
await asyncio.sleep(delay)
result = f"[{name}] Job#{job.id} '{job.data}' ({delay:.2f}с)"
self.results.append(result)
print(result)
finally:
self.queue.task_done()
async def run(self, jobs: list[Job]) -> list[str]:
"""Run pool with given jobs."""
# Submit all jobs
for job in jobs:
await self.submit(job)
# Start workers
workers = [
asyncio.create_task(self._worker(f"W{i}"))
for i in range(self.num_workers)
]
# Wait for all jobs to complete
await self.queue.join()
# Cancel workers
for w in workers:
w.cancel()
return self.results
async def main() -> None:
pool = WorkerPool(num_workers=3)
jobs = [
Job(id=i, data=f"задача-{i}", priority=random.randint(1, 5))
for i in range(10)
]
start = monotonic()
results = await pool.run(jobs)
elapsed = monotonic() - start
print(f"\nОбработано {len(results)} задач за {elapsed:.2f}с")
asyncio.run(main())