Hard📖Теория7 min

Асинхронные потоки и примитивы

Streams API, asyncio.Queue, Lock, Semaphore, Event и асинхронные итераторы

Асинхронные потоки и примитивы синхронизации

Asyncio Streams — сетевые потоки

Streams API предоставляет высокоуровневый интерфейс для асинхронного сетевого ввода-вывода. Два основных объекта: StreamReader для чтения и StreamWriter для записи.

import asyncio

async def tcp_echo_client(message: str) -> str:
    """Send a message to echo server and get response."""
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)

    # Send data
    writer.write(message.encode())
    await writer.drain()  # Ensure data is sent

    # Read response
    data = await reader.read(1024)
    response = data.decode()

    # Close connection
    writer.close()
    await writer.wait_closed()

    return response

async def main() -> None:
    response = await tcp_echo_client("Привет, сервер!")
    print(f"Ответ: {response}")

# asyncio.run(main())

TCP-сервер на Streams

import asyncio

async def handle_client(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
) -> None:
    """Handle a single client connection."""
    addr = writer.get_extra_info("peername")
    print(f"Новое подключение: {addr}")

    while True:
        data = await reader.read(1024)
        if not data:
            break

        message = data.decode()
        print(f"[{addr}] Получено: {message}")

        # Echo back with modification
        response = f"Эхо: {message}"
        writer.write(response.encode())
        await writer.drain()

    print(f"Отключение: {addr}")
    writer.close()
    await writer.wait_closed()

async def main() -> None:
    server = await asyncio.start_server(
        handle_client,
        "127.0.0.1",
        8888,
    )

    addrs = ", ".join(str(s.getsockname()) for s in server.sockets)
    print(f"Сервер запущен: {addrs}")

    async with server:
        await server.serve_forever()

# asyncio.run(main())

asyncio.Queue — очередь задач

asyncio.Queue — асинхронная очередь для передачи данных между корутинами. Это основа паттерна producer-consumer.

import asyncio
import random

async def producer(queue: asyncio.Queue[int], name: str) -> None:
    """Generate items and put them in the queue."""
    for i in range(5):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"[{name}] Произведён: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

    # Signal that producer is done
    await queue.put(-1)

async def consumer(queue: asyncio.Queue[int], name: str) -> int:
    """Process items from the queue."""
    total = 0
    while True:
        item = await queue.get()

        if item == -1:
            queue.task_done()
            break

        # Simulate processing
        await asyncio.sleep(0.2)
        total += item
        print(f"  [{name}] Обработан: {item}")
        queue.task_done()

    return total

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=10)

    # Start producers and consumers
    producers = [
        asyncio.create_task(producer(queue, f"P{i}"))
        for i in range(2)
    ]
    consumers = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(3)
    ]

    # Wait for producers to finish
    await asyncio.gather(*producers)

    # Signal remaining consumers to stop
    for _ in range(len(consumers) - len(producers)):
        await queue.put(-1)

    # Wait for consumers
    results = await asyncio.gather(*consumers)
    print(f"Итого обработано: {sum(results)}")

asyncio.run(main())

Типы очередей

import asyncio

async def demo_queues() -> None:
    # Standard FIFO queue
    fifo: asyncio.Queue[str] = asyncio.Queue()
    await fifo.put("первый")
    await fifo.put("второй")
    print(await fifo.get())  # первый

    # LIFO queue (stack)
    lifo: asyncio.LifoQueue[str] = asyncio.LifoQueue()
    await lifo.put("первый")
    await lifo.put("второй")
    print(await lifo.get())  # второй

    # Priority queue
    pq: asyncio.PriorityQueue[tuple[int, str]] = asyncio.PriorityQueue()
    await pq.put((3, "низкий приоритет"))
    await pq.put((1, "высокий приоритет"))
    await pq.put((2, "средний приоритет"))
    print(await pq.get())  # (1, 'высокий приоритет')

