PostgreSQL
Почему PostgreSQL
PostgreSQL -- самая продвинутая РСУБД с открытым исходным кодом. Сочетает строгость реляционной модели с расширяемостью: JSONB, полнотекстовый поиск, geo-запросы, расширения (TimescaleDB, pgvector).
MVCC (Multi-Version Concurrency Control)
PostgreSQL реализует конкурентный доступ через MVCC: каждая транзакция видит свой "снимок" данных. Читатели не блокируют писателей и наоборот.
Как работает MVCC
Каждая строка имеет скрытые столбцы:
xmin-- ID транзакции, которая создала строкуxmax-- ID транзакции, которая удалила/обновила строку
UPDATE users SET name = 'John' WHERE id = 1;
Было: (xmin=100, xmax=0) name='Jack' [видна всем]
Стало: (xmin=100, xmax=200) name='Jack' [мёртвая строка]
(xmin=200, xmax=0) name='John' [новая версия]
VACUUM необходим для очистки мёртвых строк. Автоматически: autovacuum. Если autovacuum не справляется -- таблица "пухнет" (table bloat).
<?php
declare(strict_types=1);
/**
* Understanding MVCC behavior through PHP
*/
final class MvccDemo
{
public function __construct(
private readonly \PDO $db,
) {}
/**
* Transaction isolation levels affect what MVCC snapshot you see
*/
public function demonstrateIsolation(): void
{
// READ COMMITTED (default): sees committed changes between statements
$this->db->exec("SET TRANSACTION ISOLATION LEVEL READ COMMITTED");
// REPEATABLE READ: sees snapshot from transaction start
$this->db->exec("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
// SERIALIZABLE: full isolation, may fail with serialization error
$this->db->exec("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
}
/**
* Advisory locks for application-level locking (not row-level)
*/
public function processWithLock(string $resourceId, callable $callback): mixed
{
$lockId = crc32($resourceId);
// Try to acquire advisory lock (non-blocking)
$stmt = $this->db->prepare('SELECT pg_try_advisory_lock(:lock_id)');
$stmt->execute(['lock_id' => $lockId]);
$acquired = $stmt->fetchColumn();
if (!$acquired) {
throw new \RuntimeException("Resource {$resourceId} is locked by another process");
}
try {
return $callback();
} finally {
$this->db->exec("SELECT pg_advisory_unlock({$lockId})");
}
}
}
package postgres
import (
"context"
"database/sql"
"fmt"
"hash/crc32"
)
// MvccDemo demonstrates MVCC behavior and advisory locks.
type MvccDemo struct {
db *sql.DB
}
// ProcessWithLock acquires a PostgreSQL advisory lock before executing fn.
func (m *MvccDemo) ProcessWithLock(ctx context.Context, resourceID string, fn func() error) error {
lockID := int64(crc32.ChecksumIEEE([]byte(resourceID)))
var acquired bool
err := m.db.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, lockID).Scan(&acquired)
if err != nil {
return fmt.Errorf("try advisory lock: %w", err)
}
if !acquired {
return fmt.Errorf("resource %s is locked by another process", resourceID)
}
defer m.db.ExecContext(ctx, `SELECT pg_advisory_unlock($1)`, lockID)
return fn()
}
PostgreSQL JSONB -- двоичный формат JSON с полной индексацией и операторами запросов.
<?php
declare(strict_types=1);
/**
* JSONB: flexible schema within PostgreSQL
*/
final class ProductCatalog
{
public function __construct(
private readonly \PDO $db,
) {}
public function createTable(): void
{
$this->db->exec(<<<SQL
CREATE TABLE IF NOT EXISTS products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
category TEXT NOT NULL,
attributes JSONB NOT NULL DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- GIN index on JSONB for containment queries
CREATE INDEX IF NOT EXISTS idx_products_attrs ON products USING GIN (attributes);
-- GIN index on array column
CREATE INDEX IF NOT EXISTS idx_products_tags ON products USING GIN (tags);
SQL);
}
/**
* Store product with flexible attributes
*/
public function create(string $name, string $category, array $attributes): string
{
$stmt = $this->db->prepare(<<<SQL
INSERT INTO products (name, category, attributes, tags)
VALUES (:name, :category, :attributes, :tags)
RETURNING id
SQL);
$stmt->execute([
'name' => $name,
'category' => $category,
'attributes' => json_encode($attributes),
'tags' => '{' . implode(',', $attributes['tags'] ?? []) . '}',
]);
return $stmt->fetchColumn();
}
/**
* Query by JSONB attributes
*/
public function findByAttributes(array $criteria): array
{
// @> containment operator: find products where attributes contain criteria
$stmt = $this->db->prepare(<<<SQL
SELECT id, name, category, attributes
FROM products
WHERE attributes @> :criteria
SQL);
$stmt->execute(['criteria' => json_encode($criteria)]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
/**
* Query specific JSONB fields
*/
public function findByBrand(string $brand, float $minPrice): array
{
$stmt = $this->db->prepare(<<<SQL
SELECT
id,
name,
attributes->>'brand' as brand,
(attributes->>'price')::DECIMAL as price,
attributes->'specs' as specs
FROM products
WHERE attributes->>'brand' = :brand
AND (attributes->>'price')::DECIMAL >= :min_price
ORDER BY (attributes->>'price')::DECIMAL
SQL);
$stmt->execute(['brand' => $brand, 'min_price' => $minPrice]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
/**
* Update nested JSONB
*/
public function updateAttribute(string $id, string $path, mixed $value): void
{
$stmt = $this->db->prepare(<<<SQL
UPDATE products
SET attributes = jsonb_set(attributes, :path, :value::jsonb)
WHERE id = :id
SQL);
$stmt->execute([
'id' => $id,
'path' => $path, // e.g., '{specs,weight}'
'value' => json_encode($value),
]);
}
}
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
)
// ProductCatalog manages products with flexible JSONB attributes.
type ProductCatalog struct {
db *sql.DB
}
func NewProductCatalog(db *sql.DB) *ProductCatalog {
return &ProductCatalog{db: db}
}
// Create stores a product with flexible attributes, returns its ID.
func (c *ProductCatalog) Create(ctx context.Context, name, category string, attrs map[string]any) (string, error) {
attrsJSON, _ := json.Marshal(attrs)
var id string
err := c.db.QueryRowContext(ctx,
`INSERT INTO products (name, category, attributes) VALUES ($1, $2, $3) RETURNING id`,
name, category, string(attrsJSON),
).Scan(&id)
if err != nil {
return "", fmt.Errorf("insert product: %w", err)
}
return id, nil
}
// FindByAttributes uses the @> containment operator on JSONB.
func (c *ProductCatalog) FindByAttributes(ctx context.Context, criteria map[string]any) ([]map[string]any, error) {
criteriaJSON, _ := json.Marshal(criteria)
rows, err := c.db.QueryContext(ctx,
`SELECT id, name, category, attributes FROM products WHERE attributes @> $1`,
string(criteriaJSON))
if err != nil {
return nil, fmt.Errorf("find by attrs: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var id, name, category, attrs string
rows.Scan(&id, &name, &category, &attrs)
results = append(results, map[string]any{
"id": id, "name": name, "category": category, "attributes": attrs,
})
}
return results, rows.Err()
}
// FindByBrand queries specific JSONB fields.
func (c *ProductCatalog) FindByBrand(ctx context.Context, brand string, minPrice float64) ([]map[string]any, error) {
rows, err := c.db.QueryContext(ctx, `
SELECT id, name, attributes->>'brand' as brand,
(attributes->>'price')::DECIMAL as price
FROM products
WHERE attributes->>'brand' = $1
AND (attributes->>'price')::DECIMAL >= $2
ORDER BY (attributes->>'price')::DECIMAL`, brand, minPrice)
if err != nil {
return nil, fmt.Errorf("find by brand: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var id, name, b string
var price float64
rows.Scan(&id, &name, &b, &price)
results = append(results, map[string]any{"id": id, "name": name, "brand": b, "price": price})
}
return results, rows.Err()
}
// UpdateAttribute updates a nested JSONB field.
func (c *ProductCatalog) UpdateAttribute(ctx context.Context, id, path string, value any) error {
valJSON, _ := json.Marshal(value)
_, err := c.db.ExecContext(ctx,
`UPDATE products SET attributes = jsonb_set(attributes, $1, $2::jsonb) WHERE id = $3`,
path, string(valJSON), id)
return err
}
Типы индексов
| Тип | Когда использовать | Операторы |
|---|---|---|
| B-tree | Сравнение, сортировка (по умолчанию) | =, <, >, <=, >=, BETWEEN |
| Hash | Только равенство | = |
| GIN | JSONB, массивы, полнотекстовый поиск | @>, @@, ? |
| GiST | Геоданные, диапазоны, полнотекстовый поиск | &&, @>, <-> |
| BRIN | Большие таблицы с коррелированными данными | <, >, = |
<?php
declare(strict_types=1);
/**
* Index strategies for PostgreSQL
*/
final class IndexExamples
{
public function __construct(
private readonly \PDO $db,
) {}
public function createIndexes(): void
{
// B-tree: default, for equality and range queries
$this->db->exec(<<<SQL
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_user_id
ON orders (user_id);
-- Composite index: order matters!
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_user_status
ON orders (user_id, status);
-- Partial index: only index active records
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_active
ON orders (created_at)
WHERE status = 'active';
-- Covering index: includes extra columns to avoid heap lookup
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_covering
ON orders (user_id) INCLUDE (status, total_amount);
SQL);
// GIN for full-text search
$this->db->exec(<<<SQL
ALTER TABLE products ADD COLUMN IF NOT EXISTS
search_vector tsvector
GENERATED ALWAYS AS (
setweight(to_tsvector('russian', coalesce(name, '')), 'A') ||
setweight(to_tsvector('russian', coalesce(description, '')), 'B')
) STORED;
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_products_search
ON products USING GIN (search_vector);
SQL);
// GiST for geographic data
$this->db->exec(<<<SQL
CREATE EXTENSION IF NOT EXISTS postgis;
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_stores_location
ON stores USING GIST (location);
SQL);
// BRIN for time-series data (very small index, good for append-only)
$this->db->exec(<<<SQL
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_logs_time
ON application_logs USING BRIN (created_at);
SQL);
}
/**
* Full-text search with ranking
*/
public function search(string $query): array
{
$stmt = $this->db->prepare(<<<SQL
SELECT
id, name, description,
ts_rank_cd(search_vector, websearch_to_tsquery('russian', :query)) as rank
FROM products
WHERE search_vector @@ websearch_to_tsquery('russian', :query)
ORDER BY rank DESC
LIMIT 20
SQL);
$stmt->execute(['query' => $query]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
/**
* EXPLAIN ANALYZE: always check your query plan
*/
public function explainQuery(string $sql, array $params = []): string
{
$stmt = $this->db->prepare("EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) " . $sql);
$stmt->execute($params);
return implode("\n", $stmt->fetchAll(\PDO::FETCH_COLUMN));
}
}
package postgres
import (
"context"
"database/sql"
"fmt"
"strings"
)
// IndexExamples demonstrates PostgreSQL index strategies.
type IndexExamples struct {
db *sql.DB
}
// Search performs full-text search with ranking.
func (e *IndexExamples) Search(ctx context.Context, query string) ([]map[string]any, error) {
rows, err := e.db.QueryContext(ctx, `
SELECT id, name, description,
ts_rank_cd(search_vector, websearch_to_tsquery('russian', $1)) as rank
FROM products
WHERE search_vector @@ websearch_to_tsquery('russian', $1)
ORDER BY rank DESC LIMIT 20`, query)
if err != nil {
return nil, fmt.Errorf("search: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var id, name, desc string
var rank float64
rows.Scan(&id, &name, &desc, &rank)
results = append(results, map[string]any{
"id": id, "name": name, "description": desc, "rank": rank,
})
}
return results, rows.Err()
}
// ExplainQuery returns the execution plan for a SQL query.
func (e *IndexExamples) ExplainQuery(ctx context.Context, query string) (string, error) {
rows, err := e.db.QueryContext(ctx, "EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) "+query)
if err != nil {
return "", fmt.Errorf("explain: %w", err)
}
defer rows.Close()
var lines []string
for rows.Next() {
var line string
rows.Scan(&line)
lines = append(lines, line)
}
return strings.Join(lines, "\n"), rows.Err()
}
<?php
declare(strict_types=1);
/**
* CTE: readable complex queries
*/
final class CteExamples
{
public function __construct(
private readonly \PDO $db,
) {}
/**
* Recursive CTE: category tree
*/
public function getCategoryTree(int $rootId): array
{
$stmt = $this->db->prepare(<<<SQL
WITH RECURSIVE category_tree AS (
-- Base case: root category
SELECT id, name, parent_id, 0 as depth, ARRAY[id] as path
FROM categories
WHERE id = :root_id
UNION ALL
-- Recursive case: children
SELECT c.id, c.name, c.parent_id, ct.depth + 1, ct.path || c.id
FROM categories c
JOIN category_tree ct ON c.parent_id = ct.id
WHERE ct.depth < 10 -- Safety limit
)
SELECT * FROM category_tree ORDER BY path
SQL);
$stmt->execute(['root_id' => $rootId]);
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
/**
* CTE for step-by-step data processing
*/
public function getCustomerAnalytics(): array
{
return $this->db->query(<<<SQL
WITH customer_orders AS (
SELECT
user_id,
COUNT(*) as order_count,
SUM(total_amount) as total_spent,
MAX(created_at) as last_order
FROM orders
WHERE status = 'completed'
GROUP BY user_id
),
customer_segments AS (
SELECT
user_id,
order_count,
total_spent,
last_order,
CASE
WHEN total_spent > 10000 THEN 'vip'
WHEN total_spent > 1000 THEN 'regular'
ELSE 'new'
END as segment
FROM customer_orders
)
SELECT
segment,
COUNT(*) as customer_count,
AVG(order_count)::INT as avg_orders,
AVG(total_spent)::DECIMAL(10,2) as avg_spent
FROM customer_segments
GROUP BY segment
ORDER BY avg_spent DESC
SQL)->fetchAll(\PDO::FETCH_ASSOC);
}
}
package postgres
import (
"context"
"database/sql"
"fmt"
)
// CteExamples demonstrates CTEs for readable complex queries.
type CteExamples struct {
db *sql.DB
}
// GetCategoryTree uses a recursive CTE to build a category hierarchy.
func (c *CteExamples) GetCategoryTree(ctx context.Context, rootID int) ([]map[string]any, error) {
rows, err := c.db.QueryContext(ctx, `
WITH RECURSIVE category_tree AS (
SELECT id, name, parent_id, 0 as depth, ARRAY[id] as path
FROM categories WHERE id = $1
UNION ALL
SELECT c.id, c.name, c.parent_id, ct.depth + 1, ct.path || c.id
FROM categories c
JOIN category_tree ct ON c.parent_id = ct.id
WHERE ct.depth < 10
)
SELECT id, name, depth FROM category_tree ORDER BY path`, rootID)
if err != nil {
return nil, fmt.Errorf("category tree: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var id, name string
var depth int
rows.Scan(&id, &name, &depth)
results = append(results, map[string]any{"id": id, "name": name, "depth": depth})
}
return results, rows.Err()
}
// GetCustomerAnalytics uses a CTE pipeline for step-by-step processing.
func (c *CteExamples) GetCustomerAnalytics(ctx context.Context) ([]map[string]any, error) {
rows, err := c.db.QueryContext(ctx, `
WITH customer_orders AS (
SELECT user_id, COUNT(*) as order_count, SUM(total_amount) as total_spent
FROM orders WHERE status = 'completed' GROUP BY user_id
),
customer_segments AS (
SELECT *, CASE
WHEN total_spent > 10000 THEN 'vip'
WHEN total_spent > 1000 THEN 'regular'
ELSE 'new' END as segment
FROM customer_orders
)
SELECT segment, COUNT(*) as customer_count,
AVG(order_count)::INT as avg_orders,
AVG(total_spent)::DECIMAL(10,2) as avg_spent
FROM customer_segments GROUP BY segment ORDER BY avg_spent DESC`)
if err != nil {
return nil, fmt.Errorf("analytics: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var segment string
var count, avgOrders int
var avgSpent float64
rows.Scan(&segment, &count, &avgOrders, &avgSpent)
results = append(results, map[string]any{
"segment": segment, "customer_count": count,
"avg_orders": avgOrders, "avg_spent": avgSpent,
})
}
return results, rows.Err()
}
<?php
declare(strict_types=1);
/**
* Window functions: analytics without GROUP BY
*/
final class WindowFunctionExamples
{
public function __construct(
private readonly \PDO $db,
) {}
/**
* Running total, rank, moving average
*/
public function getSalesAnalytics(): array
{
return $this->db->query(<<<SQL
SELECT
order_date,
daily_revenue,
-- Running total
SUM(daily_revenue) OVER (ORDER BY order_date) as cumulative_revenue,
-- 7-day moving average
AVG(daily_revenue) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7d,
-- Rank by revenue
RANK() OVER (ORDER BY daily_revenue DESC) as revenue_rank,
-- Percent of total
daily_revenue / SUM(daily_revenue) OVER () * 100 as pct_of_total
FROM (
SELECT
DATE(created_at) as order_date,
SUM(total_amount) as daily_revenue
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE(created_at)
) daily
ORDER BY order_date
SQL)->fetchAll(\PDO::FETCH_ASSOC);
}
/**
* Keyset (cursor) pagination: faster than OFFSET
*/
public function getOrdersPage(?string $lastId, int $limit = 20): array
{
if ($lastId === null) {
$stmt = $this->db->prepare(<<<SQL
SELECT id, user_id, total_amount, created_at
FROM orders
ORDER BY created_at DESC, id DESC
LIMIT :limit
SQL);
$stmt->execute(['limit' => $limit]);
} else {
$stmt = $this->db->prepare(<<<SQL
SELECT id, user_id, total_amount, created_at
FROM orders
WHERE (created_at, id) < (
SELECT created_at, id FROM orders WHERE id = :last_id
)
ORDER BY created_at DESC, id DESC
LIMIT :limit
SQL);
$stmt->execute(['last_id' => $lastId, 'limit' => $limit]);
}
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
}
}
package postgres
import (
"context"
"database/sql"
"fmt"
)
// WindowFunctionExamples demonstrates window functions and keyset pagination.
type WindowFunctionExamples struct {
db *sql.DB
}
// GetSalesAnalytics returns running totals, moving averages, and ranks.
func (w *WindowFunctionExamples) GetSalesAnalytics(ctx context.Context) ([]map[string]any, error) {
rows, err := w.db.QueryContext(ctx, `
SELECT order_date, daily_revenue,
SUM(daily_revenue) OVER (ORDER BY order_date) as cumulative_revenue,
AVG(daily_revenue) OVER (ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as moving_avg_7d,
RANK() OVER (ORDER BY daily_revenue DESC) as revenue_rank
FROM (
SELECT DATE(created_at) as order_date, SUM(total_amount) as daily_revenue
FROM orders WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE(created_at)
) daily ORDER BY order_date`)
if err != nil {
return nil, fmt.Errorf("sales analytics: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var date string
var revenue, cumulative, movingAvg float64
var rank int
rows.Scan(&date, &revenue, &cumulative, &movingAvg, &rank)
results = append(results, map[string]any{
"order_date": date, "daily_revenue": revenue,
"cumulative": cumulative, "moving_avg_7d": movingAvg, "rank": rank,
})
}
return results, rows.Err()
}
// GetOrdersPage implements keyset (cursor) pagination -- faster than OFFSET.
func (w *WindowFunctionExamples) GetOrdersPage(ctx context.Context, lastID *string, limit int) ([]map[string]any, error) {
var rows *sql.Rows
var err error
if lastID == nil {
rows, err = w.db.QueryContext(ctx,
`SELECT id, user_id, total_amount, created_at FROM orders
ORDER BY created_at DESC, id DESC LIMIT $1`, limit)
} else {
rows, err = w.db.QueryContext(ctx,
`SELECT id, user_id, total_amount, created_at FROM orders
WHERE (created_at, id) < (SELECT created_at, id FROM orders WHERE id = $1)
ORDER BY created_at DESC, id DESC LIMIT $2`, *lastID, limit)
}
if err != nil {
return nil, fmt.Errorf("get orders page: %w", err)
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var id, userID, createdAt string
var total float64
rows.Scan(&id, &userID, &total, &createdAt)
results = append(results, map[string]any{
"id": id, "user_id": userID, "total_amount": total, "created_at": createdAt,
})
}
return results, rows.Err()
}
<?php
declare(strict_types=1);
/**
* PostgreSQL connection best practices with PDO
*/
final class PdoConnectionFactory
{
public static function create(
string $host,
int $port,
string $dbname,
string $user,
string $password,
): \PDO {
$dsn = "pgsql:host={$host};port={$port};dbname={$dbname}";
$pdo = new \PDO($dsn, $user, $password, [
\PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
\PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
\PDO::ATTR_EMULATE_PREPARES => false, // Use native prepared statements
\PDO::ATTR_STRINGIFY_FETCHES => false, // Keep native types
]);
// Set session parameters
$pdo->exec("SET timezone = 'UTC'");
$pdo->exec("SET statement_timeout = '30s'");
return $pdo;
}
}
Best Practice: всегда используйте
EXPLAIN ANALYZEдля проверки планов запросов. Индексы не помогают, если PostgreSQL выбирает seq scan.
Итоги
- MVCC обеспечивает конкурентный доступ без блокировок между читателями и писателями
- JSONB позволяет хранить полуструктурированные данные с индексацией
- Правильный выбор индекса критичен: B-tree, GIN, GiST, BRIN для разных задач
- CTE и Window Functions решают сложные аналитические задачи в одном запросе
- Keyset pagination вместо OFFSET для больших наборов данных