Jekyll's data processing is often constrained by sequential execution and memory limits when handling large datasets. By building Ruby data processing pipelines, you can transform, aggregate, and analyze data far more efficiently while keeping Jekyll's simplicity. This guide covers Ruby techniques for building ETL (Extract, Transform, Load) pipelines that use parallel processing, streaming, and memory optimization to handle large datasets within Jekyll's build process.
Effective data pipeline architecture separates extraction, transformation, and loading phases while providing fault tolerance and monitoring. The pipeline design uses the processor pattern with composable stages that can be reused across different data sources.
The architecture comprises source adapters for different data formats, processor chains for transformation logic, and sink adapters for output destinations. Each stage implements a common interface allowing flexible composition. Error handling, logging, and performance monitoring are built into the pipeline framework to ensure reliability and visibility.
module Jekyll
module DataPipelines
# Base pipeline architecture
class Pipeline
def initialize(stages = [])
@stages = stages
@metrics = PipelineMetrics.new
end
def process(data = nil)
@metrics.record_start
result = @stages.reduce(data) do |current_data, stage|
@metrics.record_stage_start(stage)
processed_data = stage.process(current_data)
@metrics.record_stage_complete(stage, processed_data)
processed_data
end
@metrics.record_complete(result)
result
rescue => e
@metrics.record_error(e)
raise PipelineError.new("Pipeline processing failed", e)
end
def |(other_stage)
self.class.new(@stages + [other_stage])
end
end
# Base stage class
class Stage
def process(data)
raise NotImplementedError, "Subclasses must implement process method"
end
def |(other_stage)
Pipeline.new([self, other_stage])
end
end
# Specific stage implementations
class ExtractStage < Stage
def initialize(source_adapter)
@source = source_adapter
end
def process(_ = nil)
@source.extract
end
end
class TransformStage < Stage
def initialize(transformer)
@transformer = transformer
end
def process(data)
@transformer.transform(data)
end
end
class LoadStage < Stage
def initialize(sink_adapter)
@sink = sink_adapter
end
def process(data)
@sink.load(data)
data # Return data for potential further processing
end
end
# Pipeline builder for fluent interface
class PipelineBuilder
def initialize
@stages = []
end
def extract(source_adapter)
@stages << ExtractStage.new(source_adapter)
self
end
def transform(transformer)
@stages << TransformStage.new(transformer)
self
end
def load(sink_adapter)
@stages << LoadStage.new(sink_adapter)
self
end
def build
Pipeline.new(@stages)
end
end
end
end
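The Pipeline class references PipelineMetrics and PipelineError without defining them. A minimal sketch of both, assuming simple timing plus logging through Jekyll.logger, might look like this:
module Jekyll
  module DataPipelines
    # Raised when any stage fails; keeps a reference to the original exception
    class PipelineError < StandardError
      attr_reader :original_error

      def initialize(message, original_error = nil)
        super(message)
        @original_error = original_error
      end
    end

    # Minimal timing collector used by Pipeline above
    class PipelineMetrics
      def initialize
        @stage_started_at = {}
      end

      def record_start
        @started_at = Time.now
      end

      def record_stage_start(stage)
        @stage_started_at[stage.class.name] = Time.now
      end

      def record_stage_complete(stage, _result)
        elapsed = Time.now - @stage_started_at[stage.class.name]
        Jekyll.logger.debug "Pipeline:", "#{stage.class.name} finished in #{elapsed.round(3)}s"
      end

      def record_complete(_result)
        Jekyll.logger.info "Pipeline:", "completed in #{(Time.now - @started_at).round(3)}s"
      end

      def record_error(error)
        Jekyll.logger.error "Pipeline:", error.message
      end
    end
  end
end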
# Usage example:
pipeline = Jekyll::DataPipelines::PipelineBuilder.new
.extract(JsonFileSource.new('_data/products.json'))
.transform(ProductNormalizer.new)
.transform(ImageProcessor.new)
.load(JekyllDataSink.new('products'))
.build
pipeline.process
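The usage example above assumes adapter classes that are not shown (and are not part of Jekyll itself). Hypothetical versions of the JSON source and the data sink could look like the following; ProductNormalizer and ImageProcessor would simply be objects responding to transform(data) in the same way:
require 'json'

# Hypothetical source adapter: reads and parses a JSON data file
class JsonFileSource
  def initialize(path)
    @path = path
  end

  def extract
    JSON.parse(File.read(@path))
  end
end

# Hypothetical sink adapter: writes processed records to a _data file
# that Jekyll will pick up the next time the data directory is read
class JekyllDataSink
  def initialize(key, data_dir: '_data')
    @path = File.join(data_dir, "#{key}_processed.json")
  end

  def load(records)
    File.write(@path, JSON.pretty_generate(records))
    records
  end
end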
Parallel processing can dramatically improve throughput for transformations that spend time on I/O, such as HTTP requests and file reads; note that on CRuby the Global VM Lock limits true CPU parallelism across threads, so pure-Ruby CPU work benefits less. Ruby's threads and fibers enable concurrent execution while keeping shared state and resource usage under control (a standalone usage sketch follows the module below).
Here's an implementation of parallel data processing for Jekyll:
require 'etc'
module Jekyll
module ParallelProcessing
class ParallelProcessor
def initialize(worker_count: [Etc.nprocessors - 1, 1].max)
@worker_count = worker_count
@queue = Queue.new
@results = Queue.new
@workers = []
end
def process_batch(data, &block)
setup_workers(&block)
enqueue_data(data)
wait_for_completion
collect_results
ensure
stop_workers
end
def process_stream(enum, &block)
# Use fibers for streaming processing; returns a lazy enumerator of results
fiber_pool = FiberPool.new(@worker_count)
enum.lazy.map do |item|
fiber_pool.schedule { block.call(item) }
end
end
private
def setup_workers(&block)
@worker_count.times do
@workers << Thread.new do
while item = @queue.pop
break if item == :TERMINATE
begin
result = block.call(item)
@results << [item, result, nil]
rescue => e
@results << [item, nil, e]
end
end
end
end
end
def enqueue_data(data)
data.each { |item| @queue << item }
@worker_count.times { @queue << :TERMINATE }
end
def wait_for_completion
@workers.each(&:join)
end
def collect_results
results = []
errors = []
until @results.empty?
item, result, error = @results.pop(true) rescue nil
if error
errors << { item: item, error: error }
elsif result
results << result
end
end
{ results: results, errors: errors }
end
def stop_workers
@workers.each do |worker|
worker.kill if worker.alive?
end
end
end
# Fiber-based processing for I/O-bound operations (fibers are cooperative; overlapping blocking I/O needs a fiber scheduler or non-blocking calls)
class FiberPool
def initialize(size)
@size = size
@queue = Queue.new
@fibers = size.times.map { create_worker_fiber }
end
def schedule(&job)
fiber = @fibers.find(&:alive?) || create_worker_fiber
# Resume (not transfer) so the worker's Fiber.yield hands the result back here
fiber.resume(job)
end
private
def create_worker_fiber
Fiber.new do |job|
loop do
result = job.call
job = Fiber.yield(result)
end
end
end
end
# Parallel data processing plugin for Jekyll
class ParallelDataProcessor < Generator
def generate(site)
@site = site
# Process large datasets in parallel
process_large_collections
process_external_data_sources
generate_parallel_content
end
private
def process_large_collections
processor = ParallelProcessor.new
@site.collections.each do |name, collection|
next if collection.docs.size < 50 # Only parallelize large collections
results = processor.process_batch(collection.docs) do |doc|
process_document_parallel(doc)
end
handle_processing_results(results, name)
end
end
def process_document_parallel(doc)
{
id: doc.id,
enhanced_data: {
semantic_analysis: analyze_semantics(doc.content),
readability_score: calculate_readability(doc.content),
related_content: find_related_content(doc),
seo_optimization: generate_seo_suggestions(doc)
},
processing_time: Time.now
}
end
def process_external_data_sources
# Process external APIs and data sources in parallel
data_sources = @site.config['external_data_sources'] || []
processor = ParallelProcessor.new
results = processor.process_batch(data_sources) do |source|
fetch_and_process_external_data(source)
end
# Store processed external data
results[:results].each do |data|
@site.data[data[:source_name]] = data[:content]
end
end
def fetch_and_process_external_data(source)
require 'net/http'
require 'json'
uri = URI.parse(source['url'])
response = Net::HTTP.get_response(uri)
if response.is_a?(Net::HTTPSuccess)
raw_data = JSON.parse(response.body)
# Assumes each configured source entry provides a 'name' key for storage under site.data
{
source_name: source['name'],
content: transform_external_data(raw_data, source['transformations'])
}
else
raise "Failed to fetch #{source['url']}: #{response.code}"
end
end
end
end
end
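As a standalone usage sketch (the source list and URLs here are invented, matching the keys the generator above reads), ParallelProcessor can also be driven directly:
require 'uri'

processor = Jekyll::ParallelProcessing::ParallelProcessor.new(worker_count: 4)

# Assumed shape of external_data_sources entries: 'name', 'url', optional 'transformations'
sources = [
  { 'name' => 'inventory', 'url' => 'https://example.com/api/inventory.json' },
  { 'name' => 'reviews',   'url' => 'https://example.com/api/reviews.json' }
]

outcome = processor.process_batch(sources) do |source|
  # Each worker thread handles one source; exceptions are captured per item
  URI.parse(source['url']).host
end

puts "Processed #{outcome[:results].size} sources, #{outcome[:errors].size} errors"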
Streaming lets you handle datasets larger than available memory by processing data in fixed-size chunks. This approach is essential for large Jekyll sites with extensive content or external data sources.
Here's a streaming data processing implementation:
require 'json'
module Jekyll
module StreamingProcessing
class StreamProcessor
def initialize(batch_size: 1000)
@batch_size = batch_size
end
def process_large_dataset(enum, &processor)
enum.each_slice(@batch_size).lazy.map do |batch|
process_batch(batch, &processor)
end
end
def process_file_stream(path, &processor)
# Stream-process large files line by line without loading the whole file
File.open(path, 'r') do |file|
file.each_line.each_slice(@batch_size) do |lines|
process_batch(lines, &processor)
end
end
end
def transform_stream(input_enum, transformers)
transformers.reduce(input_enum) do |stream, transformer|
stream.lazy.flat_map { |item| transformer.transform(item) }
end
end
private
def process_batch(batch, &processor)
batch.map { |item| processor.call(item) }
end
end
# Memory-efficient data transformations
class LazyTransformer
def initialize(&transform_block)
@transform_block = transform_block
end
def transform(data)
data.lazy.map(&@transform_block)
end
end
class LazyFilter
def initialize(&filter_block)
@filter_block = filter_block
end
def transform(data)
data.lazy.select(&@filter_block)
end
end
# Streaming file processor for large data files
class StreamingFileProcessor
def process_large_json_file(file_path)
# Process JSON files that are too large to load into memory
File.open(file_path, 'r') do |file|
json_stream = JsonStreamParser.new(file)
json_stream.each_object.lazy.map do |obj|
process_json_object(obj)
end.each do |processed|
yield processed if block_given?
end
end
end
def process_large_csv_file(file_path, &processor)
require 'csv'
# CSV.foreach already streams rows; return a lazy enumerator of processed batches
CSV.foreach(file_path, headers: true).each_slice(1000).lazy.map do |batch|
batch.map(&processor)
end
end
end
# JSON stream parser for large files (naive: assumes objects contain no braces inside string values)
class JsonStreamParser
def initialize(io)
@io = io
@buffer = ""
end
def each_object
return enum_for(:each_object) unless block_given?
in_object = false
depth = 0
object_start = 0
@io.each_char do |char|
@buffer << char
case char
when '{'
depth += 1
in_object = true if depth == 1
object_start = @buffer.length - 1 if depth == 1
when '}'
depth -= 1
if depth == 0 && in_object
object_json = @buffer[object_start..-1]
parsed = JSON.parse(object_json) rescue nil # skip malformed objects
yield parsed if parsed
@buffer.clear
in_object = false
end
end
end
end
end
# Memory-optimized Jekyll generator
class StreamingDataGenerator < Generator
def generate(site)
@site = site
@stream_processor = StreamProcessor.new
process_large_data_files
generate_streaming_content
optimize_memory_usage
end
private
def process_large_data_files
# Process large data files without loading into memory
data_dir = File.join(@site.source, '_data')
Dir.glob(File.join(data_dir, '*.json')).each do |json_file|
next if File.size(json_file) < 10_000_000 # Only stream large files
stream_processor = StreamingFileProcessor.new
collection_name = File.basename(json_file, '.json')
stream_processor.process_large_json_file(json_file) do |obj|
process_streaming_data_object(obj, collection_name)
end
end
end
def process_streaming_data_object(obj, collection_name)
# Process individual objects from stream
enhanced_obj = enhance_data_object(obj)
# Create Jekyll documents from streamed objects
create_document_from_stream(enhanced_obj, collection_name)
end
def create_document_from_stream(data, collection_name)
collection = @site.collections[collection_name]
# Jekyll::Document expects a file path, so synthesize one for the streamed object
path = File.join(@site.source, "_#{collection_name}", "#{data['id'] || data.object_id}.md")
doc = Document.new(path, site: @site, collection: collection)
doc.content = data['content'].to_s
doc.merge_data!(data)
doc.data['layout'] ||= 'default'
collection.docs << doc
end
def optimize_memory_usage
# Force garbage collection after memory-intensive operations
GC.start
# Monitor memory usage
memory_usage = `ps -o rss= -p #{Process.pid}`.to_i / 1024
Jekyll.logger.info "Memory usage: #{memory_usage}MB"
if memory_usage > 500 # 500MB threshold
Jekyll.logger.warn "High memory usage detected, optimizing..."
optimize_large_collections
end
end
def optimize_large_collections
@site.collections.each do |name, collection|
next if collection.docs.size < 1000
# Convert to lazy enumeration for memory efficiency
collection.define_singleton_method(:docs) do
# Caution: downstream code that expects an Array may need to call .to_a
@lazy_docs ||= @docs.lazy
end
end
end
end
end
end
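A rough usage sketch (the CSV path and column name are placeholders): StreamProcessor batches any enumerable lazily, and StreamingFileProcessor walks a large CSV without loading it whole:
stream = Jekyll::StreamingProcessing::StreamProcessor.new(batch_size: 500)

# Nothing is computed until the lazy result is forced
batched = stream.process_large_dataset(1..10_000) { |n| n * n }
batched.first(2) # => the first two batches of squared values

csv = Jekyll::StreamingProcessing::StreamingFileProcessor.new
sku_batches = csv.process_large_csv_file('_data/products.csv') { |row| row['sku'] }
sku_batches.each { |batch| puts batch.take(3).inspect }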
Ruby's Enumerable module provides powerful data transformation capabilities. Advanced techniques like lazy evaluation, method chaining, and custom enumerators enable complex data processing with clean, efficient code.
module Jekyll
module DataTransformation
# Advanced enumerable utilities for data processing
module EnumerableUtils
def self.grouped_transformation(enum, group_size, &transform)
enum.each_slice(group_size).lazy.flat_map(&transform)
end
def self.pipelined_transformation(enum, *transformers)
transformers.reduce(enum) do |current, transformer|
current.lazy.map { |item| transformer.call(item) }
end
end
def self.memoized_transformation(enum, &transform)
cache = {}
enum.lazy.map do |item|
cache[item] ||= transform.call(item)
end
end
end
# Data transformation DSL
class TransformationBuilder
def initialize
@transformations = []
end
def map(&block)
@transformations << ->(enum) { enum.lazy.map(&block) }
self
end
def select(&block)
@transformations << ->(enum) { enum.lazy.select(&block) }
self
end
def reject(&block)
@transformations << ->(enum) { enum.lazy.reject(&block) }
self
end
def flat_map(&block)
@transformations << ->(enum) { enum.lazy.flat_map(&block) }
self
end
def group_by(&block)
# Eager: group_by forces the chain and returns a Hash
@transformations << ->(enum) { enum.group_by(&block) }
self
end
def sort_by(&block)
# Eager: sort_by forces the chain and returns an Array
@transformations << ->(enum) { enum.sort_by(&block) }
self
end
def apply_to(enum)
@transformations.reduce(enum.lazy) do |current, transformation|
transformation.call(current)
end
end
end
# Specific data transformers for common Jekyll tasks
class ContentEnhancer
def initialize(site)
@site = site
end
def enhance_documents(documents)
TransformationBuilder.new
.map { |doc| add_reading_metrics(doc) }
.map { |doc| add_related_content(doc) }
.map { |doc| add_seo_data(doc) }
.apply_to(documents)
end
private
def add_reading_metrics(doc)
doc.data['word_count'] = doc.content.split(/\s+/).size
doc.data['reading_time'] = (doc.data['word_count'] / 200.0).ceil
doc.data['complexity_score'] = calculate_complexity(doc.content)
doc
end
def add_related_content(doc)
related = find_related_documents(doc)
doc.data['related_content'] = related.take(5).to_a
doc
end
def find_related_documents(doc)
@site.documents.lazy
.reject { |other| other.id == doc.id }
.sort_by { |other| calculate_similarity(doc, other) }
.reverse
end
def calculate_similarity(doc1, doc2)
# Jaccard similarity over each document's unique word sets
words1 = doc1.content.downcase.split(/\W+/).uniq
words2 = doc2.content.downcase.split(/\W+/).uniq
common_words = words1 & words2
total_words = words1 | words2
return 0.0 if total_words.empty?
common_words.size.to_f / total_words.size
end
end
class DataNormalizer
def normalize_collection(collection)
TransformationBuilder.new
.map { |doc| normalize_document(doc) }
.select { |doc| doc.data['published'] != false }
.map { |doc| add_default_values(doc) }
.apply_to(collection.docs)
end
private
def normalize_document(doc)
# Normalize common data fields
doc.data['title'] = doc.data['title'].to_s.strip
doc.data['date'] = parse_date(doc.data['date'])
doc.data['tags'] = Array(doc.data['tags']).map(&:to_s).map(&:strip)
doc.data['categories'] = Array(doc.data['categories']).map(&:to_s).map(&:strip)
doc
end
def add_default_values(doc)
doc.data['layout'] ||= 'default'
doc.data['author'] ||= 'Unknown'
doc.data['excerpt'] ||= generate_excerpt(doc.content)
doc
end
end
# Jekyll generator using advanced data transformation
class DataTransformationGenerator < Generator
def generate(site)
@site = site
@enhancer = ContentEnhancer.new(site)
@normalizer = DataNormalizer.new
transform_site_data
enhance_content_collections
generate_derived_data
end
private
def transform_site_data
# Transform site data using advanced enumerable techniques
@site.data.transform_values! do |value|
if value.is_a?(Array)
process_array_data(value)
elsif value.is_a?(Hash)
process_hash_data(value)
else
value
end
end
end
def process_array_data(array)
array.lazy
.map { |item| deep_transform_values(item) }
.select { |item| filter_data_item(item) }
.to_a
end
def process_hash_data(hash)
hash.transform_values { |v| deep_transform_values(v) }
.select { |k, v| filter_data_item(v) }
end
def deep_transform_values(obj)
case obj
when Array
obj.map { |item| deep_transform_values(item) }
when Hash
obj.transform_values { |v| deep_transform_values(v) }
when String
obj.strip
else
obj
end
end
def enhance_content_collections
@site.collections.each_value do |collection|
next if collection.docs.empty?
# Normalize first, then enhance, then materialize the lazy chain
normalized_docs = @normalizer.normalize_collection(collection)
enhanced_docs = @enhancer.enhance_documents(normalized_docs)
collection.docs.replace(enhanced_docs.to_a)
end
end
end
end
end
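A short usage sketch of the TransformationBuilder DSL (the post hashes here are made up); note that the sort_by step forces the lazy chain, so the result is a plain Array:
posts = [
  { 'title' => 'Alpha', 'draft' => true,  'words' => 120 },
  { 'title' => 'Beta',  'draft' => false, 'words' => 950 },
  { 'title' => 'Gamma', 'draft' => false, 'words' => 430 }
]

published = Jekyll::DataTransformation::TransformationBuilder.new
  .reject { |post| post['draft'] }
  .map { |post| post.merge('reading_time' => (post['words'] / 200.0).ceil) }
  .sort_by { |post| post['reading_time'] }
  .apply_to(posts)

published.each { |post| puts "#{post['title']}: #{post['reading_time']} min" }
# Gamma: 3 min
# Beta: 5 min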
These Ruby data processing techniques expand what Jekyll can handle for large datasets and complex transformations. By combining parallel processing, streaming, and lazy enumerable patterns, you can build Jekyll sites that process large volumes of data efficiently while keeping the simplicity and reliability of static site generation.