This is an automated archive.

The original was posted on /r/golang by /u/Ok_Yesterday_4941 on 2023-09-01 18:51:41+00:00.


I have some publishers and consumers running in Kubernetes. Everything seems OK, and RabbitMQ is definitely up and working because all of the Publishers are working, and all of the consumers are working. Then, an issue arrives in SOME of the consumers, while all publishers continue to work,and some consumers continue to work.

I receive a completely empty message: nil d.Body, nil d.Acknowledger, etc, while readingf from the messages channel. I am unable to even remove this message from the queue, because i get the delivery not initialized error, via Ack or Nack. I am wondering what I have done wrong here - is this indicative that my Channel has an error/been closed, and I just need to reestablish a connection to the Channel?

I am not Auto-acking, nor am I Acking or Nacking with multiple set to true, so the deliveries should not be acknowledged before being received, as so:

msgs, err := r.ch.Consume(r.queue.Name,"",false, // disable auto-acknowledge until after processingfalse,false,false,nil,)

acking:

rmqMsg.Ack(false)

and here is where I run into the error:

go func() {

for {

select {

case <-ctx.Done():

defer r.Close(ctx)

return

case d := <-msgs:

if d.Body != nil {

slog.Info("rabbitmq message found", "msgId", d.MessageId)

} else {

// error with channel if we got a nil body message

// it wont be able to be acknowledged.

// re-establish connection? i dunno what to do here tbh

ch, err := r.conn.Channel()

if err != nil {

slog.Error("error establishing channel", "err", err.Error())

}

r.ch = ch

if err := r.Start(ctx, r.queue.Name); err != nil {

slog.Error("error starting queue", "err", err.Error())

}

}

}

}

}()