En el post anterior trazamos una query SQL a través de cuatro etapas y vimos a DataFusion producir un resultado. Al final pasó algo importante que casi dejamos pasar: los operadores físicos se pasaron datos entre sí no como filas, sino como columnas. El scanner de CSV no produjo una lista de registros de empleados. Produjo un paquete de arrays, uno por columna, empaquetados de forma contigua en memoria.
Eso no fue un detalle de implementación. Fue la decisión de diseño que volvió rápido al resto del motor. Si el post anterior era sobre el pipeline, este es sobre la forma física de los datos que se mueven dentro de ese pipeline.
Cada query engine analítico moderno, DataFusion, DuckDB, Polars, Velox, Tungsten de Spark, termina más o menos en el mismo lugar: almacenar datos columna por columna, procesarlos en lotes, y evitar tocar bytes que no hacen falta. La razón no es estética. Es hardware. Los CPUs modernos premian tanto el layout columnar que cambiar solo la disposición de los datos puede transformar una query lenta en una sorprendentemente rápida sobre la misma máquina y los mismos datos.
Este post explica por qué pasa eso y después muestra el formato que hizo portable la idea: Apache Arrow. Arrow ganó porque resolvía un dolor real que todos compartían, no porque alguien lo declarara estándar desde el principio.
Las filas no eran una mala idea. Solo resolvían otro problema #
Las bases de datos tradicionales almacenan datos como almacenarías una planilla en papel: una fila tras otra.
Layout en memoria (por filas):
[1, "Alice", "Engineering", 95000, "2021-03-15"]
[2, "Bob", "Sales", 87000, "2019-06-01"]
[3, "Carol", "Engineering", 102000, "2022-01-10"]
...Esto tiene perfecto sentido para transacciones. Cuando haces INSERT de una fila, escribís un bloque contiguo. Cuando haces UPDATE de un campo, encontrás la fila y la parchás in-place. Cuando buscas un cliente por clave primaria, lees un chunk y obtenés todo sobre ese cliente de una vez.
Pero las queries analíticas no son transacciones. Escanean millones de filas y tocan un puñado de columnas:
SELECT AVG(salary) FROM employees WHERE department = 'Engineering'Esto necesita dos columnas: department y salary. En un layout por filas, el CPU carga los cinco campos de cada fila en cache solo para leer dos. Para una tabla de producción con 50 columnas donde la query usa 3, eso es 94% de ancho de banda de memoria desperdiciado. Cada cache line que el CPU busca es mayormente basura.
La máquina está trabajando. Buena parte de ese trabajo sobra.
Tres razones por las que columnar gana #
El almacenamiento columnar invierte el layout. En vez de agrupar todos los campos de un registro juntos, agrupa todos los valores de una columna juntos:
Layout en memoria (columnar):
ids: [1, 2, 3, ...]
names: ["Alice", "Bob", "Carol", ...]
departments: ["Engineering", "Sales", "Engineering", ...]
salaries: [95000, 87000, 102000, ...]
hire_dates: ["2021-03-15", "2019-06-01", "2022-01-10", ...]Ahora leer salary significa leer un array contiguo de números. Sin desvíos por nombres y fechas. Sin bytes desperdiciados. El CPU carga exactamente lo que necesita y nada más.
Este layout explota tres propiedades del hardware moderno.
Cache lines #
Los CPUs no leen bytes individuales de RAM. Leen cache lines: bloques alineados de 64 bytes. Cuando accedes a un solo entero, el CPU busca toda la línea de 64 bytes donde vive. Lo que sea que esté en esa línea viene gratis.
En un layout por filas, una cache line podría contener el id, name, department, salary y hire_date de un empleado. Si solo necesitas el salary, cuatro de cinco campos son desperdicio.
En un layout columnar, una cache line de la columna salary contiene 16 valores de salary consecutivos (64 bytes / 4 bytes por i32 = 16 valores). Cada byte es útil. El CPU busca menos memoria total y hace más trabajo útil por búsqueda.
SIMD #
Los procesadores modernos tienen registros SIMD, de 256 o 512 bits de ancho, que aplican una instruccion a multiples valores simultaneamente. Una instruccion AVX2 puede comparar ocho enteros de 32 bits contra un umbral en un solo ciclo. Una instruccion AVX-512 duplica eso.
Pero SIMD requiere datos contiguos y alineados. No podes dispersar ocho salaries a traves de ocho structs de fila diferentes y esperar que el hardware vectorice la comparacion. El layout columnar le entrega los datos al CPU en exactamente el formato que SIMD necesita: un array contiguo de valores homogeneos.
Cuando DataFusion evalua WHERE salary > 90000, el kernel de computo Arrow subyacente puede procesar la columna de salary en chunks del ancho de SIMD, produciendo un bitmap de indices que coinciden con minimo branching. La evaluacion por filas verificaria una fila a la vez, con un branch por fila.
Compresion #
Las columnas contienen datos homogeneos: todos strings, todos enteros, todas fechas. Los valores en la misma columna tienden a agruparse. Los departamentos se repiten. Los salaries caen en un rango estrecho. Los timestamps son monotonicamente crecientes.
Esta homogeneidad hace que los datos columnares sean extraordinariamente compresibles:
- Codificacion por diccionario reemplaza strings repetidos con codigos enteros. Una columna de departamento con cuatro valores distintos se convierte en cuatro enteros mas una tabla de lookup, sin importar la cantidad de filas.
- Codificacion run-length colapsa secuencias de valores identicos. Una columna de status ordenada con un millon de filas “active” seguidas de diez mil “inactive” se comprime a dos entradas.
- Codificacion delta almacena enteros ordenados como diferencias respecto al valor anterior. Una columna de IDs secuenciales se convierte en una columna de unos.
Los datos por filas mezclan tipos en cada bloque, derrotando todos estos esquemas. Los datos columnares le entregan a cada esquema un flujo de un solo tipo, altamente redundante. El resultado: archivos mas pequenos, menos I/O, queries mas rapidas. Los archivos Parquet son tipicamente 5-10x mas pequenos que archivos CSV equivalentes, y los ahorros se componen con la cantidad de columnas.
Apache Arrow: el formato en el que todos terminaron convergiendo #
Antes de Arrow, cada herramienta tenía su propio formato columnar en memoria. Spark tenía Tungsten. Pandas tenía su block manager interno. Impala tenía su formato. Cuando movías datos entre herramientas, serializabas del formato A y deserializabas al formato B. Para un notebook de Python llamando a un cluster de Spark, esto significaba copiar gigabytes de datos a través de un cuello de botella de serialización que con frecuencia dominaba el tiempo total de ejecución.
Apache Arrow nacio de una observacion simple: si toda herramienta usara el mismo formato en memoria, los datos podrian moverse entre ellas al costo de un puntero, no de una copia.
Arrow es tres cosas:
- Una especificacion de como los datos columnares se disponen en memoria: alineaciones de buffers, representacion de nulls, esquemas de offsets para tipos de longitud variable.
- Un conjunto de librerias implementando esa especificacion en Rust, C++, Java, Python, Go, y mas.
- Un contrato que dice que cualquier sistema compatible con Arrow puede leer los datos de cualquier otro sistema compatible con Arrow sin conversion.
Arrow no es un formato de archivo. Parquet es un formato de archivo (columnar, comprimido, en disco). Arrow es la representacion en memoria: como se ven los datos una vez que dejan el archivo y entran al dominio del CPU. Parquet va al disco. Arrow vive en RAM. DataFusion lee Parquet del disco a Arrow en memoria, lo procesa, y produce Arrow como salida. Sin formato intermedio. Sin copias.
Arrow a nivel de bytes #
Un array de Arrow es refrescantemente simple. Sin headers de objetos. Sin punteros vtable. Sin indirecciones. Solo buffers planos.
Tipos de ancho fijo #
Una columna de valores i32 [1, NULL, 3, 4] se almacena como dos buffers:
Bitmap de validez: [1, 0, 1, 1] (1 bit por valor: 1=valido, 0=null)
Buffer de datos: [1, ?, 3, 4] (valores i32 contiguos; ? = indefinido)El bitmap de validez usa un bit por valor. Una columna de un millon de filas necesita un bitmap de 125 KB. Verificar null en el indice i es un AND a nivel de bits: bitmap[i / 8] & (1 << (i % 8)).
El buffer de datos es un [i32] plano. Acceder al valor i es aritmetica de punteros: base + i * 4. Sin indirecciones. Sin perseguir punteros. El patron de acceso es perfectamente predecible, lo que significa que el prefetcher del CPU puede adelantarse a la computacion y practicamente eliminar los cache misses.
Tipos de ancho variable #
Los strings no pueden ser de ancho fijo. Arrow los maneja con un buffer de offsets:
Valores: ["hello", "world", "!"]
Offsets: [0, 5, 10, 11] (n+1 enteros: posicion de inicio de cada string, mas la longitud total)
Data: "helloworld!" (todos los strings concatenados, sin separadores, sin null terminators)Para leer el string i: cortar data[offsets[i]..offsets[i+1]]. El string 1 es data[5..10] = "world". Dos accesos a array, un slice, cero allocaciones en heap.
Compara esto con un Vec<String> en Rust, donde cada string es una allocacion de heap separada con su propio puntero, longitud y capacidad. Para un millon de strings, eso es un millon de allocaciones dispersas por el heap. Arrow almacena el mismo millon de strings en dos buffers contiguos: uno para offsets, uno para caracteres. La diferencia en comportamiento de cache es enorme.
Record batches #
Una sola columna rara vez es util sola. Un record batch agrupa multiples columnas de igual longitud con un schema:
RecordBatch:
Schema: {id: Int32, name: Utf8, salary: Float64}
Columnas:
id: [1, 2, 3]
name: ["Alice", "Bob", "Carol"]
salary: [95000.0, 87000.0, 102000.0]
Cantidad de filas: 3Los query engines streamean datos como record batches. Los tamanos tipicos van de 1,024 a 65,536 filas: suficientemente grandes para amortizar el overhead por batch, suficientemente pequenos para caber en cache L2, y naturalmente compatibles con streaming (no necesitas cargar el dataset entero antes de procesar el primer batch).
En el post anterior, cuando vimos a los operadores de DataFusion pasarse datos entre etapas, esto es lo que se pasaban: Arrow RecordBatches. El scanner de CSV los producia. El operador de filtro los consumia y producia. El operador de agregacion los consumia y producia un batch final. Sin conversion de formato en ningun punto del pipeline.
Construyendo Arrow en Rust #
El crate arrow es una de las bases de DataFusion. Construyamos las mismas formas de datos de las que venimos hablando.
[dependencies]
arrow = "55"Arrays tipados #
use arrow::array::{Int32Array, StringArray, Float64Array};
fn main() {
// Una columna de enteros con un null en el indice 1
let ids = Int32Array::from(vec![Some(1), None, Some(3), Some(4)]);
println!("ids[0] = {}", ids.value(0)); // 1
println!("ids[1] null? {}", ids.is_null(1)); // true
// Una columna de strings
let names = StringArray::from(vec!["Alice", "Bob", "Carol", "Dave"]);
println!("names[2] = {}", names.value(2)); // Carol
// Una columna de floats
let salaries = Float64Array::from(vec![95000.0, 87000.0, 102000.0, 78000.0]);
println!("salaries[0] = {}", salaries.value(0)); // 95000.0
}Int32Array::from(vec![Some(1), None, Some(3), Some(4)]) aloca dos buffers: un buffer de datos de 16 bytes y un bitmap de validez de 1 byte. El None pone el bit 1 en cero. Internamente, esto es exactamente el layout que describimos: arrays planos, sin indirecciones. El sistema de tipos te impide accidentalmente tratar un valor null como valido, y el layout de memoria es identico en cada lenguaje que habla Arrow.
Record batches #
use std::sync::Arc;
use arrow::array::{Int32Array, StringArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
fn main() -> Result<(), arrow::error::ArrowError> {
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("department", DataType::Utf8, false),
Field::new("salary", DataType::Float64, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "Dave", "Eve"])),
Arc::new(StringArray::from(vec![
"Engineering", "Sales", "Engineering", "Marketing", "Engineering",
])),
Arc::new(Float64Array::from(vec![95000.0, 87000.0, 102000.0, 78000.0, 110000.0])),
],
)?;
println!("Filas: {}, Columnas: {}", batch.num_rows(), batch.num_columns());
// Filas: 5, Columnas: 4
Ok(())
}RecordBatch::try_new valida en tiempo de construccion: todas las columnas deben tener igual longitud y coincidir con los tipos del schema. Pasa un Float64Array donde el schema espera Int32 y obtienes un error antes de que corra cualquier query. Es el mismo principio que el sistema de tipos de Rust: atrapar el error en la frontera, no en medio de la computacion.
Filtrado: mascaras booleanas en vez de if-else #
En un mundo por filas, filtras con un loop y un branch por fila. En Arrow, filtras con una mascara booleana aplicada a una columna entera:
use arrow::array::{StringArray, Float64Array, BooleanArray};
use arrow::compute;
fn main() {
let departments = StringArray::from(vec![
"Engineering", "Sales", "Engineering", "Marketing", "Engineering",
]);
let salaries = Float64Array::from(vec![95000.0, 87000.0, 102000.0, 78000.0, 110000.0]);
// Construir la mascara: cuales filas son Engineering?
let mask = BooleanArray::from(
departments.iter()
.map(|d| Some(d == Some("Engineering")))
.collect::<Vec<_>>()
);
// Aplicar la mascara
let filtered = compute::filter(&salaries, &mask).unwrap();
let result = filtered.as_any().downcast_ref::<Float64Array>().unwrap();
for i in 0..result.len() {
println!("{}", result.value(i));
}
// 95000.0
// 102000.0
// 110000.0
}La mascara es en si misma un array de Arrow: un bit por fila. El kernel compute::filter usa esa mascara para extraer valores coincidentes de la columna de salary en un loop cerrado que el compilador frecuentemente puede auto-vectorizar en instrucciones SIMD. Sin branch por fila. Sin overhead de objeto-por-fila. Solo arrays entran, arrays salen.
Agregacion: kernels de computo #
use arrow::array::Float64Array;
use arrow::compute;
fn main() {
let salaries = Float64Array::from(vec![95000.0, 102000.0, 110000.0]);
let sum = compute::sum(&salaries).unwrap();
let min = compute::min(&salaries).unwrap();
let max = compute::max(&salaries).unwrap();
let avg = sum / salaries.len() as f64;
println!("Sum: {sum}"); // 307000.0
println!("Min: {min}"); // 95000.0
println!("Max: {max}"); // 110000.0
println!("Avg: {avg}"); // 102333.33
}Estos kernels son loops cerrados sobre memoria contigua. LLVM auto-vectoriza el kernel sum en sumas SIMD. Para un array de un millon de elementos, la computacion corre a velocidad de ancho de banda de memoria: el CPU suma valores tan rapido como puede buscarlos de RAM. Este es el payoff del layout columnar. Los datos estan dispuestos exactamente como el hardware los quiere.
De disco a memoria: Parquet a Arrow #
Parquet es el formato de archivo columnar. Arrow es el formato de memoria columnar. Juntos forman el puente de almacenamiento a computo:
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let file = File::open("employees.parquet")?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
// Projection push-down: solo descomprimir estas columnas
let reader = builder
.with_projection(arrow::compute::ProjectionMask::leaves(
builder.parquet_schema(),
vec![2, 3], // department, salary
))
.with_batch_size(4096)
.build()?;
for batch in reader {
let batch = batch?;
println!("Batch: {} filas, {} columnas", batch.num_rows(), batch.num_columns());
}
Ok(())
}El .with_projection() es projection push-down a nivel de archivo. El lector de Parquet nunca descomprime los chunks de columna de id, name, o hire_date. En una tabla con 50 columnas, esto significa leer y descomprimir 2 columnas en vez de 50. Los datos entran a memoria ya en formato Arrow, listos para los kernels de computo. Sin paso de parsing, sin paso de conversion, sin representacion intermedia.
La evidencia del mundo real #
Esto no es una victoria teórica. Se nota en sistemas reales.
DuckDB corre benchmarks analiticos TPC-H en un laptop mas rapido que algunos clusters distribuidos ejecutando bases de datos por filas. Un solo thread satura el ancho de banda de memoria porque el motor columnar no desperdicia ninguna cache line, ninguna lane SIMD, ningun ciclo de bus de memoria en datos que no necesita.
Polars, construido sobre Arrow en Rust, procesa archivos CSV de multiples gigabytes en segundos. El secreto no son algoritmos ingeniosos. Es leer datos en formato Arrow al ingestarlos y nunca salir de el.
Apache Spark aprendio esta leccion por las malas. El Spark temprano usaba un objeto Row basado en filas para todo. El rendimiento era pobre. El proyecto Tungsten introdujo un formato columnar off-heap, y luego Spark adopto Arrow para interoperabilidad con Python (PySpark). El resultado fueron mejoras de un orden de magnitud para cargas de trabajo pesadas en agregaciones y joins.
DataFusion usa Arrow como el unico formato de intercambio de datos entre operadores. Un filtro produce un Arrow RecordBatch. Un aggregate lo consume. Sin serializacion entre etapas. Sin conversion de formato. Los datos fluyen a traves del motor como memoria contigua, desde el disco hasta el resultado final.
Cuando las filas todavia ganan #
Columnar no es universalmente mejor. El tradeoff es real.
Transacciones. Cuando haces INSERT de una fila, el almacenamiento por filas escribe un bloque contiguo. El almacenamiento columnar debe appendear a cada archivo de columna. Por eso PostgreSQL y MySQL usan almacenamiento por filas: su pan de cada dia es OLTP, una fila a la vez, millones de veces por segundo.
Busquedas puntuales. Buscar un usuario por clave primaria en almacenamiento por filas es una lectura. En almacenamiento columnar, es una lectura por columna. Con un indice, la lectura por filas es una cache line. La lectura columnar son N cache lines para N columnas.
Patrones de acceso a filas completas. Si la query es SELECT * FROM users WHERE id = 42, columnar no tiene ventaja: necesitas cada columna de todas formas.
La division es limpia: OLTP favorece filas. OLAP favorece columnas. Por eso la industria esta convergiendo en arquitecturas hibridas: PostgreSQL para transacciones, DuckDB o DataFusion para analitica, Parquet como puente entre ambos. Algunos sistemas, como AlloyDB y CockroachDB, estan experimentando con ambos layouts en el mismo motor, eligiendo por query.
Que sigue #
Ya cubrimos el pipeline (post 1) y el formato de memoria que lo hace rapido (este post). El siguiente paso es la parte donde el sistema de tipos de Rust tiene oportunidad de brillar: planes logicos y expresiones. Como un SELECT ... WHERE ... GROUP BY se convierte en un arbol de variantes de enum tipadas sobre las que el optimizador puede hacer pattern matching, reescribir, y entregar al ejecutor. Si la analogia del compilador se sostiene, aqui es donde construimos la representacion intermedia.
Referencia: