Bits & Bytes

Implementing a Write-Ahead Log (WAL) in Go

Introduction

I love digging into how our everyday tools work under the hood! As a software engineer, I find it incredibly rewarding to peek behind the curtain. It builds my understanding of tools and patterns like nothing else could. Let me show you how to build a Write-Ahead Log (WAL) step-by-step - it's more than just code, it's a gateway to understanding the magic that powers our tools.

Think about the tools you use every day: databases, version control systems, message queues. They're all using similar patterns behind the scenes. A WAL is a fundamental building block for all of these tools. When you build this component yourself, you'll:

While our WAL implementation isn't as complex as production systems, it will teach you the core principles that keep your data safe when things go wrong. You'll see firsthand how theoretical concepts transform into robust, working code.

Let me explain how a WAL keeps your data safe: it acts like a safety net for your storage systems. Before making any changes to your database, you first write them to a sequential log and flush that log to disk. If your system crashes, you replay the log on restart and recover every change that was durably logged.

Here's a real example: imagine you're building a key-value store. When someone saves a value, you first write it to your WAL. Even if your application crashes right after, you're covered - just restart, read the WAL, and rebuild your database state. Simple!

I've built this WAL implementation to be:

The Role of Atomic Operations

Throughout this WAL implementation, atomic operations play a crucial role in ensuring thread safety and data consistency without excessive locking. These operations are essential for maintaining the WAL's integrity in a concurrent environment. Let's examine the key uses of atomic operations:

Position Tracking

The segment.position field uses atomic operations to safely track write positions:

currentPos := atomic.LoadInt64(&segment.position)
if !atomic.CompareAndSwapInt64(&segment.position, currentPos, newPos) {
    continue
}

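This atomic Compare-and-Swap (CAS) operation ensures that only one goroutine can claim a given byte range: if another goroutine advanced segment.position between the load and the swap, the CAS fails and the caller retries with the fresh value.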

Segment ID Management

The nextSegmentID field uses atomic operations to generate unique segment IDs:

newID := atomic.AddInt64(&w.nextSegmentID, 1)

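This ensures that every call returns a distinct ID, even when several goroutines request a new segment at the same time.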

Why Atomic Operations Matter

The use of atomic operations provides several critical benefits:

While the WAL still uses mutex locks for complex operations (like file I/O), atomic operations handle the high-contention scenarios where multiple goroutines frequently access the same values. This hybrid approach balances safety with performance.
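The reservation pattern described in this section can be tried in isolation. The following is a minimal sketch, not the WAL's actual code: the helper names reserve and reserveConcurrently are hypothetical, and a plain int64 stands in for segment.position. Each goroutine claims a fixed-size byte range with the same load, compute, compare-and-swap loop, and no two goroutines end up with the same offset.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// reserve claims size bytes in a shared log and returns the starting offset.
// It mirrors the load / compute / compare-and-swap loop used for segment.position.
func reserve(position *int64, size int64) int64 {
	for {
		current := atomic.LoadInt64(position)
		next := current + size
		if atomic.CompareAndSwapInt64(position, current, next) {
			return current // this goroutine now owns [current, next)
		}
		// Another goroutine moved the position first; reload and retry.
	}
}

// reserveConcurrently runs n goroutines that each reserve size bytes.
// It returns the offsets they were given and the final position.
func reserveConcurrently(n int, size int64) ([]int64, int64) {
	var position int64
	offsets := make([]int64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			offsets[i] = reserve(&position, size)
		}(i)
	}
	wg.Wait()
	return offsets, atomic.LoadInt64(&position)
}

func main() {
	offsets, final := reserveConcurrently(8, 32)
	fmt.Println(len(offsets), final) // 8 256
}
```

If no bounds check were needed, atomic.AddInt64 alone would do the same job. The CAS loop is useful here because the WAL has to compare the new position against the segment size before committing to it.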

Typical Client Usage

Here are the key operations you'll use when working with this WAL implementation. These patterns cover the entire lifecycle of WAL usage, from initialization through normal operations to proper shutdown.

On startup:

w, err := NewWAL("wal_dir", 1024*1024) // Open or create a WAL; existing segments are recovered

On updates:

pos, err := w.AppendRecord(PUT, key, value) // Call for each PUT or DELETE operation

On crash recovery:

records, err := w.ReadAll() // Replay all records to rebuild the in-memory database

On shutdown:

err = w.Close() // Release file handles; appended records were already synced to disk

Constants and Types

The WAL implementation uses several key types and constants to represent its data structures and operations. These types provide the foundation for storing and managing records, tracking positions, and maintaining data integrity across segments.

Let's examine each core type and constant, along with its purpose and usage patterns:

Record Types

These define the fundamental operations that can be stored in the WAL.

const (
    PUT    byte = 1
    DELETE byte = 2
)

As a client, choose one of these types to indicate the kind of change you're recording in the WAL. Later, when you “replay” the log for recovery, you can rebuild a data structure such as a B-tree by applying each operation in the order it was logged.
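To make the replay idea concrete, here is a small sketch that applies PUT and DELETE operations to an in-memory map. The op type and the replay function are hypothetical stand-ins for illustration; a real client would iterate over the records returned by the WAL instead.

```go
package main

import "fmt"

const (
	PUT    byte = 1
	DELETE byte = 2
)

// op is a hypothetical, simplified stand-in for a WAL record.
type op struct {
	Type  byte
	Key   string
	Value string
}

// replay applies operations in log order and returns the resulting state.
func replay(ops []op) (map[string]string, error) {
	state := make(map[string]string)
	for i, o := range ops {
		switch o.Type {
		case PUT:
			state[o.Key] = o.Value
		case DELETE:
			delete(state, o.Key)
		default:
			return nil, fmt.Errorf("op %d: unknown record type %d", i, o.Type)
		}
	}
	return state, nil
}

func main() {
	state, err := replay([]op{
		{PUT, "user:1", "Alice"},
		{PUT, "user:2", "Bob"},
		{DELETE, "user:1", ""},
		{PUT, "user:2", "Robert"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(state) // map[user:2:Robert]
}
```

Order matters: applying the same operations in a different sequence could leave user:1 present or user:2 with the older value, which is why the log is strictly sequential.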

The WAL Struct

The WAL struct serves as the central coordinator for all write-ahead logging operations. It manages segments, handles concurrent access, and provides the interface through which clients interact with the log. This struct encapsulates both the state and behavior needed for reliable, persistent storage of records.

Key responsibilities of the WAL struct include:

The main entry point for client interactions:

type WAL struct {
    mu            sync.RWMutex
    dir           string
    activeSegment *segment
    segments      []*segment
    segmentSize   int64
    nextSegmentID int64
}

Client usage example:

// Create new or open existing WAL
w, err := NewWAL("wal_dir", 1024*1024)
if err != nil {
    // handle error
}

The Segment Struct

Segments are a critical part of WAL design that solve several important problems:

Think of segments like chapters in a book - they organize the content into digestible pieces while maintaining the overall narrative. Each segment has:

Internal structure representing each physical log file:

type segment struct {
    id       int64
    file     *os.File
    position int64 // atomic
    mu       sync.Mutex
}

Note: Clients don't interact with segments directly. The WAL handles segment management internally.

Position Type

The Position type is crucial for tracking exactly where records are stored in the WAL, enabling:

Identifies the exact location of a record in the WAL:

type Position struct {
    SegmentID int64
    Offset    int64
}

Example usage:

pos, err := w.AppendRecord(PUT, []byte("user:123"), []byte("Alice"))
if err != nil {
    // handle error
}
// pos can be stored for later record lookup

Record Type

The Record type is a fundamental data structure that serves three key purposes:

When writing a record, the WAL calculates its CRC and stores it alongside the data. Later, when reading the record, it recalculates the CRC and compares it to the stored value - if they don't match, we know the data has been corrupted.
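The write-then-verify cycle can be sketched with the standard hash/crc32 package, which is what the implementation uses. The helper names checksum and verify are assumptions made for this example only.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// checksum returns the IEEE CRC-32 of payload, as computed at write time.
func checksum(payload []byte) uint32 {
	return crc32.ChecksumIEEE(payload)
}

// verify recomputes the CRC at read time and compares it to the stored value.
func verify(payload []byte, stored uint32) bool {
	return crc32.ChecksumIEEE(payload) == stored
}

func main() {
	payload := []byte("user:123=Alice")
	stored := checksum(payload)
	fmt.Println(verify(payload, stored)) // true

	payload[0] ^= 0x01 // simulate a single flipped bit on disk
	fmt.Println(verify(payload, stored)) // false
}
```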

Represents the data stored in the WAL:

type Record struct {
    Type     byte
    Key      []byte
    Value    []byte
    CRC      uint32
    Position Position
}

Example of reading a record:

record, err := w.ReadRecord(pos)
if err != nil {
    // handle error
}
fmt.Printf("Read record: Type=%d, Key=%s, Value=%s\n",
    record.Type, record.Key, record.Value)

Core Functions

The core functions of this WAL implementation provide the essential operations needed for reliable, persistent logging. These functions handle everything from initialization and record management to crash recovery and cleanup. Let's examine each core function in detail, understanding its purpose, implementation, and usage patterns.

Each function is designed with careful attention to concurrency, data integrity, and error handling. We'll see how they work together to provide a robust foundation for building reliable storage systems.

NewWAL

Creates or opens a WAL from a directory:

func NewWAL(dir string, segmentSize int64) (*WAL, error) {
    if err := os.MkdirAll(dir, 0755); err != nil {
        return nil, fmt.Errorf("creating WAL directory: %w", err)
    }

    wal := &WAL{
        dir:         dir,
        segmentSize: segmentSize,
    }

    if err := wal.recoverSegments(); err != nil {
        return nil, fmt.Errorf("recovering segments: %w", err)
    }

    if len(wal.segments) == 0 && wal.activeSegment == nil {
        if err := wal.createNewSegment(); err != nil {
            return nil, fmt.Errorf("creating initial segment: %w", err)
        }
    }

    return wal, nil
}

Client usage:

w, err := NewWAL("mywal", 10*1024*1024) // 10MB segments
if err != nil {
    log.Fatal(err)
}

RecoverSegments

The recoverSegments method (unexported, so clients never call it directly) runs during WAL initialization and rebuilds the WAL's state from the segment files already on disk. It scans the WAL directory, picks out files named segment-<id>.wal, opens each one, and reconstructs the internal segment structures needed for normal operation. The segment with the highest ID becomes the active segment, and its write position is set to the current file size.

This function provides crucial durability guarantees by ensuring no data is lost between process restarts. It handles tasks such as:

Internal method that runs during WAL initialization:

func (w *WAL) recoverSegments() error {
    files, err := os.ReadDir(w.dir)
    if err != nil {
        return fmt.Errorf("reading WAL directory: %w", err)
    }

    var maxID int64
    segments := make(map[int64]*segment)

    for _, file := range files {
        if filepath.Ext(file.Name()) != ".wal" {
            continue
        }

        var id int64
        _, err := fmt.Sscanf(file.Name(), "segment-%d.wal", &id)
        if err != nil {
            continue
        }

        path := filepath.Join(w.dir, file.Name())
        f, err := os.OpenFile(path, os.O_RDWR, 0666)
        if err != nil {
            return fmt.Errorf("opening segment file %s: %w", path, err)
        }

        info, err := f.Stat()
        if err != nil {
            f.Close()
            return fmt.Errorf("stating segment file %s: %w", path, err)
        }

        seg := &segment{
            id:   id,
            file: f,
        }
        atomic.StoreInt64(&seg.position, info.Size())
        segments[id] = seg
        if id > maxID {
            maxID = id
        }
    }

    w.segments = make([]*segment, 0, len(segments))
    for i := int64(1); i <= maxID; i++ {
        if seg, exists := segments[i]; exists {
            w.segments = append(w.segments, seg)
        }
    }

    if len(w.segments) > 0 {
        w.activeSegment = w.segments[len(w.segments)-1]
        w.segments = w.segments[:len(w.segments)-1]
        atomic.StoreInt64(&w.nextSegmentID, maxID)
    }

    return nil
}
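The file-name handling above can be exercised on its own. This is a sketch under the same naming assumption (segment-<id>.wal); the helpers parseSegmentID and orderedSegmentIDs are hypothetical and not part of the WAL. Instead of walking every ID from 1 to maxID, it sorts the IDs numerically, which is one alternative way to get the same ordering.

```go
package main

import (
	"fmt"
	"path/filepath"
	"sort"
)

// parseSegmentID extracts the numeric ID from names like "segment-12.wal".
func parseSegmentID(name string) (int64, bool) {
	if filepath.Ext(name) != ".wal" {
		return 0, false
	}
	var id int64
	if _, err := fmt.Sscanf(name, "segment-%d.wal", &id); err != nil {
		return 0, false
	}
	return id, true
}

// orderedSegmentIDs returns the valid segment IDs in ascending numeric order.
func orderedSegmentIDs(names []string) []int64 {
	var ids []int64
	for _, name := range names {
		if id, ok := parseSegmentID(name); ok {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	names := []string{"segment-10.wal", "notes.txt", "segment-2.wal", "backup.wal", "segment-1.wal"}
	ids := orderedSegmentIDs(names)
	fmt.Println(ids)                 // [1 2 10]
	fmt.Println(ids[len(ids)-1] + 1) // 11
}
```

Numeric ordering matters because a directory listing sorted by name would place segment-10.wal before segment-2.wal.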

CreateNewSegment

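The createNewSegment method (also unexported) creates and initializes a new log segment, either when the WAL is first created or when the active segment reaches its size limit. It allocates the next segment ID, creates the file, moves the previous active segment into the list of older segments, and makes the new segment active so that writes can continue. It does not take any lock itself; callers such as AppendRecord hold w.mu while calling it.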

Key responsibilities of CreateNewSegment include:

Internal method for creating new segments when needed:

func (w *WAL) createNewSegment() error {
    newID := atomic.AddInt64(&w.nextSegmentID, 1)
    path := filepath.Join(w.dir, fmt.Sprintf("segment-%d.wal", newID))

    file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
    if err != nil {
        return fmt.Errorf("creating segment file: %w", err)
    }

    newSegment := &segment{
        id:   newID,
        file: file,
    }
    atomic.StoreInt64(&newSegment.position, 0)

    if w.activeSegment != nil {
        w.segments = append(w.segments, w.activeSegment)
    }

    w.activeSegment = newSegment
    return nil
}

AppendRecord

The AppendRecord function is the cornerstone of the WAL implementation, responsible for writing new records to disk. It reserves space in the active segment with an atomic compare-and-swap, protects each record with a CRC checksum so that corruption can be detected on read, and makes the write durable by calling Sync before it returns. It also rotates to a new segment when the size limit is reached.

Key features of AppendRecord include:

Primary method for storing data:

func (w *WAL) AppendRecord(recordType byte, key, value []byte) (Position, error) {
    if len(key) == 0 {
        return Position{}, errors.New("key cannot be empty")
    }

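    headerSize := 13 // 4 (CRC) + 1 (type) + 4 (key length) + 4 (value length)
    recordSize := headerSize + len(key) + len(value)

    // A record that can never fit would otherwise rotate segments forever.
    if int64(recordSize) > w.segmentSize {
        return Position{}, fmt.Errorf("record size %d exceeds segment size %d", recordSize, w.segmentSize)
    }

    for {
        w.mu.RLock()
        segment := w.activeSegment
        w.mu.RUnlock()

        if segment == nil {
            return Position{}, errors.New("no active segment")
        }

        currentPos := atomic.LoadInt64(&segment.position)
        newPos := currentPos + int64(recordSize)

        if newPos > w.segmentSize {
            w.mu.Lock()
            // Rotate only if no other goroutine has already done so.
            if w.activeSegment == segment {
                if err := w.createNewSegment(); err != nil {
                    w.mu.Unlock()
                    return Position{}, fmt.Errorf("creating new segment: %w", err)
                }
            }
            w.mu.Unlock()
            continue
        }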

        if !atomic.CompareAndSwapInt64(&segment.position, currentPos, newPos) {
            continue
        }

        buf := make([]byte, recordSize)
        offset := 4 // CRC space

        buf[offset] = recordType
        offset++

        binary.LittleEndian.PutUint32(buf[offset:], uint32(len(key)))
        offset += 4

        binary.LittleEndian.PutUint32(buf[offset:], uint32(len(value)))
        offset += 4

        copy(buf[offset:], key)
        offset += len(key)
        copy(buf[offset:], value)

        crc := crc32.ChecksumIEEE(buf[4:])
        binary.LittleEndian.PutUint32(buf[0:], crc)

        segment.mu.Lock()
        pos := Position{
            SegmentID: segment.id,
            Offset:    currentPos,
        }

        if _, err := segment.file.WriteAt(buf, currentPos); err != nil {
            segment.mu.Unlock()
            return Position{}, fmt.Errorf("writing record: %w", err)
        }

        if err := segment.file.Sync(); err != nil {
            segment.mu.Unlock()
            return Position{}, fmt.Errorf("syncing record: %w", err)
        }

        segment.mu.Unlock()
        return pos, nil
    }
}

Client usage:

pos, err := w.AppendRecord(PUT, []byte("session:abc"), []byte("data123"))
if err != nil {
    // handle error
}
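The on-disk layout that AppendRecord builds (and that ReadRecord parses) can be isolated into a pair of helpers. This is a sketch of the same 13-byte header format, with hypothetical function names encodeRecord and decodeRecord; it works on byte slices only and does no file I/O.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

const headerSize = 13 // 4 (CRC) + 1 (type) + 4 (key length) + 4 (value length)

// encodeRecord lays out one record as: CRC | type | keyLen | valueLen | key | value.
// The CRC covers everything after the CRC field itself.
func encodeRecord(recordType byte, key, value []byte) []byte {
	buf := make([]byte, headerSize+len(key)+len(value))
	buf[4] = recordType
	binary.LittleEndian.PutUint32(buf[5:9], uint32(len(key)))
	binary.LittleEndian.PutUint32(buf[9:13], uint32(len(value)))
	copy(buf[headerSize:], key)
	copy(buf[headerSize+len(key):], value)
	binary.LittleEndian.PutUint32(buf[0:4], crc32.ChecksumIEEE(buf[4:]))
	return buf
}

// decodeRecord reverses encodeRecord and rejects truncated or corrupted input.
func decodeRecord(buf []byte) (recordType byte, key, value []byte, err error) {
	if len(buf) < headerSize {
		return 0, nil, nil, errors.New("short header")
	}
	keyLen := uint64(binary.LittleEndian.Uint32(buf[5:9]))
	valueLen := uint64(binary.LittleEndian.Uint32(buf[9:13]))
	if uint64(len(buf)-headerSize) != keyLen+valueLen {
		return 0, nil, nil, errors.New("length mismatch")
	}
	if crc32.ChecksumIEEE(buf[4:]) != binary.LittleEndian.Uint32(buf[0:4]) {
		return 0, nil, nil, errors.New("CRC mismatch")
	}
	body := buf[headerSize:]
	return buf[4], body[:keyLen], body[keyLen:], nil
}

func main() {
	rec := encodeRecord(1, []byte("session:abc"), []byte("data123"))
	fmt.Println(len(rec)) // 31 (13 + 11 + 7)

	recordType, key, value, err := decodeRecord(rec)
	fmt.Println(recordType, string(key), string(value), err) // 1 session:abc data123 <nil>
}
```

The lengths are widened to uint64 before they are added, so a corrupted header with two very large 32-bit lengths cannot wrap around and pass the size check.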

ReadRecord

The ReadRecord function is a critical component that enables random access to records stored in the WAL. It takes a Position parameter (obtained from a previous AppendRecord call) and returns the corresponding Record. This function ensures data integrity by validating CRC checksums and handles concurrent access through appropriate locking mechanisms.

Key aspects of ReadRecord include:

Retrieves a specific record by position:

func (w *WAL) ReadRecord(pos Position) (*Record, error) {
    w.mu.RLock()
    var segment *segment
    if w.activeSegment != nil && w.activeSegment.id == pos.SegmentID {
        segment = w.activeSegment
    } else {
        for _, seg := range w.segments {
            if seg.id == pos.SegmentID {
                segment = seg
                break
            }
        }
    }
    w.mu.RUnlock()

    if segment == nil {
        return nil, fmt.Errorf("segment %d not found", pos.SegmentID)
    }

    segment.mu.Lock()
    defer segment.mu.Unlock()

    headerBuf := make([]byte, 13)
    _, err := segment.file.ReadAt(headerBuf, pos.Offset)
    if err != nil {
        return nil, fmt.Errorf("reading record header: %w", err)
    }

    storedCRC := binary.LittleEndian.Uint32(headerBuf[0:])
    recordType := headerBuf[4]
    keyLen := binary.LittleEndian.Uint32(headerBuf[5:9])
    valueLen := binary.LittleEndian.Uint32(headerBuf[9:13])

    dataBuf := make([]byte, keyLen+valueLen)
    _, err = segment.file.ReadAt(dataBuf, pos.Offset+13)
    if err != nil {
        return nil, fmt.Errorf("reading record data: %w", err)
    }

    fullRecord := make([]byte, 1+4+4+len(dataBuf))
    copy(fullRecord, headerBuf[4:])
    copy(fullRecord[9:], dataBuf)

    computedCRC := crc32.ChecksumIEEE(fullRecord)
    if computedCRC != storedCRC {
        return nil, errors.New("CRC mismatch")
    }

    return &Record{
        Type:  recordType,
        Key:   dataBuf[:keyLen],
        Value: dataBuf[keyLen:],
        CRC:   storedCRC,
        Position: Position{
            SegmentID: pos.SegmentID,
            Offset:    pos.Offset,
        },
    }, nil
}

Close

The Close function releases the WAL's resources at shutdown by closing every segment file. Because AppendRecord calls Sync before it returns, acknowledged records are already on disk by the time Close runs, so its main job is to prevent file descriptor leaks. Call it during graceful shutdown.

Key responsibilities of Close include:

Properly shuts down the WAL:

func (w *WAL) Close() error {
    w.mu.Lock()
    defer w.mu.Unlock()

    // Close every file even if one fails, and report the first error.
    var firstErr error
    closeFile := func(seg *segment) {
        if err := seg.file.Close(); err != nil && firstErr == nil {
            firstErr = err
        }
    }

    if w.activeSegment != nil {
        closeFile(w.activeSegment)
    }

    for _, seg := range w.segments {
        closeFile(seg)
    }

    return firstErr
}

ReadAll

The ReadAll function is a critical utility for crash recovery and data reconstruction. It sequentially reads all records from all segments, performing CRC validation to ensure data integrity. This function is particularly useful when rebuilding state after a system crash or when migrating data.

Key features of ReadAll include:

Reads all records for crash recovery:

func (w *WAL) ReadAll() ([]*Record, error) {
    w.mu.RLock()
    segments := append([]*segment(nil), w.segments...)
    if w.activeSegment != nil && (len(segments) == 0 || segments[len(segments)-1].id != w.activeSegment.id) {
        segments = append(segments, w.activeSegment)
    }
    w.mu.RUnlock()

    var allRecords []*Record

    for _, seg := range segments {
        seg.mu.Lock()

        currentPos := int64(0)
        for {
            headerBuf := make([]byte, 13)
            n, err := seg.file.ReadAt(headerBuf, currentPos)
            if err != nil {
                if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || n == 0 {
                    break
                }
                seg.mu.Unlock()
                return nil, fmt.Errorf("reading record header: %w", err)
            }

            if n < len(headerBuf) {
                break
            }

            storedCRC := binary.LittleEndian.Uint32(headerBuf[0:])
            recordType := headerBuf[4]
            keyLen := binary.LittleEndian.Uint32(headerBuf[5:9])
            valueLen := binary.LittleEndian.Uint32(headerBuf[9:13])

            dataSize := keyLen + valueLen
            dataBuf := make([]byte, dataSize)
            n, err = seg.file.ReadAt(dataBuf, currentPos+13)
            if err != nil {
                if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || n == 0 {
                    break
                }
                seg.mu.Unlock()
                return nil, fmt.Errorf("reading record data: %w", err)
            }

            if uint32(n) < dataSize {
                break
            }

            fullRecord := make([]byte, 1+4+4+len(dataBuf))
            copy(fullRecord, headerBuf[4:])
            copy(fullRecord[9:], dataBuf)

            computedCRC := crc32.ChecksumIEEE(fullRecord)
            if computedCRC != storedCRC {
                seg.mu.Unlock()
                return nil, errors.New("CRC mismatch")
            }

            record := &Record{
                Type:  recordType,
                Key:   dataBuf[:keyLen],
                Value: dataBuf[keyLen:],
                CRC:   storedCRC,
                Position: Position{
                    SegmentID: seg.id,
                    Offset:    currentPos,
                },
            }
            allRecords = append(allRecords, record)

            currentPos += 13 + int64(dataSize)
        }

        seg.mu.Unlock()
    }

    return allRecords, nil
}

Client usage:

records, err := w.ReadAll()
if err != nil {
    // handle error
}
for _, r := range records {
    // Reapply PUT or DELETE operations to reconstruct state
}

Concurrency, Integrity, and Extensions

The WAL implementation uses several mechanisms to ensure safety and reliability:

Concurrency Safety

Data Integrity

Potential Extensions

Future enhancements could include:

  1. Write Batching
    • Group multiple records into single writes
    • Improve throughput for high-volume scenarios
  2. Compression
    • Compress records before writing
    • Reduce disk space usage
  3. Indexing
    • Build indexes for faster lookups
    • Support time-range queries

Conclusion

This WAL implementation provides a robust foundation for building reliable storage systems. Key features:

By understanding both the internal mechanisms and client usage patterns, you can effectively integrate this WAL into your own applications or extend it to meet specific requirements.

Best Practices

When using this WAL:

  1. Always call Close() during graceful shutdown
  2. Handle CRC mismatch errors appropriately
  3. Use ReadAll() for complete recovery after crashes
  4. Consider batching multiple records in high-throughput scenarios
  5. Monitor segment sizes and rotation frequency

Remember that the WAL is designed to be a durable source of truth for your data operations. Use it accordingly in your architecture.