Ozan Sazak | sazak.io
sazak.io
home blog whoami

Cloud Native Patterns Illustrated: Fan-in and Fan-out

I've started reading technical books after my graduation from CS to broaden my knowledge on many topics, such as system design, low-level CS, cloud native, etc. Nowadays, I'm reading Cloud Native Go by Matthew Titmus, which gives a smooth introduction to the cloud native world using the benefits of Go. The book starts with Go basics and cloud native patterns, and that's where I thought "I should write about these patterns!".

In this series of blog posts, I will write about the cloud native patterns I've learnt – also with illustrations using Manim.

The Problem

Let's say we have a constant data source (such as a Go channel), and we need to process the data and put the processed data into a destination channel with a latency as low as possible.

In the general case, we have a processor function which is in the middle of source and destinations, which processes the data packets (packet here is an abstraction for simplicity) one by one.

However, every processing function have latency. This may be because of network or CPU usage, blocking syscalls, etc. If we send enough packets per second to the processor function, voilà! We now have a bottleneck!

Solution

There's a fairly simple solution to this problem: using multiple processors inside the data pipeline. This way, we can process the data stream concurrently, which drops the overall latency and decreases the pipeline congestion.

That's where Fan-in and Fan-out comes in…

We can implement this solution by utilizing shared memory, such as message queues.
With this approach, we will split the incoming data packets into different input queues. Then, each queue's own processor will take the packets one by one, process the data, and put the processed data into its related output queue. Finally, the destination (another processor, queue, or some other system) will take the processed data packets from each of the output queues, and aggregate them into a single data source.

The first approach, splitting the data source (input) to multiple data sources (input queues) is called the Fan-out pattern. The second one, aggregating multiple data sources (output queues) into a single data source (destination) is called the Fan-in pattern.

For simplicity, we've specified one input queue and one output queue for each processor in our pipeline. We could use multiple input/output queues per processor, or a shared queue between a few processors, depending on the overall system requirements.

Go Implementation

Let's get our hands dirty with some concurrent Go! First, let's define our data sources:

func stringGenerator(prefix string, count uint) <-chan string {
    ch := make(chan string)

    go func() {
        defer close(ch)
        for i := uint(0); i < count; i++ {
            ch <- fmt.Sprintf("%s-%d", prefix, i)
        }
    }()

    return ch
}

The function stringGenerator creates a receive-only string channel, creates a goroutine that puts prefixed strings into the channel, and returns the channel. We read from this channel later in the Fan-out code.

Our processor function will be fairly simple too:

func stringProcessor(id int, source <-chan string) <-chan string {
    ch := make(chan string)

    go func() {
        defer close(ch)
        for data := range source {
            ch <- fmt.Sprintf("P-%d processed message: %s", id, data)
            millisecs := rand.Intn(901) + 100
            time.Sleep(time.Duration(millisecs * int(time.Millisecond)))
        }
    }()

    return ch
}

Inside the processor function, we will wait for a random while to simulate the processor latency.

Fan-out Implementation

In the Fan-out implementation, we will take a receive-only channel, and return a slice of receive-only channels:

// Splitter implements the Fan-out pattern
func Splitter(source <-chan string, numDests uint) []<-chan string {
    destinations := make([]<-chan string, 0, numDests)

    for i := uint(0); i < numDests; i++ {
        dest := make(chan string)

        go func() {
            defer close(dest)
            for data := range source {
                dest <- data
            }
        }()

        destinations = append(destinations, dest)
    }

    return destinations
}

Goroutines created inside the Splitter function will handle the data routing logic. Notice that inside the goroutines, we used a single range statement to receive from the source channel:

for data := range source {
    dest <- data
}

This means each goroutine will try to read from the channel in the loop, and the first one to do the read will receive the next item. In other words, each goroutine will fight over the next data instance.

We could use a centralized solution for Fan-out instead of competing goroutines. In that case, we would define a master process to distribute each incoming data instance (string in our case) among all the output channels.

Fan-in Implementation

In Fan-in, we will basically do the reverse of Fan-out, with a few differences:

// Aggregator implements the Fan-in pattern
func Aggregator(sources []<-chan string) <-chan string {
    destination := make(chan string)

    var wg sync.WaitGroup
    wg.Add(len(sources))

    for _, source := range sources {
        go func(src <-chan string) {
            defer wg.Done()
            for data := range src {
                destination <- data
            }
        }(source)
    }
    go func() {
        wg.Wait()
        close(destination)
    }()

    return destination
}

The Aggregator function takes a slice of receive-only input sources and returns a single receive-only output channel. Inside, we created a goroutine for each input source, which reads from the source continuously and populates the output channel (destination) with the data it received.

Notice that we've used a sync.WaitGroup to wait for the aggregator goroutines to finish. After an input source (channel) is closed, the for loop inside the respective goroutine will end, and it will finish its job.

After all input sources are closed, we're done with aggregating, and we now can close the destination channel. This is a crucial step, and if we don't close the channel we have created, the Go runtime will complain with a fatal error:

fatal error: all goroutines are asleep - deadlock!

main()

Putting all of our functions together, we're ready to run our code:

const (
    processorCount = 10
)

func main() {
    splitSources := Splitter(stringGenerator("test", 100), processorCount)
    procSources := make([]<-chan string, 0, processorCount)

    for i, src := range splitSources {
        procSources = append(procSources, stringProcessor(i, src))
    }

    for out := range Aggregator(procSources) {
        fmt.Println(out)
    }
}

tl;dr

Thank you for reading!