When I set out to learn Go one of the aims I had in mind was to write a version of this little Python utility which accompanies a blog I wrote recently about understanding and diagnosing problems with Kafka advertised listeners. Having successfully got Producer, Consumer, and AdminClient API examples working, it is now time to turn to that task.
I’m quite keen to figure out how to do this properly and take the opportunity to learn. It would be easy enough to copy & paste all my snippets together, ignore any error handling, and check the task off as done. But since I don’t have a strong background in coding, now seems a good chance to try and instill a little bit of rigour in what I’m doing.
Command line arguments
Instead of hard coding the broker host and port, I want to be able to pass this as a commandline argument. This is easy enough using the OS package, which behaves very similar to the Python equivalent:
import (
"os"
)
func main() {
// Read the first commandline argument to get the broker details
broker := os.Args[1]
fmt.Printf("Broker: %v", broker)
I’m using VSCode for my IDE as it works very nicely with Go - both for writing code, and debugging it. To pass an argument to the command line as part of debugging go to Run
→ Open Configurations
and in the JSON file set the argument(s) that you want to pass:
"args": ["localhost:9092"]
This works fine when the code is run
Broker: localhost:9092
But what if the user doesn’t specify the required commandline arguments?
panic: runtime error: index out of range [1] with length 1
goroutine 1 [running]:
main.main()
/Users/rmoff/git/rmoff-blog/content/code/go/kafka/producer_function/producer_function.go:15 +0x24e
So we need a bit of care here, and check for the length too
var broker string
if len(os.Args) == 2 {
broker = os.Args[1]
} else {
fmt.Println("(No broker specifed on commandline; defaulting to localhost:9092)")
broker = "localhost:9092"
}
fmt.Printf("Broker: %v", broker)
Functions
The main()
code is going to look something like this:
// Do init stuff to set vars etc
// …
// Create AdminClient connection to check metadata
doAdmin(broker)
// Produce message
doProduce(broker,topic)
// Consume message
doConsume(broker,topic)
// fin.
Each one is dependent on the other, so we need to know if there was an error…
Errors
Following the same pattern as I explored here I’m expecting to have something that looks like this:
// Do init stuff to set vars etc
// …
// Create AdminClient connection to check metadata
if e := doAdmin(broker); e != nil {
fmt.Printf("There was a problem with AdminClient :-(\n%v", e)
} else {
// Produce message
if e := doProduce(broker, topic); e != nil {
fmt.Printf("There was a problem calling the producer :-(\n%v", e)
} else {
// Consume message
if e := doConsume(broker, topic); e != nil {
fmt.Printf("There was a problem calling the consumer :-(\n%v", e)
}
}
}
// fin.
To do this, each function needs to return an error
, so the function looks like this:
func doProduce(broker, topic string) error {
// If you hit an error then
return errors.New("OH NO! THERE WAS AN ERROR")
// assuming everything has gone ok return no error
return nil
}
Where we were previously dumping messages to the output:
fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
We now want to take this and pass it back as the error. Since errors.New()
takes a string
it makes sense to replace our existing fmt.Printf
with fmt.Sprintf
and pass this to errors.New()
:
return errors.New(fmt.Sprintf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e))
The Go linter in VSCode is brilliant here as it actively tells you this is not the best way to do it, with some nice orange underlining:
If you hover over it you get a nice tip of how to write the code better:
So, instead of
return errors.New(fmt.Sprintf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e))
We just replace
fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
with
return fmt.Errorf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
Error in Go routines
This all works, except in the Go routine within the function. In the case of the Producer code the Go routine is there to handle events such as message delivery reports, errors, etc. Since a Go routine is a function itself with no return variables, then we can’t return
an error from within it. If you try to, you get this:
too many arguments to return
have (error)
want ()
Now, maybe the code in your function will catch an error as a side effect of the error thrown in the Go routine, but it’s not great to rely on that happening. Instead we can provide a channel that the Go routine can write to if there’s an error, and then check that from our parent function and return an error if we find one. Something like this:
package main
import (
"errors"
"fmt"
"time"
)
func main() {
if e := doThisThing(); e != nil {
fmt.Printf("doThisThing failed.\n%v", e)
} else {
fmt.Printf("doThisThing worked.\n")
}
}
func doThisThing() error {
ec := make(chan string, 8)
go func() {
// If we're all good then do stuff
// …
// If we hit an error then log an error
ec <- fmt.Sprintf("here is an error from the go routine :(\n")
ec <- fmt.Sprintf("here is another error from the go routine :(\n")
close(ec)
}()
// Do all our stuff in the function that we need to
// …
time.Sleep(2 * time.Second)
// pretend we're doing stuff
// …
// When we're ready to return, check if the go routine has sent errors
// Note that we're relying on the Go routine to close the channel, otherwise
// we deadlock.
// If there are no errors then the channel is simply closed and we read no values.
done := false
var e string
for !done {
if t, o := <-ec; o == false {
// o is false if we've read all the values and the channel is closed
// If that's the case, then we're done here
done = true
} else {
// We've read a value so let's concatenate it with the others
// that we've got
e += t
}
}
if len(e) > 0 {
// If we've got any errors, then return an error to the caller
return errors.New(e)
}
// assuming everything has gone ok return no error
return nil
}
When run this looks like:
doThisThing failed.
here is an error from the go routine :(
here is another error from the go routine :(
The Producer
With this error handling in place, I can now call my doProduce
function and get an error (or not) back from it successfully:
-
It works!
ℹ️ No broker specified on commandline; defaulting to localhost:9092 Broker: localhost:9092 -- ✨ All messages flushed from the queue ✅ Message 'foo / Thu, 16 Jul 2020 00:05:57 +0100' delivered to topic 'rmoff_test_00' (partition 0 at offset 11) -=-= Wrapping up… 👋 … and we're done. Oh joy! Oh rejoice! Calling the producer worked *just fine*.
-
It doesn’t!
ℹ️ No broker specified on commandline; defaulting to localhost:9092 Broker: localhost:9092 -=-= Wrapping up… ❌ … returning an error There was a problem calling the producer :-( **☠️ Uh oh, caught an error: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 3ms in state CONNECT) **☠️ Uh oh, caught an error: 1/1 brokers are down -- ⚠️ Failed to flush all messages after 1000 milliseconds. 1 message(s) remain
Here’s the full code, and a Docker Compose you can use to try it with.
package main
import (
"errors"
"fmt"
"os"
"time"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
// Read the first commandline argument to get the broker details
var broker string
if len(os.Args) == 2 {
broker = os.Args[1]
} else {
fmt.Printf("ℹ️ No broker specified on commandline; defaulting to localhost:9092\n\n")
broker = "localhost:9092"
}
fmt.Printf("Broker: %v\n", broker)
// Set the topic name that we'll use
topic := "rmoff_test_00"
// Create Admin Connection
// doAdmin(broker)
// Produce message
if e := doProduce(broker, topic); e != nil {
fmt.Printf("\nThere was a problem calling the producer :-(\n%v", e)
} else {
fmt.Println("Oh joy! Oh rejoice! Calling the producer worked *just fine*.")
}
// Consume message
// doConsume()
// fin.
}
func doProduce(broker, topic string) error {
// --
// Create Producer instance
// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewProducer
// Store the config
c := kafka.ConfigMap{
"bootstrap.servers": broker}
// Check for errors in creating the Producer
if p, e := kafka.NewProducer(&c); e != nil {
if ke, ok := e.(kafka.Error); ok == true {
switch ec := ke.Code(); ec {
case kafka.ErrInvalidArg:
return fmt.Errorf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
default:
return fmt.Errorf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v", ec, e)
}
} else {
// It's not a kafka.Error
return fmt.Errorf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
}
} else {
defer p.Close()
// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)
// For capturing errors from the go-routine
errorChan := make(chan string, 8)
// --
// Send a message using Produce()
// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.Produce
//
// Build the message
m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
Value: []byte(fmt.Sprintf("foo / %v",
time.Now().Format(time.RFC1123Z)))}
// Handle any events that we get
go func() {
doTerm := false
for !doTerm {
// The `select` blocks until one of the `case` conditions
// are met - therefore we run it in a Go Routine.
select {
case ev := <-p.Events():
// Look at the type of Event we've received
switch ev.(type) {
case *kafka.Message:
// It's a delivery report
km := ev.(*kafka.Message)
if km.TopicPartition.Error != nil {
errorChan <- fmt.Sprintf("\n**☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Error)
} else {
fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
string(km.Value),
string(*km.TopicPartition.Topic),
km.TopicPartition.Partition,
km.TopicPartition.Offset)
}
case kafka.Error:
// It's an error
em := ev.(kafka.Error)
errorChan <- fmt.Sprintf("\n**☠️ Uh oh, caught an error:\n\t%v\n", em)
default:
// It's not anything we were expecting
errorChan <- fmt.Sprintf("\n**Got an event that's not a Message or Error 👻\n\t%v\n", ev)
}
case <-termChan:
doTerm = true
}
}
close(errorChan)
close(doneChan)
}()
// Produce the message
if e := p.Produce(&m, nil); e != nil {
errorChan <- fmt.Sprintf("😢 Darn, there's an error producing the message! %v", e.Error())
}
// --
// Flush the Producer queue
t := 1000
if r := p.Flush(t); r > 0 {
errorChan <- fmt.Sprintf("\n--\n⚠️ Failed to flush all messages after %d milliseconds. %d message(s) remain", t, r)
} else {
fmt.Println("\n--\n✨ All messages flushed from the queue")
}
// --
// Stop listening to events and close the producer
// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can get ready to exit
fmt.Printf("\n-=-=\nWrapping up…\n")
// When we're ready to return, check if the go routine has sent errors
// Note that we're relying on the Go routine to close the channel, otherwise
// we deadlock.
// If there are no errors then the channel is simply closed and we read no values.
done := false
var e string
for !done {
if t, o := <-errorChan; o == false {
// o is false if we've read all the values and the channel is closed
// If that's the case, then we're done here
done = true
} else {
// We've read a value so let's concatenate it with the others
// that we've got
e += t
}
}
if len(e) > 0 {
// If we've got any errors, then return an error to the caller
fmt.Printf("❌ … returning an error\n")
return errors.New(e)
}
// assuming everything has gone ok return no error
fmt.Printf("👋 … and we're done.\n")
return nil
}
}