Is it possible to transfer files using Kafka?
I have thousands of files generated each day that I want to stream using Kafka. When I try to read a file, each line is taken as a separate message.

I want to know how to make each file's content a single message in a Kafka topic and, on the consumer side, how to write each message from the Kafka topic to a separate file.
You can write your own serializer/deserializer for handling files. For example:
Producer properties:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // your file serializer class, e.g. the FileMapSerializer below
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FileMapSerializer.class.getName());
Consumer properties:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // your file deserializer class, e.g. the MapDeserializer below
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MapDeserializer.class.getName());
Serializer:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;

    public class FileMapSerializer implements Serializer<Map<?, ?>> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, Map<?, ?> data) {
            if (data == null) {
                return null;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // Closing the ObjectOutputStream flushes it, so bos holds the complete object
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(data);
            } catch (IOException e) {
                throw new SerializationException("Error serializing file map", e);
            }
            return bos.toByteArray();
        }

        @Override
        public void close() { }
    }
Deserializer:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class MapDeserializer implements Deserializer<Map<?, ?>> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public Map<?, ?> deserialize(String topic, byte[] message) {
            if (message == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message))) {
                Object o = in.readObject();
                // A payload that is not a Map is replaced by an empty map
                return (o instanceof Map) ? (Map<?, ?>) o : new HashMap<String, String>();
            } catch (IOException | ClassNotFoundException e) {
                throw new SerializationException("Error deserializing file map", e);
            }
        }

        @Override
        public void close() { }
    }
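To check the serializer/deserializer logic without a running broker, here is a minimal sketch that applies the same Java serialization to a map of file name to content using only the JDK. `FileMapCodec`, `toBytes` and `fromBytes` are hypothetical names, not part of Kafka, and I/O errors are wrapped in unchecked exceptions for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical JDK-only helper: the same Java serialization the Kafka
// serializer/deserializer pair uses, without the Kafka interfaces.
final class FileMapCodec {
    private FileMapCodec() { }

    // Map (file name -> content) to bytes. Keys and values must be Serializable.
    static byte[] toBytes(Map<?, ?> data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes its buffer into bos
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Bytes back to a map; a payload that is not a Map becomes an empty map.
    static Map<?, ?> fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Object o = in.readObject();
            return (o instanceof Map) ? (Map<?, ?>) o : new HashMap<String, String>();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in payload", e);
        }
    }
}
```

A round trip such as `FileMapCodec.fromBytes(FileMapCodec.toBytes(files))` should give back a map with the same entries; `byte[]` values have to be compared with `Arrays.equals`, since arrays do not override `equals`.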
Then compose messages in the following form:

    // topic is your topic name; messageId is a counter used as the message key
    final ProducerRecord<String, Map<?, ?>> kafkaMessage =
            new ProducerRecord<>(topic, Integer.toString(messageId++), messageMap);
`messageMap` contains the file name as the key and the file content as the value. The value can be any Serializable object, so each message carries a map from file name to file content; the map can hold a single entry or several.
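A minimal sketch of both ends, assuming the values are raw file bytes (`byte[]`, which is Serializable): the producer side reads the whole file with `Files.readAllBytes`, so each file becomes one message instead of one message per line, and the consumer side writes each entry of the received map to its own file. `FileMessages`, `toMessageMap` and `writeEntries` are hypothetical helper names, not part of Kafka.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers (not part of Kafka) for both ends of the file transfer.
final class FileMessages {
    private FileMessages() { }

    // Producer side: read the WHOLE file into one byte[], so the file becomes a
    // single message value instead of one message per line.
    static Map<String, byte[]> toMessageMap(Path file) {
        try {
            Map<String, byte[]> messageMap = new HashMap<>();
            messageMap.put(file.getFileName().toString(), Files.readAllBytes(file));
            return messageMap;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Consumer side: write each (file name -> content) entry to its own file in
    // outDir. Only the last path component of the key is kept, so a key like
    // "../x.txt" is written as outDir/x.txt. Values are assumed to be byte[].
    static void writeEntries(Map<?, ?> messageMap, Path outDir) {
        try {
            Files.createDirectories(outDir);
            for (Map.Entry<?, ?> entry : messageMap.entrySet()) {
                Path name = Paths.get(entry.getKey().toString()).getFileName();
                Files.write(outDir.resolve(name), (byte[]) entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On the producer, the map returned by `toMessageMap(file)` would be passed as `messageMap`; on the consumer, `writeEntries(record.value(), outDir)` would be called for each `ConsumerRecord`. Kafka's default maximum message size is about 1 MB (producer `max.request.size`, broker `message.max.bytes`), so larger files would need those limits raised.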