System Design: Time Series Database
Design a time series database optimized for high-throughput metric ingestion and fast range queries, supporting billions of data points from IoT devices, application monitoring, and financial tick data.
Requirements
Functional Requirements:
- Ingest time series data points: (metric_name, tags, timestamp, value)
- Query data by time range, metric name, tag filters
- Support aggregations: sum, avg, min, max, percentiles over time windows
- Downsampling/rollups: auto-aggregate old data to reduce storage
- Retention policies: auto-expire data after N days
- Alert rules: trigger notifications when metric crosses threshold
Non-Functional Requirements:
- Ingest 10 million data points/sec
- Sub-10ms query latency for 1-day time range queries
- Store 100 trillion data points
- Compression: >10x vs. raw float64 storage
- 99.9% write availability
Scale Estimation
At 10 million points/sec, with each point being 16 bytes (8-byte timestamp + 8-byte float64 value) uncompressed, raw ingestion is 160 MB/sec. Gorilla compression (Facebook's TSDB algorithm) averages 1.37 bytes/point on typical metric data, reducing ingestion bandwidth to 13.7 MB/sec. Over one year (31.5 million seconds) at 10M points/sec, that is ~315 trillion data points; at 1.37 bytes/point, total storage is ~430 TB. With aggressive rollup policies (downsample to 1-minute resolution after 7 days, 1-hour after 30 days), storage for aged data drops another 10–100x.
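As a sanity check on these numbers, here is the same back-of-envelope arithmetic as a runnable Go snippet:

```go
package main

import "fmt"

func main() {
	const (
		pointsPerSec   = 10_000_000
		rawBytesPerPt  = 16.0 // 8-byte timestamp + 8-byte float64 value
		gorillaBytesPt = 1.37 // average from the Gorilla paper
		secondsPerYear = 31_536_000
	)
	fmt.Printf("raw ingest:        %.0f MB/s\n", pointsPerSec*rawBytesPerPt/1e6)  // 160 MB/s
	fmt.Printf("compressed ingest: %.1f MB/s\n", pointsPerSec*gorillaBytesPt/1e6) // 13.7 MB/s

	pointsPerYear := float64(pointsPerSec) * secondsPerYear // ~3.15e14 = 315 trillion
	fmt.Printf("points/year:       %.2e\n", pointsPerYear)
	fmt.Printf("storage/year:      %.0f TB\n", pointsPerYear*gorillaBytesPt/1e12) // ~430 TB
}
```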
High-Level Architecture
The system separates ingestion from query serving. An ingestion tier receives data points via Kafka, buffers recent data in memory, and writes to storage. A query tier serves read requests against persistent storage. A compaction tier runs background jobs for compression, downsampling, and retention enforcement. A metadata service maintains the metric catalog (all known metric names + tag key-value pairs) and an inverted index for tag-based lookups.
Data points flow through Kafka (partitioned by metric_name hash) into ingestion workers. Each ingestion worker maintains a per-metric in-memory buffer (the "head chunk") holding the last 2 hours of data. When a head chunk is full or 2 hours elapse, it is sealed, Gorilla-compressed, and written to the chunk store (object storage like S3 or a local SSD store). Prometheus uses this exact model (WAL + head block + immutable blocks on disk). The head block is also written to a WAL (write-ahead log) for crash recovery.
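A minimal Go sketch of the head-chunk lifecycle described above. All type and function names are illustrative assumptions, not Prometheus's actual internals:

```go
package ingest

import (
	"sync"
	"time"
)

type Sample struct {
	TS    int64 // unix seconds
	Value float64
}

// HeadChunk buffers recent samples for one series, uncompressed until sealed.
type HeadChunk struct {
	SeriesID uint64
	MinTS    int64
	Samples  []Sample
}

type Ingester struct {
	mu    sync.Mutex
	heads map[uint64]*HeadChunk
	wal   func(seriesID uint64, s Sample) error // append to the write-ahead log
	seal  func(c *HeadChunk) error              // Gorilla-compress and flush to the chunk store
}

const headWindow = 2 * time.Hour

func (in *Ingester) Append(seriesID uint64, s Sample) error {
	// Durability first: the WAL entry lets us rebuild the head on crash.
	if err := in.wal(seriesID, s); err != nil {
		return err
	}
	in.mu.Lock()
	defer in.mu.Unlock()
	h, ok := in.heads[seriesID]
	if !ok {
		h = &HeadChunk{SeriesID: seriesID, MinTS: s.TS}
		in.heads[seriesID] = h
	}
	// Seal once the chunk spans the 2-hour head window, then start a new one.
	if s.TS-h.MinTS >= int64(headWindow/time.Second) {
		if err := in.seal(h); err != nil {
			return err
		}
		h = &HeadChunk{SeriesID: seriesID, MinTS: s.TS}
		in.heads[seriesID] = h
	}
	h.Samples = append(h.Samples, s)
	return nil
}
```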
Queries arrive at a query frontend, which parses the query language (PromQL, InfluxQL, or custom DSL), identifies the time range and metric/tag filters, fetches the relevant compressed chunks from storage (potentially from multiple shard nodes), decompresses and merges the data streams, applies aggregation functions, and returns results. The query frontend caches recently computed query results (1-second resolution query cache) to avoid redundant decompression for repeated dashboard refreshes.
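The read path, reduced to a sketch. The `Store` interface and all names here are hypothetical stand-ins for the inverted index and chunk store described elsewhere in this design; result caching is omitted:

```go
package queryfe

type Sample struct {
	TS    int64
	Value float64
}

type Chunk struct{ Samples []Sample }

type Query struct {
	Metric     string
	TagFilters map[string]string
	StartTS    int64
	EndTS      int64
	AggFn      func([]float64) float64 // e.g. sum, avg, max
}

type Store interface {
	// Resolve tag filters to series IDs via the inverted index.
	Lookup(metric string, filters map[string]string) []uint64
	// Fetch decompressed chunks overlapping [start, end] for a series.
	Chunks(seriesID uint64, start, end int64) []Chunk
}

// Execute fetches, merges, and aggregates matching series over the range.
func Execute(st Store, q Query) map[uint64]float64 {
	out := make(map[uint64]float64)
	for _, sid := range st.Lookup(q.Metric, q.TagFilters) {
		var vals []float64
		for _, c := range st.Chunks(sid, q.StartTS, q.EndTS) {
			for _, s := range c.Samples {
				if s.TS >= q.StartTS && s.TS <= q.EndTS {
					vals = append(vals, s.Value)
				}
			}
		}
		out[sid] = q.AggFn(vals)
	}
	return out
}
```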
Core Components
Gorilla Compression
Gorilla combines delta-of-delta encoding for timestamps with XOR encoding for values. Timestamps: store the first timestamp raw (8 bytes); for each subsequent timestamp, store the delta of the current delta from the previous delta (delta-of-delta). Most metrics arrive at regular intervals (10s, 60s), so the delta-of-delta is 0 and is encoded as a single bit. Irregular intervals use a variable-length encoding: a short control prefix followed by 7, 9, 12, or 32 value bits, depending on the magnitude of the delta-of-delta. Values: XOR the current float64 with the previous one. Consecutive values of a slowly changing metric share leading and trailing zero bits in the XOR, so only the meaningful middle bits are stored. Together, Gorilla averages 1.37 bytes/point on typical metrics (vs. 16 bytes raw): 11.7x compression.
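A simplified Go sketch of the two Gorilla tricks. A real encoder packs variable-width fields into a bitstream; this version only computes what would be written and how many bits it costs:

```go
package gorilla

import "math/bits"

// dodBits returns the encoded size of one timestamp's delta-of-delta,
// using the bucket boundaries from the Gorilla paper.
func dodBits(dod int64) int {
	switch {
	case dod == 0:
		return 1 // control bit '0'
	case dod >= -63 && dod <= 64:
		return 2 + 7 // '10' + 7-bit value
	case dod >= -255 && dod <= 256:
		return 3 + 9 // '110' + 9-bit value
	case dod >= -2047 && dod <= 2048:
		return 4 + 12 // '1110' + 12-bit value
	default:
		return 4 + 32 // '1111' + 32-bit value
	}
}

// xorBits shows the value encoding: XOR consecutive float64 bit patterns
// and keep only the meaningful middle bits between the shared zeros.
func xorBits(prev, cur uint64) (lead, trail, meaningful int) {
	x := prev ^ cur
	if x == 0 {
		return 64, 0, 0 // identical value: a single '0' control bit
	}
	lead = bits.LeadingZeros64(x)
	trail = bits.TrailingZeros64(x)
	return lead, trail, 64 - lead - trail
}
```

For a series sampled exactly every 10 seconds, every delta-of-delta is 0, so each timestamp after the first two costs a single bit; that regularity is where most of the 11.7x comes from.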
Inverted Index for Tag Queries
Time series are identified by a metric name plus a set of tag key-value pairs (e.g., cpu_usage{host="web1", region="us-east"}). To efficiently answer "all cpu_usage series where region=us-east", the system maintains an inverted index: (tag_key, tag_value) → set of series_ids, the same model as Elasticsearch's inverted index. The index is held in memory (Roaring Bitmaps for compact series_id sets) and serialized to persistent storage; for large deployments it is sharded by metric name. Tag cardinality explosion (millions of unique tag values, e.g. user IDs used as tag values) is a known anti-pattern that bloats the inverted index.
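A toy version of the index in Go, with plain map-based sets standing in for Roaring Bitmaps:

```go
package index

type postings map[uint64]struct{}

// InvertedIndex maps (tag key, tag value) -> set of series IDs.
type InvertedIndex struct {
	byTag map[string]map[string]postings
}

func New() *InvertedIndex {
	return &InvertedIndex{byTag: make(map[string]map[string]postings)}
}

func (ix *InvertedIndex) Add(seriesID uint64, tags map[string]string) {
	for k, v := range tags {
		if ix.byTag[k] == nil {
			ix.byTag[k] = make(map[string]postings)
		}
		if ix.byTag[k][v] == nil {
			ix.byTag[k][v] = make(postings)
		}
		ix.byTag[k][v][seriesID] = struct{}{}
	}
}

// Lookup intersects the postings lists of all filters,
// e.g. {"__name__": "cpu_usage", "region": "us-east"}.
func (ix *InvertedIndex) Lookup(filters map[string]string) []uint64 {
	var result postings
	for k, v := range filters {
		p := ix.byTag[k][v]
		if p == nil {
			return nil // no series matches this filter
		}
		if result == nil {
			result = make(postings, len(p))
			for id := range p {
				result[id] = struct{}{}
			}
			continue
		}
		for id := range result {
			if _, ok := p[id]; !ok {
				delete(result, id)
			}
		}
	}
	ids := make([]uint64, 0, len(result))
	for id := range result {
		ids = append(ids, id)
	}
	return ids
}
```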
Downsampling & Retention
A background compaction service runs continuously, processing chunks that have aged past the downsampling threshold. For data older than 7 days, 10-second resolution is downsampled to 1-minute resolution: for each 1-minute window, compute mean, min, max, and percentile estimates (using t-digest or DDSketch). Downsampled data replaces the original at the 7-day boundary, cutting the point count 6x for 10s → 1min downsampling (somewhat less in bytes, since several aggregates are stored per window). Retention enforcement deletes all chunks (original and downsampled) older than the configured retention period. Both operations run lazily (no global lock), iterating over chunks in object storage and replacing or deleting expired ones.
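A sketch of the rollup computation, assuming input samples sorted by timestamp; the percentile sketches (t-digest/DDSketch) are elided:

```go
package rollup

type Sample struct {
	TS    int64 // unix seconds
	Value float64
}

// Rollup holds the aggregates for one downsampled window.
type Rollup struct {
	WindowStart   int64
	Count         int
	Sum, Min, Max float64
}

// Downsample buckets sorted samples into fixed windows, e.g. windowSec=60
// turns 10-second samples into 1-minute rollups.
func Downsample(samples []Sample, windowSec int64) []Rollup {
	var out []Rollup
	for _, s := range samples {
		w := s.TS - s.TS%windowSec // start of this sample's window
		if len(out) == 0 || out[len(out)-1].WindowStart != w {
			out = append(out, Rollup{WindowStart: w, Min: s.Value, Max: s.Value})
		}
		r := &out[len(out)-1]
		r.Count++
		r.Sum += s.Value
		if s.Value < r.Min {
			r.Min = s.Value
		}
		if s.Value > r.Max {
			r.Max = s.Value
		}
	}
	return out
}

func (r Rollup) Mean() float64 { return r.Sum / float64(r.Count) }
```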
Database Design
Data storage uses a two-level structure: chunk files (compressed blocks of time series data for a specific time range) stored in object storage (S3) or local SSD, and a chunk index mapping (series_id, time_range) → chunk_file_location. Series metadata (series_id, metric_name, tags) is stored in a dedicated metadata store (LevelDB or PostgreSQL). Prometheus's TSDB uses memory-mapped block files on local disk; InfluxDB IOx uses Apache Parquet format stored on object storage, enabling SQL-like queries via DataFusion. The WAL ensures in-flight data is not lost on crash: on restart, WAL is replayed to reconstruct the in-memory head chunks.
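The two-level layout as plain Go structs; field names and the chunk path format are illustrative assumptions, not any real system's schema:

```go
package store

// ChunkRef describes one immutable, compressed block in object storage or on SSD.
type ChunkRef struct {
	SeriesID uint64
	MinTS    int64 // inclusive time range covered by the chunk
	MaxTS    int64
	Location string // e.g. "s3://bucket/chunks/<series>/<minTS>.bin" (format assumed)
	Bytes    int64
}

// SeriesMeta lives in the separate metadata store (LevelDB, PostgreSQL, ...).
type SeriesMeta struct {
	SeriesID uint64
	Metric   string
	Tags     map[string]string
}

// ChunkIndex answers: which chunks hold (seriesID, [start, end])?
type ChunkIndex interface {
	Find(seriesID uint64, start, end int64) []ChunkRef
}
```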
API Design
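The functional requirements above suggest a small HTTP surface. The following is a plausible sketch only; endpoint paths and payload shapes are assumptions, not a real system's API:

```go
package api

// POST /api/v1/write — batch ingestion.
type WriteRequest struct {
	Points []Point `json:"points"`
}

type Point struct {
	Metric    string            `json:"metric"`
	Tags      map[string]string `json:"tags"`
	Timestamp int64             `json:"timestamp"` // unix seconds
	Value     float64           `json:"value"`
}

// GET /api/v1/query_range?query=...&start=...&end=...&step=...
// The query string carries the metric name, tag filters, and aggregation.
type QueryRangeResponse struct {
	Series []SeriesResult `json:"series"`
}

type SeriesResult struct {
	Metric string            `json:"metric"`
	Tags   map[string]string `json:"tags"`
	Points [][2]float64      `json:"points"` // [timestamp, value] pairs
}

// PUT /api/v1/retention — set a retention policy, e.g. {"metric": "...", "days": 30}.
// PUT /api/v1/alerts    — create an alert rule: metric, threshold, window, notify target.
```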
Scaling & Bottlenecks
The ingestion tier bottleneck is the WAL write rate. A single Prometheus instance can ingest 1–2 million time series at 15-second scrape intervals (~100,000 samples/sec). Scaling beyond this requires horizontal sharding by metric name (consistent hashing). Thanos, Cortex (now Grafana Mimir), and VictoriaMetrics extend Prometheus with horizontal scaling: a write tier shards incoming samples across ingestion nodes, and a query frontend fans out queries to all shards and merges results.
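A minimal consistent-hash ring for routing a series key to a shard; virtual nodes and replication are omitted for brevity:

```go
package shard

import (
	"hash/fnv"
	"sort"
)

type Ring struct {
	points []uint32          // sorted hash points on the ring
	nodes  map[uint32]string // hash point -> shard node
}

func NewRing(nodes []string) *Ring {
	r := &Ring{nodes: make(map[uint32]string)}
	for _, n := range nodes {
		h := hash(n)
		r.points = append(r.points, h)
		r.nodes[h] = n
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// Route returns the shard owning a series key (metric name + sorted tags).
func (r *Ring) Route(seriesKey string) string {
	h := hash(seriesKey)
	// First ring point at or after the key's hash; wrap around if past the end.
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.nodes[r.points[i]]
}

func hash(s string) uint32 {
	f := fnv.New32a()
	f.Write([]byte(s))
	return f.Sum32()
}
```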
Query performance for long time ranges (1 month, 1 year) depends on how much data must be decompressed. With proper downsampling, a 1-year range query against 1-hour resolution data reads 8,760 data points per series, easily served in <10ms. Without downsampling, the same query reads ~3.2 million 10-second data points per series. Query parallelism (fan out to storage nodes holding different time chunks) speeds up long-range queries. Dedicated query nodes with warm chunk caches (frequently queried metrics kept decompressed in DRAM) serve dashboard refreshes in 1–2ms.
Key Trade-offs
- Pull vs. push ingestion: Prometheus pull model (scrape targets) simplifies discovery but requires network connectivity from Prometheus to targets; push model (agents write to TSDB) handles ephemeral targets and cross-network ingestion
- Gorilla vs. columnar (Parquet): Gorilla is optimal for per-series sequential access; Parquet is better for cross-series analytical queries ("show all metrics for this time range"); InfluxDB IOx uses Parquet for this reason
- Local vs. remote storage: Local SSD storage is fastest but limits scalability and complicates replication; remote object storage (S3) scales infinitely but adds 10–50ms read latency for uncached data
- High cardinality tag values vs. inverted index size: Using user_id, request_id, or trace_id as tag values creates billions of unique series, exploding the inverted index and making tag queries extremely slow; metric naming conventions must prohibit unbounded cardinality tags