asyncio.run(demo_queues())

Примитивы синхронизации

Хотя asyncio работает в одном потоке, конкурентные корутины могут конфликтовать при доступе к общим ресурсам. Примитивы синхронизации решают эту проблему.

Lock — блокировка

import asyncio

class BankAccount:
    """Thread-safe bank account using asyncio Lock."""

    def __init__(self, balance: float) -> None:
        self._balance = balance
        self._lock = asyncio.Lock()

    @property
    def balance(self) -> float:
        return self._balance

    async def transfer(self, amount: float) -> None:
        """Transfer money with lock to prevent race conditions."""
        async with self._lock:
            current = self._balance
            # Simulate processing delay
            await asyncio.sleep(0.01)
            self._balance = current + amount
            action = "пополнение" if amount > 0 else "списание"
            print(f"{action}: {abs(amount):.2f}, баланс: {self._balance:.2f}")

async def main() -> None:
    account = BankAccount(1000.0)

    # Without lock, concurrent operations could corrupt balance
    tasks = [
        account.transfer(100),
        account.transfer(-50),
        account.transfer(200),
        account.transfer(-75),
    ]
    await asyncio.gather(*tasks)
    print(f"Итоговый баланс: {account.balance:.2f}")

asyncio.run(main())

Semaphore — ограничение конкурентности

import asyncio

async def fetch_url(
    url: str,
    semaphore: asyncio.Semaphore,
) -> str:
    """Fetch URL with concurrency limit."""
    async with semaphore:
        print(f"Загружаю: {url}")
        await asyncio.sleep(1)  # Simulate network request
        return f"OK: {url}"

async def main() -> None:
    # Allow max 3 concurrent requests
    semaphore = asyncio.Semaphore(3)

    urls = [f"https://api.example.com/page/{i}" for i in range(10)]
    results = await asyncio.gather(
        *[fetch_url(url, semaphore) for url in urls]
    )
    print(f"Загружено {len(results)} страниц")

asyncio.run(main())

Event — сигнализация между корутинами

import asyncio

async def waiter(event: asyncio.Event, name: str) -> None:
    """Wait for the event to be set."""
    print(f"[{name}] Жду сигнала...")
    await event.wait()
    print(f"[{name}] Сигнал получен!")

async def setter(event: asyncio.Event) -> None:
    """Set the event after a delay."""
    print("Подготовка данных...")
    await asyncio.sleep(2)
    print("Данные готовы! Отправляю сигнал.")
    event.set()

async def main() -> None:
    event = asyncio.Event()

    await asyncio.gather(
        waiter(event, "A"),
        waiter(event, "B"),
        waiter(event, "C"),
        setter(event),
    )

asyncio.run(main())

Асинхронные итераторы и генераторы

async for — асинхронная итерация

Асинхронный итератор реализует методы __aiter__() и __anext__():

import asyncio

class AsyncCounter:
    """Async iterator that counts with delays."""

    def __init__(self, start: int, stop: int) -> None:
        self.current = start
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.3)
        value = self.current
        self.current += 1
        return value

async def main() -> None:
    async for number in AsyncCounter(1, 6):
        print(f"Число: {number}")

asyncio.run(main())

async def с yield — асинхронные генераторы

Более простой способ создать асинхронный итератор:

import asyncio

async def async_range(start: int, stop: int, delay: float = 0.5):
    """Async generator that yields numbers with delay."""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i

async def read_chunks(data: str, chunk_size: int = 10):
    """Async generator simulating chunked reading."""
    for i in range(0, len(data), chunk_size):
        await asyncio.sleep(0.1)  # Simulate I/O
        yield data[i:i + chunk_size]

