-
Notifications
You must be signed in to change notification settings - Fork 151
Open
Description
readerConfig := kafka.ReaderConfig{
Brokers: c.Brokers,
GroupID: c.Group,
Topic: c.Topic,
StartOffset: offset,
MinBytes: c.MinBytes, // 10KB
MaxBytes: c.MaxBytes, // 10MB
MaxWait: options.maxWait,
CommitInterval: options.commitInterval,
QueueCapacity: options.queueCapacity,
}
if len(c.Username) > 0 && len(c.Password) > 0 {
readerConfig.Dialer = &kafka.Dialer{
SASLMechanism: plain.Mechanism{
Username: c.Username,
Password: c.Password,
},
}
}
if len(c.CaFile) > 0 {
caCert, err := os.ReadFile(c.CaFile)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
ok := caCertPool.AppendCertsFromPEM(caCert)
if !ok {
log.Fatal(err)
}
readerConfig.Dialer.TLS = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: true,
}
}
consumer := kafka.NewReader(readerConfig)
q := &kafkaQueue{
c: c,
consumer: consumer,
handler: handler,
channel: make(chan kafka.Message),
producerRoutines: threading.NewRoutineGroup(),
consumerRoutines: threading.NewRoutineGroup(),
metrics: options.metrics,
errorHandler: options.errorHandler,
}
例如这个:
// ReadBatchTimeout amount of time to wait to fetch message from kafka messages batch.
//
// Default: 10s
ReadBatchTimeout time.Duration
不支持设置的话默认只有 10s, 我们业务里有特殊情况会超过 10s
Metadata
Metadata
Assignees
Labels
No labels