I encountered this exception while writing historical data from a big file to Kafka, and I would like to share the experience.
Two cases
Process in real-time
I would classify Kafka producer usage into two cases. The first is embedding a producer in a microservice, where the producer's send() is triggered by each HTTP request, meaning the producer writes data in real time. Large volumes of data might arrive in a short time, but not always.
Process from history
The other case is a Kafka producer that sends historical data. send() is not triggered in real time; instead, it is driven by a reader of the historical data, which can be a big file, HDFS, or another Kafka topic. The reader consumes the data as fast as possible, because you don't want to wait a long time for the processing to finish.
In the first case the KafkaProducer settings are pretty simple:
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokers());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, config.getTopic());
properties.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
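To make the first case concrete, here is a minimal sketch of how such a producer could be embedded in a service: one long-lived KafkaProducer built from the properties above, reused by every request handler. The class and method names are illustrative, and I use StringSerializer for the values instead of the JsonSerializer above to keep the sketch self-contained.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RealTimeWriter {

    private final Producer<String, String> producer;
    private final String topic;

    public RealTimeWriter(String brokers, String topic) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, topic);
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
        this.topic = topic;
    }

    // Called once per incoming HTTP request; send() is asynchronous
    // and returns as soon as the record is handed to the producer.
    public void onRequest(String key, String payload) {
        producer.send(new ProducerRecord<>(topic, key, payload));
    }
}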
In the second case, you may encounter a NetworkException. This exception signifies that there is too much network traffic and the connection to the brokers has been lost. I have encountered this exception when writing a 20 GB file in 20 minutes using a single KafkaProducer (i.e., one connection). The occurrence of this exception is highly dependent on your Kafka brokers’ settings.
NetworkException: Server disconnected before response received.
Producer settings
There have already been several discussions on this topic. The main goal is to reduce network traffic, particularly when reading from history. Real-time guarantees may not be necessary in this case, so we can batch the writes and reduce the frequency of writes to Kafka, which improves speed and decreases traffic.
Some best practices about Kafka
More explanations for configuration of producer
The main configuration that controls the throughput is the following:
properties.put(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840");
By default LINGER_MS_CONFIG is 0, which means no artificial delay is added between data arriving at the KafkaProducer and being sent to the brokers; this suits the case of embedding a KafkaProducer in a microservice.
You can read the above config as: the producer sends data to Kafka either once 100 milliseconds have passed or once the batch reaches the defined size, whichever comes first.
The second article above describes the two settings in detail. Think of it this way: when send() is triggered, the write to Kafka does not happen immediately; the data first lands in a buffer, and the buffered data is only written out once the size or time limit is met.
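Putting this together for the history case, a loader could look like the following sketch: the basic settings from the first case plus the two batching settings. The broker address, file path, and topic name are placeholders, and the exact linger.ms and batch.size values should be tuned against your brokers.
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HistoryLoader {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Batch writes: wait up to 100 ms or until 160 KB accumulates per partition.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "100");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840");

        try (Producer<String, String> producer = new KafkaProducer<>(properties);
             BufferedReader reader = Files.newBufferedReader(Paths.get("history.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // send() only hands the record to the producer's buffer;
                // the network write happens when a batch fills or linger.ms expires.
                producer.send(new ProducerRecord<>("my-topic", line));
            }
            // flush() blocks until everything buffered has been written.
            producer.flush();
        }
    }
}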
From Kafka 2.1 you can simply use delivery.timeout.ms, which puts an upper bound on the total time to report success or failure of a send, as described in the second article.
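For example (the five-minute value below is only an illustration; the default is 120000 ms):
// Total time allowed for a record to succeed or fail,
// including batching delay and retries.
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000");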
The other way of handling the NetworkException is to recreate the connection when the exception passed to the send() callback is a NetworkException:
producer.send(record, (metadata, exception) -> {
    if (exception instanceof NetworkException) {
        // Flush what we can, close the broken producer, and replace it.
        producer.flush();
        producer.close();
        producer = producerProvider.createProducer();
    }
});
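Note that producer cannot be a local variable here, because a Java lambda can only reassign fields. Below is a minimal self-contained sketch of that structure; ResilientWriter and ProducerProvider are illustrative names, not from the original code. I also leave flush() out of the sketch, since blocking calls inside the callback run on the producer's I/O thread.
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NetworkException;

public class ResilientWriter {

    // Hypothetical factory interface standing in for producerProvider above.
    public interface ProducerProvider {
        Producer<String, String> createProducer();
    }

    private final ProducerProvider producerProvider;
    private volatile Producer<String, String> producer; // replaced on failure

    public ResilientWriter(ProducerProvider producerProvider) {
        this.producerProvider = producerProvider;
        this.producer = producerProvider.createProducer();
    }

    public void write(ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception instanceof NetworkException) {
                // In recent client versions, close() called from a callback
                // logs a warning and closes with a zero timeout instead of blocking.
                producer.close();
                producer = producerProvider.createProducer();
            }
        });
    }
}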
How many producers
I often see one KafkaProducer used per application (or Docker container), with many threads sharing the same connection. This has already been discussed on Stack Overflow.
Basically, it's recommended to start with one connection, i.e. one KafkaProducer; whenever higher throughput is desired, you can create more KafkaProducers.
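If you reach that point, one simple pattern is a small fixed pool of producers picked round-robin for each send. The sketch below is just an illustration of the idea, with the pool size left as a parameter to tune:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerPool {

    private final List<Producer<String, String>> producers = new ArrayList<>();
    private final AtomicLong counter = new AtomicLong();

    public ProducerPool(Properties properties, int size) {
        // Each KafkaProducer maintains its own connections to the brokers.
        for (int i = 0; i < size; i++) {
            producers.add(new KafkaProducer<>(properties));
        }
    }

    // Thread-safe: each producer is itself thread-safe, and round-robin
    // spreads the load across the pool.
    public void send(ProducerRecord<String, String> record) {
        int index = (int) (counter.getAndIncrement() % producers.size());
        producers.get(index).send(record);
    }

    public void close() {
        producers.forEach(Producer::close);
    }
}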