A Deep Dive into Go Channels and a Sneak Peek at the LMAX Disruptor
Unmasking the Inner Workings of Golang Channels
At their core, Golang channels facilitate safe and efficient communication between concurrently running goroutines. To understand their power, it's essential to examine their internal structure and the mechanics of send and receive operations.
The Underlying Data Structure
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	synctest bool // true if created in a synctest bubble
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
Internally, a Golang channel is represented by the hchan struct. This structure incorporates several key fields that manage the channel's state and behavior. A fundamental component is a circular queue, or ring buffer, which stores the data being transmitted through the channel. The qcount field tracks the current number of elements in this queue, while dataqsiz defines its maximum capacity and so determines whether the channel is buffered or unbuffered. The buf field points to the underlying circular buffer in memory. To manage the flow of data, sendx and recvx are indices that point to the next positions for send and receive operations, respectively.
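The bookkeeping described above can be sketched with a toy ring buffer. This is an illustration only, not the runtime's code: the ring type and its push and pop methods are hypothetical names, it stores plain ints, and it has no locking or wait queues.

```go
package main

import "fmt"

// ring is a toy model of the buffer bookkeeping in hchan.
// It is NOT the runtime's implementation: no lock, no wait queues, ints only.
type ring struct {
	buf    []int
	qcount int // number of elements currently stored
	sendx  int // next slot to write
	recvx  int // next slot to read
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push stores v at sendx and reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around to the start of the array
	}
	r.qcount++
	return true
}

// pop removes the oldest value at recvx and reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0 // wrap around to the start of the array
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(3)
	for i := 1; i <= 4; i++ {
		ok := r.push(i)
		fmt.Println("push", i, "ok:", ok) // the fourth push fails: the ring is full
	}
	v, _ := r.pop()
	fmt.Println("pop:", v) // the oldest value comes out first
	ok := r.push(5)
	fmt.Println("push 5 ok:", ok, "sendx:", r.sendx, "recvx:", r.recvx)
}
```

After the final push both indices have wrapped or advanced to slot 1, which is the circular behavior the sendx and recvx fields exist to track.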
Despite their role in facilitating communication between goroutines, the internal operations of a channel rely on mutual exclusion to ensure data integrity. This is achieved through the lock field, a mutex that protects the channel's internal state from race conditions. Additionally, the sendq and recvq fields are wait queues that hold goroutines blocked trying to send to a full channel or receive from an empty channel, respectively. Finally, the closed field is a flag that indicates whether the channel has been closed.
The distinction between buffered and unbuffered channels lies in the value of dataqsiz. Buffered channels have a capacity greater than zero, allowing a certain number of elements to be stored in the queue. A send on a buffered channel therefore does not block as long as there is space available in the buffer. Conversely, unbuffered channels have a capacity of zero. In this case, a send only proceeds when a corresponding receive is ready to accept the data, which is what makes communication through unbuffered channels synchronous.
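A small sketch of the difference, using a non-blocking send to observe when a plain send would block. The trySend helper is a hypothetical name introduced for this example.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// When it returns false, a plain `ch <- v` would have blocked.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan int, 2) // dataqsiz = 2
	unbuffered := make(chan int)  // dataqsiz = 0

	ok := trySend(buffered, 1)
	fmt.Println("buffered send 1:", ok, "len:", len(buffered), "cap:", cap(buffered))
	ok = trySend(buffered, 2)
	fmt.Println("buffered send 2:", ok, "len:", len(buffered))
	ok = trySend(buffered, 3)
	fmt.Println("buffered send 3:", ok, "(buffer is full)")

	ok = trySend(unbuffered, 1)
	fmt.Println("unbuffered send:", ok, "(no receiver is waiting)")
}
```

The built-in len and cap functions report the same quantities that qcount and dataqsiz hold inside hchan.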
The Send Operation
When a goroutine sends a value to a channel using the ch <- value syntax, a sequence of steps is initiated. First, the goroutine acquires the channel's mutex to ensure exclusive access to its internal state. It then checks whether the channel has been closed. Sending to a closed channel results in a panic, signaling an invalid operation.
If the channel is open, the operation proceeds based on whether there are any waiting receivers. If recvq is not empty, a goroutine is already waiting to receive a value. In this case the sender dequeues a receiver from recvq, copies the value directly to it, releases the mutex, and then marks the receiver runnable (as the comment on lock notes, a goroutine must not be readied while the lock is held). A waiting receiver implies that nothing is buffered, so this path is taken on unbuffered channels and on buffered channels that are currently empty. The direct handoff skips the buffer entirely and shows the synchronous side of channel communication: sender and receiver are both actively involved in the exchange.
If there are no waiting receivers, the behavior depends on whether the channel is buffered. For buffered channels, if the buffer is not full, the sending goroutine copies the value into the buffer at the index indicated by sendx, increments qcount (the number of elements in the buffer), updates sendx to point to the next available slot (wrapping around if necessary due to the circular nature of the buffer), and then releases the mutex.
However, if there are no waiting receivers and the buffer is full (for a buffered channel), or if the channel is unbuffered (where the buffer size is always zero), the sending goroutine cannot proceed immediately. It is added to sendq (the wait queue for senders) and goes to sleep, blocked until a receiver becomes available. The channel's mutex is released as the goroutine parks, so other goroutines can keep operating on the channel.
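The decision order described in this section can be condensed into a small function. This is only a model of the branches for a blocking send; sendPath and its parameters are hypothetical names, and the real runtime code also handles nil channels, non-blocking sends, and other details.

```go
package main

import "fmt"

// sendPath names the branch a blocking send takes for a given channel state.
// It models only the order of checks described in the text.
func sendPath(closed bool, waitingReceivers, qcount, dataqsiz int) string {
	switch {
	case closed:
		return "panic: send on closed channel"
	case waitingReceivers > 0:
		return "hand off directly to a waiting receiver"
	case qcount < dataqsiz:
		return "copy into buffer at sendx"
	default:
		return "enqueue on sendq and park"
	}
}

func main() {
	fmt.Println(sendPath(true, 0, 0, 4))  // closed channel
	fmt.Println(sendPath(false, 1, 0, 0)) // unbuffered, receiver waiting
	fmt.Println(sendPath(false, 0, 2, 4)) // buffered, space available
	fmt.Println(sendPath(false, 0, 4, 4)) // buffered, full
	fmt.Println(sendPath(false, 0, 0, 0)) // unbuffered, nobody waiting
}
```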
The Receive Operation
The receive operation, using the value := <-ch syntax, follows a similar process. The receiving goroutine first acquires the channel's mutex. It then checks whether the channel is closed and the buffer is empty. If both conditions are true, there are no more values to receive, and the receive operation returns the zero value of the channel's element type along with a false boolean value (used in the comma-ok idiom to check whether the channel is closed).
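A short example of the comma-ok idiom. Note that values already in the buffer are still delivered after close; only once the buffer is empty does the receive report false. The drain helper is a hypothetical name used for illustration.

```go
package main

import "fmt"

// drain receives until the channel is closed and empty, returning what it saw.
func drain(ch <-chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			return got // closed and no buffered values left
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	close(ch)

	fmt.Println(drain(ch)) // [10 20]

	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```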
If the channel is still open, or closed but with data left in the buffer, the receiving goroutine checks for waiting senders in sendq. If sendq is not empty, a sender is blocked waiting to send a value. On an unbuffered channel the receiver dequeues that sender and copies the value directly from it. On a buffered channel a waiting sender means the buffer is full, so the receiver takes the oldest element from the buffer at recvx and copies the blocked sender's value into the slot it just freed, which preserves FIFO order. In both cases the receiver then releases the mutex and wakes the sender. This interaction between senders and receivers through the wait queues and the Go scheduler is crucial for handling blocked goroutines efficiently: nothing busy-waits, and the CPU stays available to other runnable goroutines.
If there are no waiting senders, the behavior depends on whether the channel is buffered. For buffered channels, if the buffer is not empty, the receiving goroutine retrieves the value from the buffer at the index indicated by recvx, decrements qcount, updates recvx to point to the next element (wrapping around if necessary), and releases the mutex.
If there are no waiting senders and the buffer is empty (for a buffered channel), or if the channel is unbuffered (which always has an empty buffer when no senders are waiting), the receiving goroutine cannot proceed. It is added to recvq (the wait queue for receivers) and goes to sleep until a value is sent to the channel. The mutex is released while the goroutine is waiting.
Closing Channels
Channels can be closed using the built-in close(ch) function. Typically only the sender should close a channel, and closing is not always necessary. Closing a channel signals to receivers that no more values will be sent on it.
Closing a channel involves acquiring the channel's mutex, setting the closed flag, and then waking up all goroutines waiting in both sendq and recvq. Senders that were blocked panic when they resume, because they are now sending on a closed channel. Blocked receivers, on the other hand, receive the zero value of the channel's element type with the ok flag set to false. This defined behavior gives goroutines a consistent way to handle the end of communication.
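The following sketch shows both sides of this behavior: several receivers released by a single close, and a send on a closed channel panicking. The sendPanics helper is a hypothetical name, and recover is used here only to observe the panic.

```go
package main

import (
	"fmt"
	"sync"
)

// sendPanics reports whether sending on ch panics.
// Call it only on a closed channel or one with buffer space, otherwise it blocks.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	ch := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			v, ok := <-ch // returns once ch has been closed
			fmt.Println("receiver", id, "got", v, ok)
		}(i)
	}

	close(ch) // releases every receiver with the zero value and ok == false
	wg.Wait()

	fmt.Println("send on closed channel panics:", sendPanics(ch))
}
```

The three receiver lines may print in any order, since the scheduler decides which goroutine runs first.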
Zero-Sized Channels
Unbuffered channels, also known as zero-sized channels, have a capacity of zero. This means that a send operation on an unbuffered channel will block until a corresponding receive operation is ready, and vice versa. They act as a rendezvous point, requiring both a sender and a receiver to be ready simultaneously for communication to occur. This characteristic makes them particularly useful for direct synchronization and signaling between goroutines, ensuring that one goroutine proceeds only after another has reached a certain point.
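As a minimal sketch of the rendezvous, the example below uses an unbuffered channel purely as a signal. The compute function is a hypothetical name. Because the receive completes only after the send has started, the write to result is guaranteed to be visible to the caller.

```go
package main

import "fmt"

// compute runs work in another goroutine and waits for it on an unbuffered channel.
func compute() int {
	ready := make(chan struct{}) // capacity zero: send and receive must meet
	var result int

	go func() {
		result = 42         // written before the send
		ready <- struct{}{} // blocks until the caller is receiving
	}()

	<-ready // blocks until the goroutine sends
	return result
}

func main() {
	fmt.Println(compute()) // 42
}
```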
Interaction with the Go Scheduler
The efficient management of goroutines blocked on channel operations is handled by the Go scheduler. The scheduler is aware of goroutines waiting in the sendq and recvq of channels. When a channel operation can proceed (e.g., a value is sent to an empty channel or a receiver becomes available for a full channel), the appropriate waiting goroutine is made runnable again and resumes execution. This prevents goroutines from busy-waiting, which would waste CPU resources, and ensures that the CPU is efficiently utilized by other runnable goroutines.
Practical Code Examples
To solidify the understanding of Golang channels, let's explore several practical code examples that demonstrate their usage in various concurrency scenarios.
Producer-Consumer Pattern
package main

import (
	"fmt"
	"sync"
	"time"
)

// producer sends count values on ch, then closes it to signal completion.
func producer(ch chan<- int, count int) {
	for i := 0; i < count; i++ {
		ch <- i
		fmt.Println("Produced:", i)
		time.Sleep(time.Millisecond * 100)
	}
	close(ch)
}

// consumer receives values until ch is closed and drained.
func consumer(ch <-chan int, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for val := range ch {
		fmt.Println("Consumer", id, "received:", val)
		time.Sleep(time.Millisecond * 200)
	}
	fmt.Println("Consumer", id, "finished")
}

func main() {
	// Create a buffered channel
	ch := make(chan int, 5)

	// Start one producer and two consumers
	var wg sync.WaitGroup
	wg.Add(2)
	go producer(ch, 10)
	go consumer(ch, 1, &wg)
	go consumer(ch, 2, &wg)

	// Wait for both consumers to drain the channel instead of sleeping
	wg.Wait()
	fmt.Println("Main function finished")
}
This example demonstrates the classic producer-consumer pattern using a buffered channel. One producer goroutine generates values and sends them to the channel, while multiple consumer goroutines receive and process these values. The buffered channel allows the producer to continue generating values even if the consumers are temporarily busy.
Worker Pool
package main

import (
	"fmt"
	"time"
)

// Task to be processed
type Task struct {
	ID int
}

// Worker function
func worker(id int, jobs <-chan Task, results chan<- int) {
	for j := range jobs {
		fmt.Println("Worker", id, "started job", j.ID)
		time.Sleep(time.Millisecond * 500) // Simulate work
		results <- j.ID * 2
		fmt.Println("Worker", id, "finished job", j.ID)
	}
}

func main() {
	const numJobs = 5
	const numWorkers = 3

	jobs := make(chan Task, numJobs)
	results := make(chan int, numJobs)

	// Start worker goroutines
	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, results)
	}

	// Send jobs to the workers
	for j := 1; j <= numJobs; j++ {
		jobs <- Task{ID: j}
	}
	close(jobs) // Signal that no more jobs will be sent

	// Collect the results
	for a := 1; a <= numJobs; a++ {
		result := <-results
		fmt.Println("Result:", result)
	}
	close(results)
}
This example showcases a worker pool implementation using channels. A fixed number of worker goroutines are started, each listening on a jobs channel for tasks. The main goroutine sends tasks to this channel, and the workers process them, sending the results to a results channel. This pattern is useful for limiting the number of concurrent operations and managing resources efficiently.
Alternative
Now that we understand how the Golang channel works under the hood, let's examine another approach for controlling concurrency using a Ring Buffer: LMAX Disruptor.
The LMAX Disruptor pattern is engineered for high-performance inter-thread messaging, emphasizing low latency and high throughput in concurrent applications.
The LMAX Disruptor architecture comprises several core components. The RingBuffer is a pre-sized, circular array that holds the events being passed between producers and consumers. Another is the BatchEventProcessor, which is designed to process events in batches, potentially improving throughput, especially for operations that involve I/O.
Disruptor offers several key performance advantages. Its lock-free algorithms and the principle of having a single writer for each slot in the ring buffer at any given time significantly reduce contention, leading to better scalability.
Disruptor typically involves pre-allocating the memory for the events within the ring buffer. This reduces the need for dynamic memory allocation while events are being processed, which in turn minimizes garbage collection overhead, a source of pauses that hurts latency-sensitive applications. Using a BatchEventProcessor to handle events in batches can further improve throughput for certain workloads, especially those involving I/O, by processing multiple events together.
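To make these ideas concrete, here is a minimal single-producer, single-consumer ring buffer in the same spirit: preallocated slots, sequence numbers coordinated with atomics instead of a mutex, and a consumer that handles everything published so far as one batch. It is a sketch written for this article under those assumptions, not the implementation of the Java Disruptor or of any Go port. All names (spscRing, publish, consume, run) are hypothetical, and it requires Go 1.19 or later for atomic.Int64.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const (
	ringSize = 8            // must be a power of two
	ringMask = ringSize - 1 // turns a sequence into a slot index
)

// spscRing supports exactly one writer goroutine and one reader goroutine.
type spscRing struct {
	buf       [ringSize]int64 // preallocated slots, reused forever
	published atomic.Int64    // highest sequence the reader may read
	consumed  atomic.Int64    // highest sequence the reader has finished
}

func newSPSCRing() *spscRing {
	r := &spscRing{}
	r.published.Store(-1)
	r.consumed.Store(-1)
	return r
}

// publish writes v into the slot for seq, waiting while that slot is still unread.
func (r *spscRing) publish(seq, v int64) {
	for seq-r.consumed.Load() > ringSize {
		runtime.Gosched() // ring is full: yield instead of taking a lock
	}
	r.buf[seq&ringMask] = v
	r.published.Store(seq) // makes the slot visible to the reader
}

// consume hands batches [lower, upper] to handle until sequence last is done.
func (r *spscRing) consume(last int64, handle func(lower, upper int64)) {
	next := int64(0)
	for next <= last {
		upper := r.published.Load()
		if upper < next {
			runtime.Gosched() // nothing new yet
			continue
		}
		handle(next, upper)     // process everything available as one batch
		r.consumed.Store(upper) // frees those slots for the writer
		next = upper + 1
	}
}

// run pushes the values 0..n-1 through the ring and returns their sum.
func run(n int64) int64 {
	r := newSPSCRing()
	go func() {
		for seq := int64(0); seq < n; seq++ {
			r.publish(seq, seq)
		}
	}()

	var sum int64
	r.consume(n-1, func(lower, upper int64) {
		for s := lower; s <= upper; s++ {
			sum += r.buf[s&ringMask]
		}
	})
	return sum
}

func main() {
	fmt.Println(run(100)) // 4950
}
```

The two atomic counters are the only shared state the goroutines coordinate on. Spinning with runtime.Gosched keeps the sketch short; a production implementation would choose its wait strategy more carefully.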
While the original LMAX Disruptor was developed in Java, several Go ports have emerged, such as github.com/smartystreets/go-disruptor.
Benchmark
Let's construct a small benchmark to compare both approaches in terms of performance. There are three setups we can investigate further:
- Single producer, single consumer
- Single producer, multiple consumers
- Multiple producers, multiple consumers
Shared constants:
const (
	// Constants for benchmark configuration
	bufferSize     = 1024 * 16 // Must be a power of 2 for the disruptor
	iterations     = 10000     // Number of messages to send (reduced to avoid timeout)
	ringBufferMask = bufferSize - 1
	reserveOne     = 1
)
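The power-of-two requirement exists so that sequence & ringBufferMask can replace the slower sequence % bufferSize when mapping a sequence number to a slot. A small sketch with a ring of 8 slots (demoSize, demoMask, and slot are hypothetical names used only here):

```go
package main

import "fmt"

const (
	demoSize = 8            // a power of two
	demoMask = demoSize - 1 // binary 0111
)

// slot maps an ever-increasing sequence number onto a ring index.
func slot(sequence int64) int64 { return sequence & demoMask }

func main() {
	for _, seq := range []int64{0, 7, 8, 9, 17} {
		// The masked value and the modulo agree for every non-negative sequence.
		fmt.Println(seq, "->", slot(seq), seq%demoSize)
	}
}
```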
Channel implementations:
// BenchmarkChannel tests the performance of Go's native channels with a single producer and consumer
func BenchmarkChannel(b *testing.B) {
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		// Reset for each iteration
		b.StopTimer()
		channel := make(chan int64, bufferSize)
		done := make(chan struct{})

		// Start consumer
		go func() {
			var received int64
			for i := int64(0); i < iterations; i++ {
				received = <-channel
				// Ensure the compiler doesn't optimize away the receive
				if received < 0 {
					b.Fail()
				}
			}
			close(done)
		}()

		b.StartTimer()

		// Producer
		for i := int64(0); i < iterations; i++ {
			channel <- i
		}

		// Wait for consumer to finish
		<-done
	}
}

// BenchmarkChannelOneProducerMultipleConsumers tests the performance of Go's native channels with one producer and multiple consumers
func BenchmarkChannelOneProducerMultipleConsumers(b *testing.B) {
	b.ReportAllocs()

	const numConsumers = 4 // Number of consumers

	for n := 0; n < b.N; n++ {
		// Reset for each iteration
		b.StopTimer()

		channel := make(chan int64, bufferSize)
		done := make(chan struct{}, numConsumers) // Channel to signal when consumers are done

		// Calculate how many messages each consumer should process
		messagesPerConsumer := iterations / numConsumers

		// Start multiple consumers
		for c := 0; c < numConsumers; c++ {
			go func(consumerID int) {
				startMsg := int64(consumerID * messagesPerConsumer)
				endMsg := startMsg + int64(messagesPerConsumer)

				// Each consumer processes its portion of messages
				for i := startMsg; i < endMsg; i++ {
					received := <-channel
					// Ensure the compiler doesn't optimize away the receive
					if received < 0 {
						b.Fail()
					}
				}

				done <- struct{}{} // Signal that this consumer is done
			}(c)
		}

		b.StartTimer()

		// Producer sends all messages
		for i := int64(0); i < iterations; i++ {
			channel <- i
		}

		// Wait for all consumers to finish
		for i := 0; i < numConsumers; i++ {
			<-done
		}
	}
}

// BenchmarkChannelMultipleProducersMultipleConsumers tests the performance of Go's native channels with multiple producers and consumers
func BenchmarkChannelMultipleProducersMultipleConsumers(b *testing.B) {
	b.ReportAllocs()

	const numProducers = 4 // Number of producers
	const numConsumers = 4 // Number of consumers
	const messagesPerProducer = iterations / numProducers
	const messagesPerConsumer = iterations / numConsumers

	for n := 0; n < b.N; n++ {
		// Reset for each iteration
		b.StopTimer()

		channel := make(chan int64, bufferSize)
		producersDone := make(chan struct{}, numProducers) // Channel to signal when producers are done
		consumersDone := make(chan struct{}, numConsumers) // Channel to signal when consumers are done

		// Start multiple consumers
		for c := 0; c < numConsumers; c++ {
			go func(consumerID int) {
				// Each consumer processes its portion of messages
				for i := int64(0); i < messagesPerConsumer; i++ {
					received := <-channel
					// Ensure the compiler doesn't optimize away the receive
					if received < 0 {
						b.Fail()
					}
				}

				consumersDone <- struct{}{} // Signal that this consumer is done
			}(c)
		}

		b.StartTimer()

		// Start multiple producers
		for p := 0; p < numProducers; p++ {
			go func(producerID int) {
				startMsg := int64(producerID * messagesPerProducer)
				endMsg := startMsg + int64(messagesPerProducer)

				// Each producer sends its portion of messages
				for i := startMsg; i < endMsg; i++ {
					channel <- i
				}

				producersDone <- struct{}{} // Signal that this producer is done
			}(p)
		}

		// Wait for all producers to finish
		for i := 0; i < numProducers; i++ {
			<-producersDone
		}

		// Wait for all consumers to finish
		for i := 0; i < numConsumers; i++ {
			<-consumersDone
		}
	}
}
LMAX disruptor implementations (github.com/smartystreets-prototypes/go-disruptor):
// BenchmarkDisruptor tests the performance of LMAX Disruptor with a single producer and consumer
func BenchmarkDisruptor(b *testing.B) {
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		// Reset for each iteration
		b.StopTimer()

		// Create a new ring buffer
		ringBuffer := [bufferSize]int64{}

		// Create a consumer that will read from the ring buffer
		consumer := &disruptorConsumer{ringBuffer: ringBuffer[:]}

		// Create the disruptor with the consumer
		d := disruptor.New(
			disruptor.WithCapacity(int64(bufferSize)),
			disruptor.WithConsumerGroup(consumer),
		)

		b.StartTimer()

		// Producer loop - use d directly as a Writer
		for sequence := int64(0); sequence < iterations; {
			// Reserve a slot in the ring buffer
			sequence = d.Reserve(reserveOne)

			// Write to the ring buffer
			ringBuffer[sequence&ringBufferMask] = sequence

			// Publish the sequence
			d.Commit(sequence, sequence)
		}

		// Cleanup
		d.Close()
	}
}

// BenchmarkDisruptorOneProducerMultipleConsumers tests the performance of LMAX Disruptor with one producer and multiple consumers
func BenchmarkDisruptorOneProducerMultipleConsumers(b *testing.B) {
	b.ReportAllocs()

	const numConsumers = 4 // Number of consumers

	for n := 0; n < b.N; n++ {
		// Reset for each iteration
		b.StopTimer()

		// Create a new ring buffer
		ringBuffer := [bufferSize]int64{}

		// Create multiple consumers
		consumers := make([]disruptor.Consumer, numConsumers)
		for i := 0; i < numConsumers; i++ {
			consumers[i] = &disruptorConsumer{
				ringBuffer: ringBuffer[:],
				id:         i, // Add an ID to identify the consumer
			}
		}

		// Create the disruptor with multiple consumers
		d := disruptor.New(
			disruptor.WithCapacity(int64(bufferSize)),
			disruptor.WithConsumerGroup(consumers...),
		)

		b.StartTimer()

		// Producer loop - use d directly as a Writer
		for sequence := int64(0); sequence < iterations; {
			// Reserve a slot in the ring buffer
			sequence = d.Reserve(reserveOne)

			// Write to the ring buffer
			ringBuffer[sequence&ringBufferMask] = sequence

			// Publish the sequence
			d.Commit(sequence, sequence)
		}

		// Cleanup
		d.Close()
	}
}

// BenchmarkDisruptorMultipleProducersMultipleConsumers tests the performance of LMAX Disruptor with multiple producers and consumers
func BenchmarkDisruptorMultipleProducersMultipleConsumers(b *testing.B) {
	b.ReportAllocs()

	const numProducers = 4 // Number of producers
	const numConsumers = 4 // Number of consumers
	const iterationsPerProducer = iterations / numProducers

	for n := 0; n < b.N; n++ {
		// Reset for each iteration
		b.StopTimer()

		// Create a new ring buffer
		ringBuffer := [bufferSize]int64{}

		// Create multiple consumers
		consumers := make([]disruptor.Consumer, numConsumers)
		for i := 0; i < numConsumers; i++ {
			consumers[i] = &disruptorConsumer{
				ringBuffer: ringBuffer[:],
				id:         i, // Add an ID to identify the consumer
			}
		}

		// Create the disruptor with multiple consumers
		d := disruptor.New(
			disruptor.WithCapacity(int64(bufferSize)),
			disruptor.WithConsumerGroup(consumers...),
		)

		// Buffered channel on which every producer signals completion
		done := make(chan struct{}, numProducers)

		b.StartTimer()

		// Start multiple producers.
		// NOTE: this assumes Reserve and Commit may be called from several
		// goroutines at once; check the library documentation before relying on it.
		for p := 0; p < numProducers; p++ {
			go func(producerID int) {
				// Each producer sends a portion of the total messages
				for i := int64(0); i < iterationsPerProducer; i++ {
					// Reserve a slot in the ring buffer
					sequence := d.Reserve(reserveOne)

					// Write to the ring buffer
					ringBuffer[sequence&ringBufferMask] = sequence

					// Publish the sequence
					d.Commit(sequence, sequence)
				}

				// Signal that this producer is done
				done <- struct{}{}
			}(p)
		}

		// Wait for all producers to finish, not just the last one started
		for i := 0; i < numProducers; i++ {
			<-done
		}

		// Cleanup
		d.Close()
	}
}

// disruptorConsumer implements the disruptor.Consumer interface
type disruptorConsumer struct {
	ringBuffer []int64
	received   int64
	id         int // Consumer ID for multiple consumer scenarios
}

// Consume reads from the ring buffer
func (c *disruptorConsumer) Consume(lower, upper int64) {
	for sequence := lower; sequence <= upper; sequence++ {
		// Read from the ring buffer
		message := c.ringBuffer[sequence&ringBufferMask]

		// Ensure the compiler doesn't optimize away the receive
		if message < 0 {
			panic("Unexpected negative value")
		}

		c.received++
	}
}
I'm running on a Mac M1 Pro with GOMAXPROCS set to 10, and these are the results:
Benchmark | Iterations | Time | Memory | Allocations
BenchmarkChannel-10 | 2862 | 353652 ns/op | 0 B/op | 0 allocs/op
BenchmarkChannelOneProducerMultipleConsumers-10 | 2598 | 450837 ns/op | 22 B/op | 0 allocs/op
BenchmarkChannelMultipleProducersMultipleConsumers-10 | 2649 | 444455 ns/op | 236 B/op | 0 allocs/op
BenchmarkDisruptor-10 | 45792 | 25992 ns/op | 0 B/op | 0 allocs/op
BenchmarkDisruptorOneProducerMultipleConsumers-10 | 46298 | 26152 ns/op | 1 B/op | 0 allocs/op
BenchmarkDisruptorMultipleProducersMultipleConsumers-10 | 66801 | 18271 ns/op | 352 B/op | 0 allocs/op
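In this run the Disruptor benchmarks complete each round of 10,000 messages roughly 14 to 24 times faster than the channel benchmarks (about 26 µs versus 354 µs per operation in the single-producer, single-consumer case). This is in line with existing research and with the architectural differences between the two: the Disruptor, with its lock-free design and optimized memory access patterns, is generally expected to exhibit significantly higher throughput and lower latency, particularly in scenarios involving high contention. Go channels, relying on a mutex for synchronization, can run into performance limits under high contention as goroutines compete for the channel lock.
The numbers should still be read with caution, because the two sets of benchmarks do not measure identical work. The channel benchmarks wait until every message has been received, whereas the Disruptor benchmarks never check the consumers' received counters, so their timings may cover little more than the producer side. With iterations (10,000) smaller than bufferSize (16,384), the Disruptor producer also never has to wait for a slot to be freed.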
Use Case Recommendations
Go channels are the preferred choice for the vast majority of concurrent programming tasks in Go, especially when extreme low latency is not a strict requirement. The select statement is used extensively to multiplex operations across multiple channels, a feature that the Disruptor does not natively support.
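As a brief illustration of that multiplexing, the sketch below waits on two channels and a timeout at once. The firstOf helper and the shortWait constant are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"time"
)

// shortWait is the timeout used in the demonstration below.
const shortWait = 10 * time.Millisecond

// firstOf returns the first value available on a or b, or "timeout".
func firstOf(a, b <-chan string, wait time.Duration) string {
	select {
	case v := <-a:
		return v
	case v := <-b:
		return v
	case <-time.After(wait):
		return "timeout"
	}
}

func main() {
	a := make(chan string, 1)
	b := make(chan string, 1)

	b <- "from b"
	fmt.Println(firstOf(a, b, shortWait)) // from b

	// Both channels are empty now, so only the timer can fire.
	fmt.Println(firstOf(a, b, shortWait)) // timeout
}
```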
However, the LMAX Disruptor should be seriously considered in specific scenarios where significant performance advantages are needed. These scenarios typically involve ultra-low latency systems where response times are measured in microseconds or nanoseconds, such as high-frequency trading platforms, real-time financial risk management systems, or high-performance network packet processing within user-space applications.
There are important trade-offs to consider when deciding between Go channels and the LMAX Disruptor. The Disruptor generally presents a more complex API and requires a deeper understanding of its internal workings compared to the relative simplicity of Go channels.