Is it possible to transfer files using Kafka?


I have thousands of files generated each day that I want to stream using Kafka. When I try to read a file, each line is taken as a separate message.

I want to know how I can make each file's content a single message in the Kafka topic, and, on the consumer side, how to write each message from the Kafka topic to a separate file.

You can write your own serializer/deserializer for handling files. For example:

Producer props:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, YOUR_FILE_SERIALIZER_URI);

Consumer props:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YOUR_FILE_DESERIALIZER_URI);
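The `ProducerConfig`/`ConsumerConfig` constants above resolve to plain string keys (`key.serializer`, `value.serializer`, `key.deserializer`, `value.deserializer`). A minimal sketch that builds both property sets without depending on kafka-clients could look like this; the broker address, group id and the `com.example.*` class names are hypothetical placeholders, and the 10 MB `max.request.size` is only an illustrative value for files larger than the producer's default request limit of about 1 MB.

```java
import java.util.Properties;

// Sketch only: uses the string keys behind the ProducerConfig/ConsumerConfig
// constants so it compiles without kafka-clients. The com.example.* class
// names are hypothetical placeholders for the serializer/deserializer.
class FileTransferConfig {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG (hypothetical class)
        props.put("value.serializer", "com.example.FileMapSerializer");
        // Whole files can exceed the default ~1 MB request size; 10 MB is illustrative
        props.put("max.request.size", String.valueOf(10 * 1024 * 1024));
        return props;
    }

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same keys as the ConsumerConfig deserializer constants
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.example.MapDeserializer");
        return props;
    }
}
```

Raising `max.request.size` alone is not enough for large files: the broker (`message.max.bytes`) or topic (`max.message.bytes`) limit and the consumer's `max.partition.fetch.bytes` would also need to allow messages of that size.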

Serializer:

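    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.Map;

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;

    public class FileMapSerializer implements Serializer<Map<?, ?>> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public byte[] serialize(String topic, Map<?, ?> data) {
            if (data == null) {
                return null;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // try-with-resources closes (and therefore flushes) the ObjectOutputStream
            // before the bytes are read; calling toByteArray() earlier can miss buffered data
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(data);
            } catch (IOException e) {
                throw new SerializationException("Could not serialize file map", e);
            }
            return bos.toByteArray();
        }

        @Override
        public void close() {
            // nothing to release
        }
    }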

Deserializer:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class MapDeserializer implements Deserializer<Map<?, ?>> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public Map<?, ?> deserialize(String topic, byte[] message) {
            if (message == null) {
                return null;
            }
            // try-with-resources closes both streams, replacing the manual close blocks
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message))) {
                Object o = in.readObject();
                if (o instanceof Map) {
                    return (Map<?, ?>) o;
                }
                // Payload was not a map: fall back to an empty one
                return new HashMap<String, String>();
            } catch (IOException | ClassNotFoundException e) {
                throw new SerializationException("Could not deserialize file map", e);
            }
        }

        @Override
        public void close() {
            // nothing to release
        }
    }
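To check the round trip without a broker, the same `ObjectOutputStream`/`ObjectInputStream` logic can be exercised with the JDK alone. This is a sketch: `MapRoundTrip` and its methods are illustrative names, and the sample uses `byte[]` values on the assumption that file content is treated as raw bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Same Java-serialization round trip as FileMapSerializer/MapDeserializer,
// but with no Kafka dependency. Names here are illustrative only.
class MapRoundTrip {

    static byte[] toBytes(Map<?, ?> data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and flushed) at this point, so bos holds the full payload
        return bos.toByteArray();
    }

    static Map<?, ?> fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Object o = in.readObject();
            if (o instanceof Map) {
                return (Map<?, ?>) o;
            }
            return new HashMap<String, byte[]>(); // not a map: fall back to empty
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class in payload", e);
        }
    }

    // A one-file message: file name -> raw file content
    static Map<String, byte[]> sample() {
        Map<String, byte[]> m = new HashMap<>();
        m.put("report.csv", "id,value\n1,42\n".getBytes(StandardCharsets.UTF_8));
        return m;
    }
}
```

Because `byte[]` uses reference equality, comparing the original and the copy needs `java.util.Arrays.equals` on the values rather than `Map.equals`.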

Compose messages in the following form:

    ProducerRecord<String, Map<?, ?>> kafkaMessage =
            new ProducerRecord<>(<topic>, Integer.toString(messageId++), messageMap);
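What makes one file a single message, rather than one message per line, is reading the whole file at once instead of line by line. A minimal sketch, assuming each file fits comfortably in memory; `FileMessages.messageMapFor` is a hypothetical helper that produces the kind of `messageMap` used above:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds a one-entry map (file name -> whole file content).
// Files.readAllBytes loads the file in one call, so the file becomes one value
// instead of being split into one message per line.
class FileMessages {

    static Map<String, byte[]> messageMapFor(Path file) {
        try {
            Map<String, byte[]> messageMap = new HashMap<>();
            messageMap.put(file.getFileName().toString(), Files.readAllBytes(file));
            return messageMap;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned map can then be passed as the value of the `ProducerRecord`, one record per file.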

messageMap contains the file name as the key and the file content as the value. The value can be any Serializable object, so each message carries a map of file name to file content; the map can hold a single entry or several.
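On the consumer side, each record's value is the deserialized map, so writing every entry to its own file covers the second half of the question. A sketch assuming `byte[]` values and a local output directory; `FileWriterSink` and `writeFiles` are hypothetical names:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical consumer-side helper: writes each (file name -> content) entry
// of a deserialized message to its own file in outputDir.
// Assumes every value is a byte[] holding the raw file content.
class FileWriterSink {

    static void writeFiles(Map<?, ?> messageMap, Path outputDir) {
        try {
            Files.createDirectories(outputDir);
            for (Map.Entry<?, ?> entry : messageMap.entrySet()) {
                // Use only the last path segment of the key as the file name
                Path fileName = Paths.get(entry.getKey().toString()).getFileName();
                if (fileName == null) {
                    continue; // e.g. a key that is just a root path
                }
                Files.write(outputDir.resolve(fileName.toString()), (byte[]) entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a consumer loop, this would be called once for each record returned by `KafkaConsumer.poll`, passing `record.value()` and the target directory.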

