1. Why Build a Message Queue from Scratch?
Message queues are the unsung heroes of distributed systems—think of them as traffic cops directing data between services. They decouple apps, handle async tasks, and smooth out traffic spikes. NSQ, a lightweight, Go-based queue from Bitly, caught my eye for its simplicity and speed, making it perfect for small-to-medium projects.
So why rebuild it? Two reasons:
- Learning by Doing: Nothing beats rolling up your sleeves to grok how queues tick.
- Customization: Need priority messages or a leaner persistence layer? DIY lets you tweak it your way.
This tutorial walks you through coding an NSQ-inspired message queue in Go. It’s aimed at devs with 1-2 years of Go experience—folks comfy with goroutines and channels but itching to tackle distributed systems. By the end, you’ll have:
- A working queue with core features like message distribution and persistence.
- Hands-on Go concurrency skills.
- Bragging rights for building something badass.
Ready? Let’s dive into NSQ’s magic and get coding.
2. What Makes NSQ Tick?
NSQ’s design is elegantly simple:
- nsqd: The workhorse that handles messages—think producer and consumer hub.
- nsqlookupd: The coordinator for multi-node setups (we’ll keep it single-node for now).
- nsqadmin: A handy UI for peeking at queue stats.
It’s lightweight (no ZooKeeper nonsense like Kafka), mixes in-memory and disk storage, and supports retries and delayed delivery. Perfect for real-time gigs like log processing or task scheduling.
Compared to the big dogs:
- Kafka: Built for massive data streams, but overkill for smaller setups.
- RabbitMQ: Great for complex routing, less so for raw speed.
- NSQ: Goldilocks—simple, fast, and just right for many use cases.
Our goal? Recreate NSQ’s core vibe—Topics, Channels, and message flow—while keeping it lean and mean in Go.
3. Designing Our Queue
Here’s the game plan:
- Producer: Sends messages to a Topic.
- Topic: Collects messages and fans them out to Channels.
- Channel: Feeds messages to Consumers.
- Consumer: Processes messages.
Picture this flow:
Producer -> Topic -> Channel 1 -> Consumer 1
                  -> Channel 2 -> Consumer 2
Simple, decoupled, and ready for Go’s concurrency magic. Let’s build it step-by-step.
3.1 Topics and Channels
Topics are message hubs; Channels pipe them to Consumers. We’ll use maps and channels to manage them:
package main
import "sync"
// Message is our data packet
type Message struct {
ID string
Content string
}
// Topic holds Channels and messages
type Topic struct {
name string
channels map[string]*Channel
messages chan *Message
mu sync.RWMutex
}
// Channel feeds Consumers
type Channel struct {
name string
consumers []*Consumer
messages chan *Message
}
// Consumer does the work
type Consumer struct {
id string
}
func NewTopic(name string) *Topic {
return &Topic{
name: name,
channels: make(map[string]*Channel),
messages: make(chan *Message, 100), // Buffer for non-blocking
}
}
func (t *Topic) AddChannel(name string) *Channel {
t.mu.Lock()
defer t.mu.Unlock()
ch := &Channel{name: name, messages: make(chan *Message, 100)}
t.channels[name] = ch
return ch
}
func (t *Topic) Broadcast(msg *Message) {
// Hold the read lock while iterating so a concurrent AddChannel
// can't mutate the map mid-fan-out. We skip t.messages here:
// nothing drains it, so sending would block once its buffer fills.
t.mu.RLock()
defer t.mu.RUnlock()
for _, ch := range t.channels {
ch.messages <- msg // blocks if a channel's buffer is full
}
}
Quick Notes:
- sync.RWMutex keeps things thread-safe.
- Buffered channels avoid blocking; tune that 100 based on your load.
3.2 Persistence
Speed’s great, but we need durability. Let’s log messages to disk:
import (
"encoding/json"
"os"
)
func (t *Topic) PersistMessage(msg *Message) error {
file, err := os.OpenFile("messages.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
data, err := json.Marshal(msg)
if err != nil {
return err
}
_, err = file.Write(append(data, '\n'))
return err
}
Watch Out: Disk I/O can bottleneck under heavy load—more on that later.
3.3 Consumers and Concurrency
Consumers process messages in goroutines for max throughput:
func (ch *Channel) StartConsumers(wg *sync.WaitGroup) {
for _, consumer := range ch.consumers {
wg.Add(1)
go func(c *Consumer) {
defer wg.Done()
for msg := range ch.messages {
println("Consumer", c.id, "processed", msg.Content)
}
}(consumer)
}
}
Pro Tip: Cap consumer goroutines (e.g., runtime.NumCPU() * 2) to avoid thrashing.
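If you'd rather enforce that cap than spawn one goroutine per consumer, a fixed worker pool does the trick. A minimal sketch; StartWorkerPool and its workers parameter are illustrative additions, not part of the Channel API above:
// StartWorkerPool drains ch.messages with a fixed number of workers instead
// of one goroutine per consumer.
func (ch *Channel) StartWorkerPool(wg *sync.WaitGroup, workers int) {
for i := 0; i < workers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for msg := range ch.messages {
println("worker", id, "processed", msg.Content)
}
}(i)
}
}
Call it with ch.StartWorkerPool(&wg, runtime.NumCPU()*2) and the pool size stays pinned no matter how many consumers register.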
3.4 Delayed Delivery
Sometimes you need messages to chill before delivery—like a 30-minute order timeout reminder. We’ll use a simple time heap (though a timer wheel would scale better for big loads):
import (
"container/heap"
"time"
)
// DelayedMessage ties a message to a delivery time
type DelayedMessage struct {
msg *Message
execTime time.Time
index int // For heap management
}
// DelayQueue is our heap
type DelayQueue []*DelayedMessage
func (dq DelayQueue) Len() int { return len(dq) }
func (dq DelayQueue) Less(i, j int) bool { return dq[i].execTime.Before(dq[j].execTime) }
func (dq DelayQueue) Swap(i, j int) {
dq[i], dq[j] = dq[j], dq[i]
dq[i].index, dq[j].index = i, j
}
func (dq *DelayQueue) Push(x interface{}) {
item := x.(*DelayedMessage)
item.index = len(*dq)
*dq = append(*dq, item)
}
func (dq *DelayQueue) Pop() interface{} {
old := *dq
n := len(old)
item := old[n-1]
old[n-1] = nil
item.index = -1
*dq = old[0 : n-1]
return item
}
// Note: each call builds its own one-element heap, so the heap only earns
// its keep once a single DelayQueue is shared across messages (see the
// dispatcher sketch below). As written, this costs one sleeping goroutine
// per delayed message.
func (t *Topic) DelayMessage(msg *Message, delay time.Duration) {
dq := &DelayQueue{}
heap.Push(dq, &DelayedMessage{msg: msg, execTime: time.Now().Add(delay)})
go func() {
for dq.Len() > 0 {
item := heap.Pop(dq).(*DelayedMessage)
time.Sleep(time.Until(item.execTime))
t.Broadcast(item.msg)
}
}()
}
Heads-Up: This works for small-scale delays but can bog down with tons of messages. For production, swap the heap for a timer wheel or shard by time ranges.
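To make "shared" concrete, here's a minimal dispatcher sketch: one goroutine owns one DelayQueue and sleeps until the earliest deadline, waking early when a new message lands. The delayDispatcher type and its wake channel are illustrative additions, not NSQ's actual design:
// delayDispatcher owns a single DelayQueue; Add wakes the run loop so an
// earlier deadline can cut a long sleep short.
type delayDispatcher struct {
mu   sync.Mutex
dq   DelayQueue
wake chan struct{} // initialize with make(chan struct{}, 1)
}

func (d *delayDispatcher) Add(msg *Message, delay time.Duration) {
d.mu.Lock()
heap.Push(&d.dq, &DelayedMessage{msg: msg, execTime: time.Now().Add(delay)})
d.mu.Unlock()
select {
case d.wake <- struct{}{}: // nudge the loop
default: // a wakeup is already pending
}
}

func (d *delayDispatcher) run(t *Topic) {
for {
d.mu.Lock()
if d.dq.Len() == 0 {
d.mu.Unlock()
<-d.wake // nothing scheduled; block until Add fires
continue
}
next := d.dq[0].execTime
d.mu.Unlock()
select {
case <-time.After(time.Until(next)):
d.mu.Lock()
item := heap.Pop(&d.dq).(*DelayedMessage)
d.mu.Unlock()
t.Broadcast(item.msg)
case <-d.wake:
// a new (possibly earlier) deadline arrived; recompute the sleep
}
}
}
You'd create one dispatcher per Topic (say, in NewTopic), start run in a goroutine, and route DelayMessage through Add.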
4. Taking It for a Spin: Real-World Use Cases
Theory’s cool, but let’s see this baby in action. Here are two scenarios I tested—and the gotchas I hit.
4.1 Log Collection
The Setup: Imagine a microservices app where nodes spit out logs nonstop. We need to funnel them to a real-time monitor (say, Prometheus) and an offline analytics store (like Elasticsearch).
How It Works:
- Producer: Log agents send to a log_topic.
- Topic/Channels: Split into realtime_channel and offline_channel.
- Consumers: One reads for monitoring, the other for analytics.
func main() {
var wg sync.WaitGroup
topic := NewTopic("log_topic")
realtimeCh := topic.AddChannel("realtime_channel")
offlineCh := topic.AddChannel("offline_channel")
// Register a consumer on each channel (nothing else populates ch.consumers)
realtimeCh.consumers = append(realtimeCh.consumers, &Consumer{id: "monitor"})
offlineCh.consumers = append(offlineCh.consumers, &Consumer{id: "analytics"})
// Fire up consumers; StartConsumers does its own wg.Add per consumer
realtimeCh.StartConsumers(&wg)
offlineCh.StartConsumers(&wg)
// Simulate logs (fmt and time must be imported)
for i := 0; i < 1000; i++ {
msg := &Message{ID: fmt.Sprintf("log_%d", i), Content: "Log entry"}
topic.Broadcast(msg)
topic.PersistMessage(msg)
time.Sleep(10 * time.Millisecond)
}
wg.Wait() // blocks until the channels are closed; Ctrl+C the demo when done
}
Gotcha: Log spikes filled the queue fast. I added a check—if it’s 80% full, spawn more consumers or drop debug logs. Async disk writes (batch every 100ms) also saved my bacon on persistence lag.
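That fullness check is cheap because len and cap work on buffered channels. A sketch of the 80% trigger, with the spawn-a-surge-consumer reaction as an assumption:
// maybeScale adds a surge consumer once the buffer hits 80% capacity.
// Note: the extra goroutine lives until ch.messages is closed.
func (ch *Channel) maybeScale(wg *sync.WaitGroup) {
if len(ch.messages)*10 >= cap(ch.messages)*8 {
wg.Add(1)
go func() {
defer wg.Done()
for msg := range ch.messages {
println("surge consumer processed", msg.Content)
}
}()
}
}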
4.2 Async Task Processing
The Setup: E-commerce order timeouts—check an order after 30 minutes and nudge the user if it’s unpaid.
How It Works:
- Producer: Order service sends a delayed message to order_topic.
- Consumer: Notification service grabs it from notification_channel and fires off an SMS.
func main() {
topic := NewTopic("order_topic")
ch := topic.AddChannel("notification_channel")
ch.consumers = append(ch.consumers, &Consumer{id: "sms_notifier"})
msg := &Message{ID: "order_123", Content: "Pay me!"}
topic.DelayMessage(msg, 30*time.Second) // 30s for demo, imagine 30min
var wg sync.WaitGroup
ch.StartConsumers(&wg) // does its own wg.Add per consumer
wg.Wait()              // blocks until ch.messages is closed; Ctrl+C the demo
}
Gotcha: Duplicates crept in during restarts. I slapped unique IDs on messages and checked a Redis cache to ensure one-and-done processing. Huge message piles also slowed the heap—sharding by time helped.
5. Making It Sing: Optimization Tips
I threw this at a 4-core, 8GB box:
- No Persistence: 100k messages/sec, 1ms latency (99th percentile 5ms).
- With Persistence: 50k messages/sec, 10ms latency.
Boosters:
- Bigger Buffers: Upped channel sizes to 1000—less blocking, more throughput.
- Async Persistence: Batched disk writes in a goroutine, reclaiming ~20% QPS.
- Zero-Copy: Passed pointers, not copies, during broadcasts.
Pitfall: Too many goroutines tanked performance. I capped them at runtime.NumCPU() * 2 and watched CPU usage like a hawk.
6. Best Practices to Live By
Building this queue taught me some golden rules—stuff that’ll keep your Go projects humming, not just this one.
6.1 Concurrency Done Right
Go’s goroutines and channels are your superpowers—use ‘em wisely:
- Keep It Loose: Producers and Consumers don’t talk directly—Topics and Channels handle the middleman duties. Less coupling, more flexibility.
- Scale Smart: Too many goroutines? Chaos. Too few? Bottlenecks. I tied consumer counts to runtime.NumCPU() * 2 and tweaked based on load.
- Shut Down Clean: sync.WaitGroup is your friend; it makes sure nothing's left dangling:
func (ch *Channel) StartConsumers(wg *sync.WaitGroup) {
for _, consumer := range ch.consumers {
wg.Add(1)
go func(c *Consumer) {
defer wg.Done()
for msg := range ch.messages {
println("Consumer", c.id, "processed", msg.Content)
}
}(consumer)
}
}
6.2 Handle Errors Like a Pro
Queues crash—plan for it:
- Retries with Backoff: Limit retries (I picked 3) and stretch delays (1s, 2s, 4s) to avoid pileups.
- Dead Letter Queue: Failed messages? Stash 'em in a dead_letter_topic for later debugging.
- Isolate Crashes: Wrap consumer logic in goroutines; one blows up, the others keep trucking.
// deadLetterTopic is passed in; the queue has no global topic registry.
func (c *Consumer) ConsumeWithRetry(ch *Channel, deadLetterTopic *Topic) {
for msg := range ch.messages {
var err error
for i := 0; i < 3; i++ {
if err = c.process(msg); err == nil {
break
}
time.Sleep(time.Duration(1<<i) * time.Second) // 1s, 2s, 4s backoff
}
if err != nil {
deadLetterTopic.Broadcast(msg) // off to the graveyard
}
}
}
6.3 See What’s Happening
You can’t fix what you can’t see:
- Metrics: Track queue length, latency, and success rates; Prometheus loves this stuff (see the sketch after this list).
- Logs: Structured JSON logs (via zap or logrus) make grepping a breeze:
logrus.WithFields(logrus.Fields{
"message_id": msg.ID,
"timestamp": time.Now(),
}).Info("Message processed")
- Alerts: Ping Slack if the queue’s 80% full or latency spikes past 500ms.
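On the Prometheus side, the official client_golang package gets you counters and gauges in a few lines. A sketch with made-up metric names; you'd still need to expose them via promhttp:
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
// Bump this from the consumer loop on every successful message.
processedTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "mq_messages_processed_total",
Help: "Messages successfully processed.",
})
// Sample this periodically with queueDepth.Set(float64(len(ch.messages))).
queueDepth = promauto.NewGauge(prometheus.GaugeOpts{
Name: "mq_channel_queue_depth",
Help: "Buffered messages waiting in the channel.",
})
)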
7. Pitfalls I Tripped Over (So You Don’t Have To)
This wasn’t all smooth sailing—here’s what bit me and how I fixed it.
7.1 Memory Leaks
Problem: Goroutines piled up under load, eating RAM until the server cried OOM.
Fix: Added a context to kill 'em off cleanly:
// ctx comes first by Go convention; needs "context" in the import block.
func (ch *Channel) StartConsumers(ctx context.Context, wg *sync.WaitGroup) {
for _, consumer := range ch.consumers {
wg.Add(1)
go func(c *Consumer) {
defer wg.Done()
for {
select {
case msg := <-ch.messages:
println("Consumer", c.id, "processed", msg.Content)
case <-ctx.Done():
return // bye-bye, goroutine (anything left in the buffer is abandoned)
}
}
}(consumer)
}
}
Tip: Watch runtime.NumGoroutine(); if it's climbing, something's leaking.
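A throwaway watchdog makes that easy to spot. A minimal sketch, assuming a 10-second sample rate is fine for your setup (needs log, runtime, and time imported):
// Log the goroutine count every 10s; a steady climb means something leaks.
go func() {
for range time.Tick(10 * time.Second) {
log.Printf("goroutines: %d", runtime.NumGoroutine())
}
}()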
7.2 Persistence Pain
Problem: Disk writes tanked throughput from 100k to 50k QPS.
Fix: Went async with a buffer:
// AsyncPersist drains msgs to disk on one goroutine so Broadcast never
// waits on the disk. Start it exactly once per Topic.
func (t *Topic) AsyncPersist(msgs chan *Message) {
file, err := os.OpenFile("messages.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return
}
defer file.Close()
for msg := range msgs {
data, err := json.Marshal(msg)
if err != nil {
continue
}
file.Write(append(data, '\n'))
}
}

func (t *Topic) Broadcast(msg *Message) {
// persistChan is a new Topic field (make(chan *Message, 1000) in NewTopic)
// feeding the single AsyncPersist goroutine; spawning a fresh channel and
// goroutine per message here would leak both.
t.persistChan <- msg
// Rest of broadcast logic
}
Tip: Batch writes (every 100ms or 100 messages) to lighten the I/O load.
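Here's what that batching might look like: a sketch that flushes on a 100-message batch or a 100ms tick, whichever fires first. BatchPersist is an illustrative variant of AsyncPersist using the thresholds from the tip:
// BatchPersist buffers messages and writes them in one syscall per flush.
func BatchPersist(msgs chan *Message, file *os.File) {
batch := make([]byte, 0, 4096)
count := 0
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
flush := func() {
if count > 0 {
file.Write(batch)
batch, count = batch[:0], 0
}
}
for {
select {
case msg, ok := <-msgs:
if !ok {
flush() // producer shut down; persist what's left
return
}
data, err := json.Marshal(msg)
if err != nil {
continue
}
batch = append(batch, data...)
batch = append(batch, '\n')
if count++; count >= 100 {
flush()
}
case <-ticker.C:
flush() // time-based flush keeps latency bounded under light load
}
}
}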
7.3 Duplicate Madness
Problem: In a multi-node test, Consumers double-dipped messages.
Fix: Locked ‘em down with Redis:
// Uses the go-redis v6/v7 API; v8+ adds a context as the first argument.
func (c *Consumer) ConsumeWithLock(msg *Message, redisClient *redis.Client) {
// SetNX claims the message ID; keep the key until the TTL expires so a
// redelivery inside the window isn't processed twice. (Deleting the key
// right after processing would reopen the duplicate door.)
key := "processed:" + msg.ID
if redisClient.SetNX(key, "1", 10*time.Minute).Val() {
c.process(msg)
}
}
Tip: Unique IDs + a quick cache check keeps things idempotent.
8. Wrapping Up and Looking Ahead
8.1 What We Built
We’ve got a slick, NSQ-inspired queue—message distribution, persistence, delayed delivery—all in Go. It’s battle-tested with logs and async tasks, and I’ve leveled up my concurrency chops along the way.
8.2 Next Steps
This is just the start:
- Go Distributed: Add an nsqlookupd-style coordinator for multi-node action.
- Auto-Scale: Dynamically tweak Consumers based on traffic.
- Cloud Vibes: Hook into Kubernetes or Prometheus for that production polish.
Message queues are leaning into edge computing and real-time demands—our lightweight setup could shine there.
8.3 Final Thoughts
Building this was a grind, but man, was it worth it. Go’s concurrency makes hard stuff feel approachable, though balancing speed and stability is an art. Hope this sparks your next big project—drop a comment if you build something cool with it!