How do I check whether Ruby-Kafka retries are working?

The documentation says the producer retries sending a message to the queue based on max_retries.

So I shut down Kafka and then tried the producer. I got this error:

Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)

which makes sense; however, the retries never happen after that. I have read the docs inside out and I can't figure out how these retries are actually supposed to trigger.

Here is my code:

    def self.deliver_message(kafka, message, topic, transactional_id)
      producer = kafka.producer(idempotent: true,
                                transactional_id: transactional_id,
                                required_acks: :all,
                                max_retries: 5,
                                retry_backoff: 5)
      producer.produce(message, topic: topic)
      producer.deliver_messages
    end
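
In ruby-kafka, `deliver_messages` performs its retries internally and raises `Kafka::DeliveryFailed` once they are exhausted, so one way to observe (or extend) retry behavior is to wrap delivery in an application-level retry loop. Below is a minimal, self-contained sketch of that pattern; `TransientError` and the block are stand-ins for illustration, not part of the ruby-kafka API.

```ruby
# Generic retry-with-backoff helper, sketched in plain Ruby.
# In a real app you would rescue Kafka::DeliveryFailed (or
# Kafka::ConnectionError) instead of the stand-in TransientError.
class TransientError < StandardError; end

def with_retries(max_attempts: 3, backoff: 0)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue TransientError
    raise if attempts >= max_attempts  # give up after the last attempt
    sleep(backoff)                     # wait before the next try
    retry
  end
end

# Example: the block fails twice, then succeeds on the third attempt.
calls = 0
result = with_retries(max_attempts: 5) do |attempt|
  calls += 1
  raise TransientError, "broker unavailable" if attempt < 3
  :delivered
end

puts result  # => delivered
puts calls   # => 3
```

Logging inside the rescue clause is an easy way to confirm that retries are actually firing.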

Link to the documentation:

https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method

Thanks in advance.

Comments
  • wiusto replied:

    The retries are based on the type of exception thrown by the producer callback. According to the Callback docs, the following exceptions can occur during the callback:

    The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:
    Non-retriable exceptions (fatal, the message will never be sent):
    • InvalidTopicException
    • OffsetMetadataTooLargeException
    • RecordBatchTooLargeException
    • RecordTooLargeException
    • UnknownServerException
    Retriable exceptions (transient, may be covered by increasing #.retries):
    • CorruptRecordException
    • InvalidMetadataException
    • NotEnoughReplicasAfterAppendException
    • NotEnoughReplicasException
    • OffsetOutOfRangeException
    • TimeoutException
    • UnknownTopicOrPartitionException

    Shutting down Kafka completely looks like a non-retriable exception.
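
    The split above comes from the Java client's Callback javadoc; ruby-kafka raises its own error classes (e.g. `Kafka::ConnectionError`, `Kafka::DeliveryFailed`), so any mapping is illustrative only. A plain-Ruby sketch of classifying an error name against the two lists:

    ```ruby
    # Illustrative classification mirroring the Java client's callback docs.
    # These are Java exception class names, not ruby-kafka error classes.
    FATAL = %w[
      InvalidTopicException
      OffsetMetadataTooLargeException
      RecordBatchTooLargeException
      RecordTooLargeException
      UnknownServerException
    ].freeze

    RETRIABLE = %w[
      CorruptRecordException
      InvalidMetadataException
      NotEnoughReplicasAfterAppendException
      NotEnoughReplicasException
      OffsetOutOfRangeException
      TimeoutException
      UnknownTopicOrPartitionException
    ].freeze

    # Returns true when the named error is worth retrying.
    def retriable?(error_name)
      RETRIABLE.include?(error_name)
    end

    puts retriable?("TimeoutException")        # => true
    puts retriable?("RecordTooLargeException") # => false
    ```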