// Asynchronous write: the callback passed to send() is invoked with either the
// record's metadata or the exception that caused the send to fail.
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
producer.send(record, (metadata, e) -> {
    if (e != null) {
        // A non-null exception means the send failed, so the metadata does not
        // describe a stored record and must not be reported as a success.
        System.err.println("Failed to send record : " + e);
        return;
    }
    System.out.println("Record sent to partition " + metadata.partition()
            + " with offset " + metadata.offset()
            + " - toString() : " + metadata.toString());
});
Synchronous writes
// Synchronous write: block on the Future returned by send() until the result
// of the send is available.
Future<RecordMetadata> future = producer.send(record);
try {
    RecordMetadata metadata = future.get();
    System.out.println("Record sent to partition " + metadata.partition()
            + " with offset " + metadata.offset()
            + " - toString() : " + metadata.toString());
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still
    // observe that this thread was interrupted while waiting.
    Thread.currentThread().interrupt();
    e.printStackTrace();
} catch (ExecutionException e) {
    // The send failed; the underlying error is wrapped as the cause.
    e.printStackTrace();
}
Full source
Propertiesprops=newProperties();props.put(ProducerConfig.CLIENT_ID_CONFIG,"client_id");props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");props.put(ProducerConfig.ACKS_CONFIG,"all");Stringtopic="topic_name";Stringvalue="produce test value";try(KafkaProducer<String,String>producer=newKafkaProducer<>(props);){//Asynchronous writesfinalProducerRecord<String,String>record=newProducerRecord<>(topic,value);producer.send(record,(metadata,e)->{if(e!=null){System.out.println("Record sent to partition "+metadata.partition()+" with offset "+metadata.offset()+" - toString() : "+metadata.toString());}});//Synchronous writesFuture<RecordMetadata>future=producer.send(record);try{RecordMetadatametadata=future.get();System.out.println("Record sent to partition "+metadata.partition()+" with offset "+metadata.offset()+" - toString() : "+metadata.toString());}catch(InterruptedExceptione){e.printStackTrace();}catch(ExecutionExceptione){e.printStackTrace();}}