Mid💻Практика14 min

PostgreSQL

MVCC, JSONB, индексы (B-tree, GIN, GiST), CTE, window functions -- глубокое погружение с PHP примерами через PDO

PostgreSQL

Почему PostgreSQL

PostgreSQL -- самая продвинутая РСУБД с открытым исходным кодом. Сочетает строгость реляционной модели с расширяемостью: JSONB, полнотекстовый поиск, geo-запросы, расширения (TimescaleDB, pgvector).

MVCC (Multi-Version Concurrency Control)

PostgreSQL реализует конкурентный доступ через MVCC: каждая транзакция видит свой "снимок" данных. Читатели не блокируют писателей и наоборот.

Как работает MVCC

Каждая строка имеет скрытые столбцы:

  • xmin -- ID транзакции, которая создала строку
  • xmax -- ID транзакции, которая удалила/обновила строку
UPDATE users SET name = 'John' WHERE id = 1;

Было:   (xmin=100, xmax=0)   name='Jack'   [видна всем]
Стало:  (xmin=100, xmax=200) name='Jack'   [мёртвая строка]
        (xmin=200, xmax=0)   name='John'   [новая версия]

VACUUM необходим для очистки мёртвых строк. Автоматически: autovacuum. Если autovacuum не справляется -- таблица "пухнет" (table bloat).

PHPGo
<?php

declare(strict_types=1);

/**
 * Understanding MVCC behavior through PHP
 */
final class MvccDemo
{
    public function __construct(
        private readonly \PDO $db,
    ) {}

    /**
     * Transaction isolation levels affect what MVCC snapshot you see
     */
    public function demonstrateIsolation(): void
    {
        // READ COMMITTED (default): sees committed changes between statements
        $this->db->exec("SET TRANSACTION ISOLATION LEVEL READ COMMITTED");

        // REPEATABLE READ: sees snapshot from transaction start
        $this->db->exec("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");

        // SERIALIZABLE: full isolation, may fail with serialization error
        $this->db->exec("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
    }

    /**
     * Advisory locks for application-level locking (not row-level)
     */
    public function processWithLock(string $resourceId, callable $callback): mixed
    {
        $lockId = crc32($resourceId);

        // Try to acquire advisory lock (non-blocking)
        $stmt = $this->db->prepare('SELECT pg_try_advisory_lock(:lock_id)');
        $stmt->execute(['lock_id' => $lockId]);
        $acquired = $stmt->fetchColumn();

        if (!$acquired) {
            throw new \RuntimeException("Resource {$resourceId} is locked by another process");
        }

        try {
            return $callback();
        } finally {
            $this->db->exec("SELECT pg_advisory_unlock({$lockId})");
        }
    }
}
package postgres

import (
	"context"
	"database/sql"
	"fmt"
	"hash/crc32"
)

// MvccDemo demonstrates MVCC behavior and advisory locks.
type MvccDemo struct {
	db *sql.DB
}

// ProcessWithLock acquires a PostgreSQL advisory lock before executing fn.
func (m *MvccDemo) ProcessWithLock(ctx context.Context, resourceID string, fn func() error) error {
	lockID := int64(crc32.ChecksumIEEE([]byte(resourceID)))

	var acquired bool
	err := m.db.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, lockID).Scan(&acquired)
	if err != nil {
		return fmt.Errorf("try advisory lock: %w", err)
	}
	if !acquired {
		return fmt.Errorf("resource %s is locked by another process", resourceID)
	}
	defer m.db.ExecContext(ctx, `SELECT pg_advisory_unlock($1)`, lockID)

	return fn()
}
## JSONB

PostgreSQL JSONB -- двоичный формат JSON с полной индексацией и операторами запросов.

PHPGo
<?php

declare(strict_types=1);

/**
 * JSONB: flexible schema within PostgreSQL
 */
final class ProductCatalog
{
    public function __construct(
        private readonly \PDO $db,
    ) {}