async def main() -> None:
    # Async for with generator
    async for num in async_range(1, 5):
        print(f"Получено: {num}")

    # Async comprehension
    values = [v async for v in async_range(10, 15, 0.1)]
    print(f"Значения: {values}")

    # Chunked reading
    text = "Это длинная строка для демонстрации чтения по частям"
    async for chunk in read_chunks(text, 15):
        print(f"Чанк: '{chunk}'")

asyncio.run(main())

async with — асинхронные контекстные менеджеры

import asyncio
from contextlib import asynccontextmanager

class AsyncDBConnection:
    """Async context manager for database connection."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn
        self.connected = False

    async def __aenter__(self):
        print(f"Подключаюсь к {self.dsn}...")
        await asyncio.sleep(0.5)  # Simulate connection
        self.connected = True
        print("Подключено!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Закрываю соединение...")
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connected = False
        print("Соединение закрыто.")
        return False  # Do not suppress exceptions

    async def query(self, sql: str) -> list[dict]:
        """Execute a query."""
        if not self.connected:
            raise RuntimeError("Нет подключения к БД")
        await asyncio.sleep(0.3)
        return [{"id": 1, "name": "test"}]

async def main() -> None:
    async with AsyncDBConnection("postgres://localhost/mydb") as db:
        result = await db.query("SELECT * FROM users")
        print(f"Результат: {result}")

asyncio.run(main())

asynccontextmanager — через декоратор

from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

@asynccontextmanager
async def managed_resource(name: str) -> AsyncIterator[dict]:
    """Async context manager using decorator."""
    print(f"Захват ресурса: {name}")
    resource = {"name": name, "active": True}
    try:
        yield resource
    finally:
        resource["active"] = False
        print(f"Освобождение ресурса: {name}")

async def main() -> None:
    async with managed_resource("файл") as res:
        print(f"Использую: {res}")

asyncio.run(main())

Практический пример: пул воркеров с очередью

import asyncio
import random
from dataclasses import dataclass, field
from time import monotonic

@dataclass
class Job:
    """A unit of work to process."""
    id: int
    data: str
    priority: int = 0

@dataclass
class WorkerPool:
    """Pool of async workers processing a shared queue."""
    num_workers: int
    queue: asyncio.PriorityQueue = field(default_factory=asyncio.PriorityQueue)
    results: list[str] = field(default_factory=list)

    async def submit(self, job: Job) -> None:
        """Submit a job to the pool."""
        await self.queue.put((job.priority, job.id, job))

    async def _worker(self, name: str) -> None:
        """Worker that processes jobs from the queue."""
        while True:
            priority, _, job = await self.queue.get()
            try:
                delay = random.uniform(0.1, 0.5)
                await asyncio.sleep(delay)
                result = f"[{name}] Job#{job.id} '{job.data}' ({delay:.2f}с)"
                self.results.append(result)
                print(result)
            finally:
                self.queue.task_done()

    async def run(self, jobs: list[Job]) -> list[str]:
        """Run pool with given jobs."""
        # Submit all jobs
        for job in jobs:
            await self.submit(job)

        # Start workers
        workers = [
            asyncio.create_task(self._worker(f"W{i}"))
            for i in range(self.num_workers)
        ]

        # Wait for all jobs to complete
        await self.queue.join()

        # Cancel workers
        for w in workers:
            w.cancel()

        return self.results

async def main() -> None:
    pool = WorkerPool(num_workers=3)
    jobs = [
        Job(id=i, data=f"задача-{i}", priority=random.randint(1, 5))
        for i in range(10)
    ]

    start = monotonic()
    results = await pool.run(jobs)
    elapsed = monotonic() - start

    print(f"\nОбработано {len(results)} задач за {elapsed:.2f}с")

asyncio.run(main())

Проверь себя

🧪

Зачем вызывать `await writer.drain()` после `writer.write()`?

🧪

Что произойдёт при вызове `await queue.get()` на пустой asyncio.Queue?

🧪

Для чего используется asyncio.Semaphore?

🧪

Как создать асинхронное списковое включение (async comprehension)?