Hi Guys,
I'm trying to understand the purpose of Table and in particular KafkaJsonTableSource, to see if it can be useful for my use case.

Here is my context: I send logs to Logstash, where I add some information (type, tags). Logstash then sends the logs to Kafka in JSON format, and finally I use Flink-Connector-Kafka to read from Kafka and parse the logs.

Before any processing, events from Kafka to Flink look like this:

    {"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}

Then I use JSONDeserializationSchema to deserialize the events:

    FlinkKafkaConsumer011<ObjectNode> kafkaConsumer =
        new FlinkKafkaConsumer011<>("Firewall", new JSONDeserializationSchema(), properties);

I take the value of the "message" key:

    public String map(ObjectNode value) throws Exception {
        String message = value.get("message").asText();
        ...
    }

Then I parse it with Java regex and put each match group in a String/int/...:
    action : accept
    service_id : domain-udp
    src_ip : 1.1.1.1
    dst_ip : 2.2.2.2
    ...

Now I want to replace the "message" key with "rawMessage" and put each match group into the JSON object to obtain the final result:
    {"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769",
     "@timestamp":"2018-04-20T14:47:35.285Z",
     "host":"FW",
     "type":"firewall",
     "tags":["Checkpoint"],
     "action":"accept",
     "service_id":"domain-udp",
     "src_ip":"1.1.1.1",
     "dst_ip":"2.2.2.2",
     ...}

I'm a newbie with streaming application technologies and with Flink, and for the moment I'm still discovering how it works and what the different functionalities are. But while looking for a solution to obtain my final result, I came across KafkaJsonTableSource. Does anyone think this could be a good solution for my use case? I think I would be able to store the JSON from Kafka, process the data, then modify the table and send the data to another Kafka topic. Is that correct?

Regards,
Sebastien
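[Editor's note: the parse-and-enrich step described above can be sketched in plain Java, without any Flink dependencies. The FirewallLogParser class name and the key/value regex below are hypothetical; a real Check Point log parser would use a pattern tailored to the exact format.]

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FirewallLogParser {

    // Hypothetical pattern for the "key: value;" pairs in the log body; the
    // real regex depends on the exact firewall log format.
    private static final Pattern FIELD = Pattern.compile("(\\w+):\\s*([^;]+)");

    /** Extracts the semicolon-separated "key: value" pairs into an ordered map. */
    public static Map<String, String> parseFields(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        // The leading token ("accept" here) has no key; store it as the action.
        Matcher action = Pattern.compile("^(\\w+)\\s").matcher(message);
        if (action.find()) {
            fields.put("action", action.group(1));
        }
        Matcher m = FIELD.matcher(message);
        while (m.find()) {
            fields.put(m.group(1), m.group(2).trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        String raw = "accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; "
                + "proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769";

        // Build the enriched event: keep the original line under "rawMessage"
        // and merge in the parsed fields (a plain Map stands in for the ObjectNode).
        Map<String, String> event = new LinkedHashMap<>();
        event.put("rawMessage", raw);
        event.putAll(parseFields(raw));

        System.out.println(event);
    }
}
```

Renaming individual fields (e.g. src to src_ip) and writing the result back into a Jackson ObjectNode would follow the same pattern; only the map merge is shown here.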
Hi,

Assuming that you're looking at a streaming use case, I think this is a better approach.
On Mon, Apr 23, 2018 at 4:03 PM, Lehuede sebastien <[hidden email]> wrote:
Hi Sebastien,

I think you can do that with Flink's Table API / SQL and the KafkaJsonTableSource.

2018-04-24 8:48 GMT+02:00 miki haiat <[hidden email]>:
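[Editor's note: for readers of the archive, a rough sketch of the suggested Table API approach. This is illustrative only and untested: the builder methods shown roughly match the Flink 1.4/1.5-era Kafka011JsonTableSource API, which changed across versions, and the regex extraction itself would still need a scalar user-defined function registered with the table environment.]

```java
// Illustrative sketch: declare the JSON fields as a table schema, read from
// Kafka, query with SQL, and register the result for a Kafka JSON sink.
KafkaTableSource source = Kafka011JsonTableSource.builder()
    .forTopic("Firewall")
    .withKafkaProperties(properties)
    .withSchema(TableSchema.builder()
        .field("message", Types.STRING)
        .field("host", Types.STRING)
        .build())
    .build();

tableEnv.registerTableSource("firewall", source);
Table result = tableEnv.sqlQuery(
    "SELECT message AS rawMessage, host FROM firewall");
// A UDF would perform the regex-based field extraction before writing
// the enriched rows to another Kafka topic via a JSON table sink.
```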