    public function createTable(): void
    {
        $this->db->exec(<<<SQL
            CREATE TABLE IF NOT EXISTS products (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL,
                category TEXT NOT NULL,
                attributes JSONB NOT NULL DEFAULT '{}',
                tags TEXT[] DEFAULT '{}',
                created_at TIMESTAMPTZ DEFAULT NOW()
            );

            -- GIN index on JSONB for containment queries
            CREATE INDEX IF NOT EXISTS idx_products_attrs ON products USING GIN (attributes);

            -- GIN index on array column
            CREATE INDEX IF NOT EXISTS idx_products_tags ON products USING GIN (tags);
        SQL);
    }

    /**
     * Store product with flexible attributes
     */
    public function create(string $name, string $category, array $attributes): string
    {
        $stmt = $this->db->prepare(<<<SQL
            INSERT INTO products (name, category, attributes, tags)
            VALUES (:name, :category, :attributes, :tags)
            RETURNING id
        SQL);

        $stmt->execute([
            'name' => $name,
            'category' => $category,
            'attributes' => json_encode($attributes),
            'tags' => '{' . implode(',', $attributes['tags'] ?? []) . '}',
        ]);

        return $stmt->fetchColumn();
    }

    /**
     * Query by JSONB attributes
     */
    public function findByAttributes(array $criteria): array
    {
        // @> containment operator: find products where attributes contain criteria
        $stmt = $this->db->prepare(<<<SQL
            SELECT id, name, category, attributes
            FROM products
            WHERE attributes @> :criteria
        SQL);

        $stmt->execute(['criteria' => json_encode($criteria)]);
        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * Query specific JSONB fields
     */
    public function findByBrand(string $brand, float $minPrice): array
    {
        $stmt = $this->db->prepare(<<<SQL
            SELECT
                id,
                name,
                attributes->>'brand' as brand,
                (attributes->>'price')::DECIMAL as price,
                attributes->'specs' as specs
            FROM products
            WHERE attributes->>'brand' = :brand
              AND (attributes->>'price')::DECIMAL >= :min_price
            ORDER BY (attributes->>'price')::DECIMAL
        SQL);

        $stmt->execute(['brand' => $brand, 'min_price' => $minPrice]);
        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * Update nested JSONB
     */
    public function updateAttribute(string $id, string $path, mixed $value): void
    {
        $stmt = $this->db->prepare(<<<SQL
            UPDATE products
            SET attributes = jsonb_set(attributes, :path, :value::jsonb)
            WHERE id = :id
        SQL);

        $stmt->execute([
            'id' => $id,
            'path' => $path, // e.g., '{specs,weight}'
            'value' => json_encode($value),
        ]);
    }
}
package postgres

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
)

// ProductCatalog manages products with flexible JSONB attributes.
type ProductCatalog struct {
	db *sql.DB
}

func NewProductCatalog(db *sql.DB) *ProductCatalog {
	return &ProductCatalog{db: db}
}

// Create stores a product with flexible attributes, returns its ID.
func (c *ProductCatalog) Create(ctx context.Context, name, category string, attrs map[string]any) (string, error) {
	attrsJSON, _ := json.Marshal(attrs)
	var id string
	err := c.db.QueryRowContext(ctx,
		`INSERT INTO products (name, category, attributes) VALUES ($1, $2, $3) RETURNING id`,
		name, category, string(attrsJSON),
	).Scan(&id)
	if err != nil {
		return "", fmt.Errorf("insert product: %w", err)
	}
	return id, nil
}

// FindByAttributes uses the @> containment operator on JSONB.
func (c *ProductCatalog) FindByAttributes(ctx context.Context, criteria map[string]any) ([]map[string]any, error) {
	criteriaJSON, _ := json.Marshal(criteria)
	rows, err := c.db.QueryContext(ctx,
		`SELECT id, name, category, attributes FROM products WHERE attributes @> $1`,
		string(criteriaJSON))
	if err != nil {
		return nil, fmt.Errorf("find by attrs: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var id, name, category, attrs string
		rows.Scan(&id, &name, &category, &attrs)
		results = append(results, map[string]any{
			"id": id, "name": name, "category": category, "attributes": attrs,
		})
	}
	return results, rows.Err()
}

// FindByBrand queries specific JSONB fields.
func (c *ProductCatalog) FindByBrand(ctx context.Context, brand string, minPrice float64) ([]map[string]any, error) {
	rows, err := c.db.QueryContext(ctx, `
		SELECT id, name, attributes->>'brand' as brand,
		       (attributes->>'price')::DECIMAL as price
		FROM products
		WHERE attributes->>'brand' = $1
		  AND (attributes->>'price')::DECIMAL >= $2
		ORDER BY (attributes->>'price')::DECIMAL`, brand, minPrice)
	if err != nil {
		return nil, fmt.Errorf("find by brand: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var id, name, b string
		var price float64
		rows.Scan(&id, &name, &b, &price)
		results = append(results, map[string]any{"id": id, "name": name, "brand": b, "price": price})
	}
	return results, rows.Err()
}

// UpdateAttribute updates a nested JSONB field.
func (c *ProductCatalog) UpdateAttribute(ctx context.Context, id, path string, value any) error {
	valJSON, _ := json.Marshal(value)
	_, err := c.db.ExecContext(ctx,
		`UPDATE products SET attributes = jsonb_set(attributes, $1, $2::jsonb) WHERE id = $3`,
		path, string(valJSON), id)
	return err
}
## Индексы

Типы индексов

Тип Когда использовать Операторы
B-tree Сравнение, сортировка (по умолчанию) =, <, >, <=, >=, BETWEEN
Hash Только равенство =
GIN JSONB, массивы, полнотекстовый поиск @>, @@, ?
GiST Геоданные, диапазоны, полнотекстовый поиск &&, @>, <->
BRIN Большие таблицы с коррелированными данными <, >, =
PHPGo
<?php

declare(strict_types=1);

/**
 * Index strategies for PostgreSQL
 */
final class IndexExamples
{
    public function __construct(
        private readonly \PDO $db,
    ) {}

    public function createIndexes(): void
    {
        // B-tree: default, for equality and range queries
        $this->db->exec(<<<SQL
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_user_id
                ON orders (user_id);

            -- Composite index: order matters!
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_user_status
                ON orders (user_id, status);

            -- Partial index: only index active records
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_active
                ON orders (created_at)
                WHERE status = 'active';

            -- Covering index: includes extra columns to avoid heap lookup
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_covering
                ON orders (user_id) INCLUDE (status, total_amount);
        SQL);

        // GIN for full-text search
        $this->db->exec(<<<SQL
            ALTER TABLE products ADD COLUMN IF NOT EXISTS
                search_vector tsvector
                GENERATED ALWAYS AS (
                    setweight(to_tsvector('russian', coalesce(name, '')), 'A') ||
                    setweight(to_tsvector('russian', coalesce(description, '')), 'B')
                ) STORED;

            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_products_search
                ON products USING GIN (search_vector);
        SQL);

        // GiST for geographic data
        $this->db->exec(<<<SQL
            CREATE EXTENSION IF NOT EXISTS postgis;

            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_stores_location
                ON stores USING GIST (location);
        SQL);

        // BRIN for time-series data (very small index, good for append-only)
        $this->db->exec(<<<SQL
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_logs_time
                ON application_logs USING BRIN (created_at);
        SQL);
    }

    /**
     * Full-text search with ranking
     */
    public function search(string $query): array
    {
        $stmt = $this->db->prepare(<<<SQL
            SELECT
                id, name, description,
                ts_rank_cd(search_vector, websearch_to_tsquery('russian', :query)) as rank
            FROM products
            WHERE search_vector @@ websearch_to_tsquery('russian', :query)
            ORDER BY rank DESC
            LIMIT 20
        SQL);

        $stmt->execute(['query' => $query]);
        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * EXPLAIN ANALYZE: always check your query plan
     */
    public function explainQuery(string $sql, array $params = []): string
    {
        $stmt = $this->db->prepare("EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) " . $sql);
        $stmt->execute($params);

        return implode("\n", $stmt->fetchAll(\PDO::FETCH_COLUMN));
    }
}
package postgres

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// IndexExamples demonstrates PostgreSQL index strategies.
type IndexExamples struct {
	db *sql.DB
}

// Search performs full-text search with ranking.
func (e *IndexExamples) Search(ctx context.Context, query string) ([]map[string]any, error) {
	rows, err := e.db.QueryContext(ctx, `
		SELECT id, name, description,
		       ts_rank_cd(search_vector, websearch_to_tsquery('russian', $1)) as rank
		FROM products
		WHERE search_vector @@ websearch_to_tsquery('russian', $1)
		ORDER BY rank DESC LIMIT 20`, query)
	if err != nil {
		return nil, fmt.Errorf("search: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var id, name, desc string
		var rank float64
		rows.Scan(&id, &name, &desc, &rank)
		results = append(results, map[string]any{
			"id": id, "name": name, "description": desc, "rank": rank,
		})
	}
	return results, rows.Err()
}

// ExplainQuery returns the execution plan for a SQL query.
func (e *IndexExamples) ExplainQuery(ctx context.Context, query string) (string, error) {
	rows, err := e.db.QueryContext(ctx, "EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) "+query)
	if err != nil {
		return "", fmt.Errorf("explain: %w", err)
	}
	defer rows.Close()

	var lines []string
	for rows.Next() {
		var line string
		rows.Scan(&line)
		lines = append(lines, line)
	}
	return strings.Join(lines, "\n"), rows.Err()
}
## CTE (Common Table Expressions)
PHPGo
<?php

declare(strict_types=1);

/**
 * CTE: readable complex queries
 */
final class CteExamples
{
    public function __construct(
        private readonly \PDO $db,
    ) {}

    /**
     * Recursive CTE: category tree
     */
    public function getCategoryTree(int $rootId): array
    {
        $stmt = $this->db->prepare(<<<SQL
            WITH RECURSIVE category_tree AS (
                -- Base case: root category
                SELECT id, name, parent_id, 0 as depth, ARRAY[id] as path
                FROM categories
                WHERE id = :root_id

                UNION ALL

                -- Recursive case: children
                SELECT c.id, c.name, c.parent_id, ct.depth + 1, ct.path || c.id
                FROM categories c
                JOIN category_tree ct ON c.parent_id = ct.id
                WHERE ct.depth < 10  -- Safety limit
            )
            SELECT * FROM category_tree ORDER BY path
        SQL);

        $stmt->execute(['root_id' => $rootId]);
        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * CTE for step-by-step data processing
     */
    public function getCustomerAnalytics(): array
    {
        return $this->db->query(<<<SQL
            WITH customer_orders AS (
                SELECT
                    user_id,
                    COUNT(*) as order_count,
                    SUM(total_amount) as total_spent,
                    MAX(created_at) as last_order
                FROM orders
                WHERE status = 'completed'
                GROUP BY user_id
            ),
            customer_segments AS (
                SELECT
                    user_id,
                    order_count,
                    total_spent,
                    last_order,
                    CASE
                        WHEN total_spent > 10000 THEN 'vip'
                        WHEN total_spent > 1000 THEN 'regular'
                        ELSE 'new'
                    END as segment
                FROM customer_orders
            )
            SELECT
                segment,
                COUNT(*) as customer_count,
                AVG(order_count)::INT as avg_orders,
                AVG(total_spent)::DECIMAL(10,2) as avg_spent
            FROM customer_segments
            GROUP BY segment
            ORDER BY avg_spent DESC
        SQL)->fetchAll(\PDO::FETCH_ASSOC);
    }
}
package postgres

import (
	"context"
	"database/sql"
	"fmt"
)

// CteExamples demonstrates CTEs for readable complex queries.
type CteExamples struct {
	db *sql.DB
}

// GetCategoryTree uses a recursive CTE to build a category hierarchy.
func (c *CteExamples) GetCategoryTree(ctx context.Context, rootID int) ([]map[string]any, error) {
	rows, err := c.db.QueryContext(ctx, `
		WITH RECURSIVE category_tree AS (
			SELECT id, name, parent_id, 0 as depth, ARRAY[id] as path
			FROM categories WHERE id = $1
			UNION ALL
			SELECT c.id, c.name, c.parent_id, ct.depth + 1, ct.path || c.id
			FROM categories c
			JOIN category_tree ct ON c.parent_id = ct.id
			WHERE ct.depth < 10
		)
		SELECT id, name, depth FROM category_tree ORDER BY path`, rootID)
	if err != nil {
		return nil, fmt.Errorf("category tree: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var id, name string
		var depth int
		rows.Scan(&id, &name, &depth)
		results = append(results, map[string]any{"id": id, "name": name, "depth": depth})
	}
	return results, rows.Err()
}

// GetCustomerAnalytics uses a CTE pipeline for step-by-step processing.
func (c *CteExamples) GetCustomerAnalytics(ctx context.Context) ([]map[string]any, error) {
	rows, err := c.db.QueryContext(ctx, `
		WITH customer_orders AS (
			SELECT user_id, COUNT(*) as order_count, SUM(total_amount) as total_spent
			FROM orders WHERE status = 'completed' GROUP BY user_id
		),
		customer_segments AS (
			SELECT *, CASE
				WHEN total_spent > 10000 THEN 'vip'
				WHEN total_spent > 1000 THEN 'regular'
				ELSE 'new' END as segment
			FROM customer_orders
		)
		SELECT segment, COUNT(*) as customer_count,
		       AVG(order_count)::INT as avg_orders,
		       AVG(total_spent)::DECIMAL(10,2) as avg_spent
		FROM customer_segments GROUP BY segment ORDER BY avg_spent DESC`)
	if err != nil {
		return nil, fmt.Errorf("analytics: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var segment string
		var count, avgOrders int
		var avgSpent float64
		rows.Scan(&segment, &count, &avgOrders, &avgSpent)
		results = append(results, map[string]any{
			"segment": segment, "customer_count": count,
			"avg_orders": avgOrders, "avg_spent": avgSpent,
		})
	}
	return results, rows.Err()
}
## Window Functions
PHPGo
<?php

declare(strict_types=1);

/**
 * Window functions: analytics without GROUP BY
 */
final class WindowFunctionExamples
{
    public function __construct(
        private readonly \PDO $db,
    ) {}

    /**
     * Running total, rank, moving average
     */
    public function getSalesAnalytics(): array
    {
        return $this->db->query(<<<SQL
            SELECT
                order_date,
                daily_revenue,
                -- Running total
                SUM(daily_revenue) OVER (ORDER BY order_date) as cumulative_revenue,
                -- 7-day moving average
                AVG(daily_revenue) OVER (
                    ORDER BY order_date
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ) as moving_avg_7d,
                -- Rank by revenue
                RANK() OVER (ORDER BY daily_revenue DESC) as revenue_rank,
                -- Percent of total
                daily_revenue / SUM(daily_revenue) OVER () * 100 as pct_of_total
            FROM (
                SELECT
                    DATE(created_at) as order_date,
                    SUM(total_amount) as daily_revenue
                FROM orders
                WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
                GROUP BY DATE(created_at)
            ) daily
            ORDER BY order_date
        SQL)->fetchAll(\PDO::FETCH_ASSOC);
    }

    /**
     * Keyset (cursor) pagination: faster than OFFSET
     */
    public function getOrdersPage(?string $lastId, int $limit = 20): array
    {
        if ($lastId === null) {
            $stmt = $this->db->prepare(<<<SQL
                SELECT id, user_id, total_amount, created_at
                FROM orders
                ORDER BY created_at DESC, id DESC
                LIMIT :limit
            SQL);
            $stmt->execute(['limit' => $limit]);
        } else {
            $stmt = $this->db->prepare(<<<SQL
                SELECT id, user_id, total_amount, created_at
                FROM orders
                WHERE (created_at, id) < (
                    SELECT created_at, id FROM orders WHERE id = :last_id
                )
                ORDER BY created_at DESC, id DESC
                LIMIT :limit
            SQL);
            $stmt->execute(['last_id' => $lastId, 'limit' => $limit]);
        }

        return $stmt->fetchAll(\PDO::FETCH_ASSOC);
    }
}
package postgres

import (
	"context"
	"database/sql"
	"fmt"
)

// WindowFunctionExamples demonstrates window functions and keyset pagination.
type WindowFunctionExamples struct {
	db *sql.DB
}

// GetSalesAnalytics returns running totals, moving averages, and ranks.
func (w *WindowFunctionExamples) GetSalesAnalytics(ctx context.Context) ([]map[string]any, error) {
	rows, err := w.db.QueryContext(ctx, `
		SELECT order_date, daily_revenue,
		       SUM(daily_revenue) OVER (ORDER BY order_date) as cumulative_revenue,
		       AVG(daily_revenue) OVER (ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as moving_avg_7d,
		       RANK() OVER (ORDER BY daily_revenue DESC) as revenue_rank
		FROM (
			SELECT DATE(created_at) as order_date, SUM(total_amount) as daily_revenue
			FROM orders WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
			GROUP BY DATE(created_at)
		) daily ORDER BY order_date`)
	if err != nil {
		return nil, fmt.Errorf("sales analytics: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var date string
		var revenue, cumulative, movingAvg float64
		var rank int
		rows.Scan(&date, &revenue, &cumulative, &movingAvg, &rank)
		results = append(results, map[string]any{
			"order_date": date, "daily_revenue": revenue,
			"cumulative": cumulative, "moving_avg_7d": movingAvg, "rank": rank,
		})
	}
	return results, rows.Err()
}

// GetOrdersPage implements keyset (cursor) pagination -- faster than OFFSET.
func (w *WindowFunctionExamples) GetOrdersPage(ctx context.Context, lastID *string, limit int) ([]map[string]any, error) {
	var rows *sql.Rows
	var err error
	if lastID == nil {
		rows, err = w.db.QueryContext(ctx,
			`SELECT id, user_id, total_amount, created_at FROM orders
			 ORDER BY created_at DESC, id DESC LIMIT $1`, limit)
	} else {
		rows, err = w.db.QueryContext(ctx,
			`SELECT id, user_id, total_amount, created_at FROM orders
			 WHERE (created_at, id) < (SELECT created_at, id FROM orders WHERE id = $1)
			 ORDER BY created_at DESC, id DESC LIMIT $2`, *lastID, limit)
	}
	if err != nil {
		return nil, fmt.Errorf("get orders page: %w", err)
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var id, userID, createdAt string
		var total float64
		rows.Scan(&id, &userID, &total, &createdAt)
		results = append(results, map[string]any{
			"id": id, "user_id": userID, "total_amount": total, "created_at": createdAt,
		})
	}
	return results, rows.Err()
}
## PHP: работа через PDO
<?php

declare(strict_types=1);

/**
 * PostgreSQL connection best practices with PDO
 */
final class PdoConnectionFactory
{
    public static function create(
        string $host,
        int $port,
        string $dbname,
        string $user,
        string $password,
    ): \PDO {
        $dsn = "pgsql:host={$host};port={$port};dbname={$dbname}";

        $pdo = new \PDO($dsn, $user, $password, [
            \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
            \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
            \PDO::ATTR_EMULATE_PREPARES => false, // Use native prepared statements
            \PDO::ATTR_STRINGIFY_FETCHES => false, // Keep native types
        ]);

        // Set session parameters
        $pdo->exec("SET timezone = 'UTC'");
        $pdo->exec("SET statement_timeout = '30s'");

        return $pdo;
    }
}

Best Practice: всегда используйте EXPLAIN ANALYZE для проверки планов запросов. Индексы не помогают, если PostgreSQL выбирает seq scan.

Итоги

  • MVCC обеспечивает конкурентный доступ без блокировок между читателями и писателями
  • JSONB позволяет хранить полуструктурированные данные с индексацией
  • Правильный выбор индекса критичен: B-tree, GIN, GiST, BRIN для разных задач
  • CTE и Window Functions решают сложные аналитические задачи в одном запросе
  • Keyset pagination вместо OFFSET для больших наборов данных