相关文章推荐
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I am new to Kafka I have downloaded Kafka(0.10.1) and started both Kafka and zookeeper server on my local.Below mention is the code through which I am producing messages in Kafka.

package com.pkg.kafka;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducer {
    public static void main(String args[]) throws IOException{
        Properties properties = new Properties();
        InputStream stream = new FileInputStream("/Users/apple/Documents/workspace-sts-3.7.3.RELEASE/kafka/properties/producer.properties");
        properties.load(stream);
        @SuppressWarnings("resource")
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
        for(int i = 0;i< 1;i++){
            producer.send(new ProducerRecord<String, String>("producer-testing1",Integer.toString(i),Integer.toString(i)));
        producer.close();

But as I am running the code my my zookeeper log is generating these warnings:

[2016-11-14 08:57:47,553] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,553] INFO Closed socket connection for client /127.0.0.1:63833 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,656] INFO Accepted socket connection from /127.0.0.1:63834 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:47,757] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,757] INFO Closed socket connection for client /127.0.0.1:63834 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,858] INFO Accepted socket connection from /127.0.0.1:63835 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:47,959] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,959] INFO Closed socket connection for client /127.0.0.1:63835 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,060] INFO Accepted socket connection from /127.0.0.1:63836 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:48,161] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,161] INFO Closed socket connection for client /127.0.0.1:63836 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,262] INFO Accepted socket connection from /127.0.0.1:63837 (org.apache.zookeeper.server.NIOServerCnxnFactory)

and the message is not getting produced.So any suggestions how to fix this?

@amethystic - Client was stuck on line producer.send(new ProducerRecord<String, String>("producer-testing1",Integer.toString(i),Integer.toString(i))); – Abhimanyu Nov 14, 2016 at 8:49 Please paste your property setting for the producer to see if you connect to the correct host:port – amethystic Nov 14, 2016 at 9:06 bootstrap.servers = 127.0.0.1:9092 acks = all retries = 0 batch.size = 16384 linger.ms = 1 buffer.memory = 33554432 key.serializer = org.apache.kafka.common.serialization.StringSerializer value.serializer = org.apache.kafka.common.serialization.StringSerializer – Abhimanyu Nov 15, 2016 at 2:15

KafkaProducer.send(ProducerRecord) is an asynchronous method. If you want to check whether the message is delivered successfully or failed, just invoke the same method with a callback:

producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());

If the message got failed to be published, exceptions will be thrown out.

Some versions of zookeeper allowed the client to set the data larger than the max readable size by the server. Increasing the max buffer size JVM property fixes the issue.

-Djute.maxbuffer = xxx

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.

 
推荐文章