Fibers — глубокое погружение
Краткое напоминание
Fiber (PHP 8.1+) — механизм кооперативной многозадачности (cooperative multitasking). Fiber позволяет приостановить выполнение кода в произвольном месте стека вызовов и возобновить его позже. Это НЕ параллелизм — в каждый момент времени выполняется только один Fiber.
Потоки ОС (threads): параллельные ──────┐
├── одновременно
параллельные ──────┘
Fibers (кооперативные): задача A ──► suspend ──────────────────► resume ──► return
задача B ──► suspend ──────────────► resume ──► return
задача C ──► return
════════════════════════════════════════════════════════════
один поток, последовательное переключение
Ключевое отличие от базового материала: В этом разделе мы рассматриваем Fiber как фундамент для построения асинхронных систем, а не просто как API для приостановки/возобновления.
Жизненный цикл Fiber
Fiber проходит строго определённые состояния. Переходы между ними жёстко контролируются — некорректный переход выбрасывает FiberError.
new Fiber(callable)
│
▼
┌─────────┐
│ CREATED │
└────┬────┘
│ start()
▼
┌─────────┐
┌────►│ RUNNING │◄────┐
│ └────┬────┘ │
│ │ │
│ suspend() exception
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ SUSPENDED │ │
│ └─────┬─────┘ │
│ │ │
│ resume() / throw()
│ │ │
└──────────┘ │
│ │
return / uncaught exception
│
▼
┌──────────────┐
│ TERMINATED │
└──────────────┘
<?php
declare(strict_types=1);
// Full lifecycle demonstration
$fiber = new Fiber(function (string $name): string {
echo "Phase 1: started with '{$name}'\n";
// First suspension — send data to caller
$command = Fiber::suspend('ready');
echo "Phase 2: received command '{$command}'\n";
// Second suspension — send progress
$data = Fiber::suspend('processing');
echo "Phase 3: received data '{$data}'\n";
return "completed by {$name}";
});
// CREATED state
var_dump($fiber->isStarted()); // false
var_dump($fiber->isSuspended()); // false
var_dump($fiber->isTerminated()); // false
// Transition: CREATED → RUNNING → SUSPENDED
$result = $fiber->start('worker-1');
echo "Caller got: {$result}\n"; // 'ready'
// SUSPENDED state
var_dump($fiber->isStarted()); // true
var_dump($fiber->isSuspended()); // true
var_dump($fiber->isRunning()); // false
// Transition: SUSPENDED → RUNNING → SUSPENDED
$result = $fiber->resume('execute');
echo "Caller got: {$result}\n"; // 'processing'
// Transition: SUSPENDED → RUNNING → TERMINATED
$fiber->resume('payload-data');
// TERMINATED state
var_dump($fiber->isTerminated()); // true
echo $fiber->getReturn(); // 'completed by worker-1'
Вложенные Fibers
Fibers можно создавать и запускать внутри других Fibers. Каждый Fiber имеет собственный стек вызовов. Fiber::suspend() всегда приостанавливает текущий (самый внутренний запущенный) Fiber.
<?php
declare(strict_types=1);
// Nested fibers — each has its own call stack
$outer = new Fiber(function (): void {
echo "Outer: start\n";
$inner = new Fiber(function (): void {
echo "Inner: start\n";
Fiber::suspend(); // Suspends INNER fiber only
echo "Inner: resumed\n";
});
$inner->start();
echo "Outer: inner is suspended\n";
Fiber::suspend(); // Suspends OUTER fiber
echo "Outer: resumed\n";
$inner->resume();
echo "Outer: done\n";
});
$outer->start();
// Output:
// Outer: start
// Inner: start
// Outer: inner is suspended
echo "--- Main: outer is suspended ---\n";
$outer->resume();
// Output:
// Outer: resumed
// Inner: resumed
// Outer: done
Важно:
Fiber::suspend()внутри$innerприостанавливает именно$inner, а не$outer. Управление возвращается в$outer, который продолжает выполнение. Если$outerтоже вызываетFiber::suspend(), управление возвращается в основной код.
Передача данных через вложенные Fibers
<?php
declare(strict_types=1);
// Data flow through nested fibers
$outer = new Fiber(function (): string {
$inner = new Fiber(function (): string {
$value = Fiber::suspend('inner-hello');
return "inner-result: {$value}";
});
$fromInner = $inner->start();
echo "Outer got from inner: {$fromInner}\n"; // 'inner-hello'
// Pass data from outer's caller through to inner
$outerData = Fiber::suspend("outer-forwarding: {$fromInner}");
echo "Outer got: {$outerData}\n";
$inner->resume($outerData); // Forward to inner
$innerResult = $inner->getReturn();
return "outer-result: {$innerResult}";
});
$fromOuter = $outer->start();
echo "Main got: {$fromOuter}\n"; // 'outer-forwarding: inner-hello'
$outer->resume('main-data');
echo $outer->getReturn();
// outer-result: inner-result: main-data
Fiber vs Generator — глубокое сравнение
Generators (yield) и Fibers (Fiber::suspend()) решают похожую задачу — приостановку выполнения. Но у них фундаментальное архитектурное отличие.
<?php
declare(strict_types=1);
// PROBLEM: Generator yield only works at the function level
function generatorHelper(): void
{
// yield 'hello'; // Fatal error: yield from a non-generator function
}
function myGenerator(): Generator
{
yield 'step 1';
// generatorHelper(); // Cannot yield from helper!
yield 'step 2';
}
// SOLUTION: Fiber::suspend() works at ANY call depth
function deepHelper(): void
{
// Works perfectly — suspends the enclosing Fiber
Fiber::suspend('from depth 3');
}
function middleware(): void
{
deepHelper();
}
function service(): void
{
middleware();
}
$fiber = new Fiber(function (): void {
service(); // suspend() at depth 4 — no problem
});
$result = $fiber->start();
echo $result; // 'from depth 3'
Yield from — частичное решение для Generator
<?php
declare(strict_types=1);
// Generators can delegate with "yield from", but it's limited
function subGenerator(): Generator
{
yield 'sub-1';
yield 'sub-2';
}
function mainGenerator(): Generator
{
yield 'main-1';
yield from subGenerator(); // Delegation
yield 'main-3';
}
foreach (mainGenerator() as $value) {
echo "{$value}\n"; // main-1, sub-1, sub-2, main-3
}
// BUT: yield from requires the callee to be a Generator too
// You can't yield from a regular function
// Fiber::suspend() has no such limitation
| Аспект | Generator | Fiber |
|---|---|---|
| Приостановка из вложенных вызовов | Только через yield from |
Из любой глубины стека |
Реализует Iterator |
Да | Нет |
| Возврат значения | getReturn() после завершения |
getReturn() после завершения |
| Двусторонний обмен | yield/send() |
suspend()/resume() |
| Исключения | throw() |
throw() |
| Основное применение | Ленивые коллекции, итерация | Async I/O, кооперативная многозадачность |
| Стек вызовов | Нет своего стека | Полноценный собственный стек |
| Память | Легче (только frame) | Тяжелее (полный стек, ~8KB по умолчанию) |
Построение Scheduler
Scheduler (планировщик) — ключевой паттерн, который превращает Fibers из низкоуровневого примитива в полезный инструмент для конкурентности.
Простой round-robin scheduler
<?php
declare(strict_types=1);
/**
* Round-robin fiber scheduler.
* Executes fibers in turns: each fiber runs until it suspends,
* then the next fiber gets a turn.
*/
final class Scheduler
{
/** @var SplQueue<Fiber> */
private SplQueue $queue;
/** @var array<string, mixed> */
private array $results = [];
private int $taskIdCounter = 0;
public function __construct()
{
$this->queue = new SplQueue();
}
/**
* Schedule a new task and return its ID.
*/
public function schedule(Closure $task): string
{
$id = 'task-' . (++$this->taskIdCounter);
$fiber = new Fiber($task);
$this->queue->enqueue(['id' => $id, 'fiber' => $fiber]);
return $id;
}
/**
* Run all scheduled tasks until completion.
*/
public function run(): void
{
while (!$this->queue->isEmpty()) {
$item = $this->queue->dequeue();
$id = $item['id'];
/** @var Fiber $fiber */
$fiber = $item['fiber'];
try {
if (!$fiber->isStarted()) {
$fiber->start();
} elseif ($fiber->isSuspended()) {
$fiber->resume();
}
// Re-schedule if not terminated
if ($fiber->isSuspended()) {
$this->queue->enqueue($item);
} elseif ($fiber->isTerminated()) {
$this->results[$id] = $fiber->getReturn();
}
} catch (\Throwable $e) {
$this->results[$id] = $e;
echo "Task {$id} failed: {$e->getMessage()}\n";
}
}
}
/**
* Get results of all completed tasks.
*
* @return array<string, mixed>
*/
public function getResults(): array
{
return $this->results;
}
}
// Usage
$scheduler = new Scheduler();
$scheduler->schedule(function (): string {
echo "[A] Step 1\n";
Fiber::suspend();
echo "[A] Step 2\n";
Fiber::suspend();
echo "[A] Step 3\n";
return 'A-done';
});
$scheduler->schedule(function (): string {
echo "[B] Step 1\n";
Fiber::suspend();
echo "[B] Step 2\n";
return 'B-done';
});
$scheduler->schedule(function (): string {
echo "[C] Only step\n";
return 'C-done';
});
$scheduler->run();
// Output:
// [A] Step 1
// [B] Step 1
// [C] Only step
// [A] Step 2
// [B] Step 2
// [A] Step 3
print_r($scheduler->getResults());
// ['task-1' => 'A-done', 'task-2' => 'B-done', 'task-3' => 'C-done']
Scheduler с таймерами
<?php
declare(strict_types=1);
/**
* Scheduler with timer support.
* Tasks can request a delay, and the scheduler will resume them later.
*/
final class TimerScheduler
{
/** @var SplQueue<array{id: string, fiber: Fiber}> */
private SplQueue $ready;
/** @var SplPriorityQueue<array{id: string, fiber: Fiber}, float> */
private SplPriorityQueue $timers;
private int $taskId = 0;
public function __construct()
{
$this->ready = new SplQueue();
$this->timers = new SplPriorityQueue();
// Min-heap: earlier timestamps have higher priority
$this->timers->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
}
public function schedule(Closure $task): string
{
$id = 'task-' . (++$this->taskId);
$this->ready->enqueue([
'id' => $id,
'fiber' => new Fiber($task),
]);
return $id;
}
/**
* Called from within a fiber to request a delay.
*/
public static function delay(float $seconds): void
{
// Suspend with a delay request
Fiber::suspend(['type' => 'delay', 'seconds' => $seconds]);
}
public function run(): void
{
while (!$this->ready->isEmpty() || !$this->timers->isEmpty()) {
// Move ready timers to the ready queue
$now = microtime(true);
while (!$this->timers->isEmpty()) {
$top = $this->timers->top();
$wakeAt = -$top['priority']; // Negate because SplPriorityQueue is max-heap
if ($wakeAt <= $now) {
$this->timers->extract();
$this->ready->enqueue($top['data']);
} else {
break;
}
}
// Process one ready task
if (!$this->ready->isEmpty()) {
$item = $this->ready->dequeue();
$fiber = $item['fiber'];
$result = null;
if (!$fiber->isStarted()) {
$result = $fiber->start();
} elseif ($fiber->isSuspended()) {
$result = $fiber->resume();
}
if ($fiber->isSuspended()) {
// Check if task requested a delay
if (is_array($result) && ($result['type'] ?? '') === 'delay') {
$wakeAt = microtime(true) + $result['seconds'];
// Negate for min-heap behavior
$this->timers->insert($item, -$wakeAt);
} else {
$this->ready->enqueue($item);
}
}
} else {
// No ready tasks — sleep until next timer
if (!$this->timers->isEmpty()) {
$nextWake = -$this->timers->top()['priority'];
$sleepUs = (int) max(0, ($nextWake - microtime(true)) * 1_000_000);
usleep($sleepUs);
}
}
}
}
}
// Usage
$scheduler = new TimerScheduler();
$scheduler->schedule(function (): void {
echo "[Fast] Start\n";
TimerScheduler::delay(0.1); // 100ms delay
echo "[Fast] Done\n";
});
$scheduler->schedule(function (): void {
echo "[Slow] Start\n";
TimerScheduler::delay(0.3); // 300ms delay
echo "[Slow] Done\n";
});
$scheduler->schedule(function (): void {
echo "[Instant] No delay\n";
});
$scheduler->run();
// Output:
// [Fast] Start
// [Slow] Start
// [Instant] No delay
// [Fast] Done (after ~100ms)
// [Slow] Done (after ~300ms)
Обработка ошибок в Fiber
Распространение исключений
<?php
declare(strict_types=1);
// Exception thrown inside fiber propagates to the caller
$fiber = new Fiber(function (): void {
echo "Before exception\n";
throw new RuntimeException('Something went wrong');
echo "This never executes\n";
});
try {
$fiber->start();
} catch (RuntimeException $e) {
echo "Caught: {$e->getMessage()}\n";
// Fiber is now terminated
var_dump($fiber->isTerminated()); // true
}
Инъекция исключений через throw()
<?php
declare(strict_types=1);
// Inject an exception at the point where fiber is suspended
$fiber = new Fiber(function (): string {
try {
echo "Waiting for data...\n";
$data = Fiber::suspend('waiting');
return "Processed: {$data}";
} catch (TimeoutException $e) {
echo "Timeout! Cleaning up...\n";
return 'default-value';
}
});
// Custom exception
class TimeoutException extends RuntimeException {}
$status = $fiber->start();
echo "Fiber says: {$status}\n"; // 'waiting'
// Simulate a timeout — inject exception instead of data
$fiber->throw(new TimeoutException('Request timed out'));
echo "Result: {$fiber->getReturn()}\n";
// Output:
// Waiting for data...
// Fiber says: waiting
// Timeout! Cleaning up...
// Result: default-value
FiberError — полный каталог
<?php
declare(strict_types=1);
$fiber = new Fiber(function (): void {
Fiber::suspend();
});
// 1. Resume before start
try {
$fiber->resume();
} catch (FiberError $e) {
echo "Error: {$e->getMessage()}\n";
// "Cannot resume a fiber that is not suspended"
}
// 2. Double start
$fiber->start();
try {
$fiber->start();
} catch (FiberError $e) {
echo "Error: {$e->getMessage()}\n";
// "Cannot start a fiber that is not in the created state"
}
// 3. getReturn before termination
try {
$fiber->getReturn();
} catch (FiberError $e) {
echo "Error: {$e->getMessage()}\n";
// "Cannot get return value of a fiber that hasn't terminated"
}
// 4. Resume terminated fiber
$fiber->resume(); // Fiber terminates
try {
$fiber->resume();
} catch (FiberError $e) {
echo "Error: {$e->getMessage()}\n";
// "Cannot resume a terminated fiber"
}
// 5. Suspend outside fiber
try {
Fiber::suspend();
} catch (FiberError $e) {
echo "Error: {$e->getMessage()}\n";
// "Cannot suspend outside of a fiber"
}
Управление памятью
Каждый Fiber выделяет собственный стек вызовов. По умолчанию размер стека определяется PHP (обычно ~8KB, растёт по необходимости). Это важно при создании большого количества Fibers.
<?php
declare(strict_types=1);
// Memory impact of creating many fibers
$memBefore = memory_get_usage();
$fibers = [];
for ($i = 0; $i < 10_000; $i++) {
$fibers[] = new Fiber(function (): void {
Fiber::suspend();
});
}
$memAfter = memory_get_usage();
$perFiber = ($memAfter - $memBefore) / 10_000;
echo "Memory per fiber: " . round($perFiber) . " bytes\n";
// Typically ~1-2 KB per unstarted fiber
// Start all — now each has its own stack
$memBefore = memory_get_usage();
foreach ($fibers as $fiber) {
$fiber->start();
}
$memAfterStart = memory_get_usage();
$perStartedFiber = ($memAfterStart - $memBefore) / 10_000;
echo "Memory per started fiber: " . round($perStartedFiber) . " bytes\n";
// Typically ~8-16 KB per started fiber (stack allocated)
// Cleanup
foreach ($fibers as $fiber) {
$fiber->resume();
}
unset($fibers);
gc_collect_cycles();
Практическое правило: Не создавайте миллионы Fibers одновременно. Для 10 000 конкурентных задач Fibers подходят отлично. Для миллиона задач используйте очередь + пул из нескольких сотен Fibers.
Практические примеры
Конкурентный HTTP-клиент (симуляция)
<?php
declare(strict_types=1);
/**
* Simulates concurrent HTTP requests using fibers.
* In production, use AMPHP or ReactPHP for real non-blocking I/O.
*/
final class ConcurrentHttpClient
{
/**
* @param array<string, string> $urls Map of name => URL
* @return array<string, string> Map of name => response body
*/
public function fetchAll(array $urls): array
{
$fibers = [];
$results = [];
// Create a fiber for each URL
foreach ($urls as $name => $url) {
$fibers[$name] = new Fiber(function () use ($url): string {
// Simulate: initiate request
$context = stream_context_create([
'http' => ['timeout' => 5],
]);
// In real async code, this would be non-blocking
Fiber::suspend('initiated');
// Simulate: read response
$body = @file_get_contents($url, false, $context);
if ($body === false) {
throw new RuntimeException("Failed to fetch: {$url}");
}
return $body;
});
}
// Start all fibers (initiate all requests)
foreach ($fibers as $name => $fiber) {
$fiber->start();
}
// Resume all fibers (read all responses)
foreach ($fibers as $name => $fiber) {
try {
$fiber->resume();
$results[$name] = $fiber->getReturn();
} catch (\Throwable $e) {
$results[$name] = "ERROR: {$e->getMessage()}";
}
}
return $results;
}
}
Coroutine-based Task Runner
<?php
declare(strict_types=1);
/**
* Task that can yield control and communicate with the runner.
*/
enum TaskCommand
{
case Continue;
case Sleep;
case WaitForInput;
}
/**
* Advanced task runner with command support.
*/
final class TaskRunner
{
/** @var array<string, Fiber> */
private array $tasks = [];
/** @var array<string, mixed> */
private array $results = [];
/** @var array<string, \Throwable> */
private array $errors = [];
public function add(string $name, Closure $callable): void
{
$this->tasks[$name] = new Fiber($callable);
}
/**
* Execute all tasks with cooperative scheduling.
*
* @return array{results: array<string, mixed>, errors: array<string, \Throwable>}
*/
public function execute(): array
{
$active = $this->tasks;
while (!empty($active)) {
foreach ($active as $name => $fiber) {
try {
if (!$fiber->isStarted()) {
$command = $fiber->start();
} elseif ($fiber->isSuspended()) {
$command = $fiber->resume(microtime(true));
} else {
continue;
}
if ($fiber->isTerminated()) {
$this->results[$name] = $fiber->getReturn();
unset($active[$name]);
}
} catch (\Throwable $e) {
$this->errors[$name] = $e;
unset($active[$name]);
}
}
}
return [
'results' => $this->results,
'errors' => $this->errors,
];
}
}
// Usage
$runner = new TaskRunner();
$runner->add('counter', function (): int {
$sum = 0;
for ($i = 0; $i < 5; $i++) {
$sum += $i;
Fiber::suspend();
}
return $sum;
});
$runner->add('accumulator', function (): string {
$parts = [];
for ($i = 0; $i < 3; $i++) {
$timestamp = Fiber::suspend();
$parts[] = number_format((float) $timestamp, 4);
}
return implode(', ', $parts);
});
$result = $runner->execute();
echo "Counter: {$result['results']['counter']}\n"; // 10
echo "Accumulator: {$result['results']['accumulator']}\n"; // timestamps
Fiber-based Pipeline
<?php
declare(strict_types=1);
/**
* Pipeline where each stage is a Fiber.
* Data flows: Stage 1 → Stage 2 → Stage 3.
*/
final class FiberPipeline
{
/** @var array<Fiber> */
private array $stages = [];
public function addStage(Closure $stage): self
{
$this->stages[] = new Fiber($stage);
return $this;
}
/**
* Process input through all stages.
*/
public function process(mixed $input): mixed
{
$data = $input;
foreach ($this->stages as $fiber) {
$fiber->start($data);
if ($fiber->isTerminated()) {
$data = $fiber->getReturn();
} else {
throw new LogicException(
'Pipeline stages must not suspend'
);
}
}
return $data;
}
}
$pipeline = new FiberPipeline();
$pipeline
->addStage(function (string $text): string {
// Stage 1: Normalize whitespace
return preg_replace('/\s+/', ' ', trim($text));
})
->addStage(function (string $text): string {
// Stage 2: Convert to lowercase
return mb_strtolower($text);
})
->addStage(function (string $text): array {
// Stage 3: Tokenize
return explode(' ', $text);
});
$result = $pipeline->process(" Hello World PHP ");
print_r($result); // ['hello', 'world', 'php']
Fiber как основа для Async
Все современные async-фреймворки PHP (AMPHP v3, Revolt) используют Fiber под капотом. Суть: библиотека создаёт Fiber для каждой «задачи», а event loop управляет их приостановкой и возобновлением в зависимости от готовности I/O.
<?php
declare(strict_types=1);
/**
* Simplified illustration of how async frameworks use Fibers.
* This is NOT production code — it shows the concept.
*/
final class SimpleEventLoop
{
/** @var array<array{fiber: Fiber, event: string, resource: mixed}> */
private array $waiting = [];
/** @var SplQueue<Fiber> */
private SplQueue $ready;
public function __construct()
{
$this->ready = new SplQueue();
}
/**
* Run a callback as a concurrent task.
*/
public function async(Closure $callback): void
{
$this->ready->enqueue(new Fiber($callback));
}
/**
* Non-blocking sleep (called from within a fiber).
*/
public function sleep(float $seconds): void
{
$fiber = $this->getCurrentFiber();
$this->waiting[] = [
'fiber' => $fiber,
'event' => 'timer',
'resource' => microtime(true) + $seconds,
];
Fiber::suspend();
}
/**
* Main event loop — run until all tasks complete.
*/
public function run(): void
{
while (!$this->ready->isEmpty() || !empty($this->waiting)) {
// Check timers
$now = microtime(true);
foreach ($this->waiting as $key => $entry) {
if ($entry['event'] === 'timer' && $entry['resource'] <= $now) {
$this->ready->enqueue($entry['fiber']);
unset($this->waiting[$key]);
}
}
$this->waiting = array_values($this->waiting);
// Process one ready fiber
if (!$this->ready->isEmpty()) {
$fiber = $this->ready->dequeue();
if (!$fiber->isStarted()) {
$fiber->start($this);
} elseif ($fiber->isSuspended()) {
$fiber->resume();
}
} else {
usleep(1000); // Prevent busy-waiting
}
}
}
private function getCurrentFiber(): Fiber
{
// In real frameworks, current fiber is tracked internally
// This is a simplification
return Fiber::getCurrent()
?? throw new LogicException('Not inside a fiber');
}
}
// Note: Fiber::getCurrent() is not a real PHP method.
// Real frameworks track the current fiber themselves.
// This example illustrates the concept.
Запомни: Fiber сам по себе — примитив. Без event loop он мало полезен. Для реального async используйте Revolt + AMPHP или ReactPHP. Они предоставляют event loop, который автоматически приостанавливает и возобновляет Fibers при готовности I/O-операций.