KafkaJsonTableSource purpose


KafkaJsonTableSource purpose

Lehuede sebastien
Hi Guys,

I'm currently trying to understand the purpose of the Table API, and in particular of KafkaJsonTableSource. I'm trying to see whether it could be useful for my use case.

Here is my context:

I send logs to Logstash, where I add some information (type, tags). Logstash sends the logs to Kafka in JSON format, and finally I use the Flink Kafka connector (flink-connector-kafka) to read from Kafka and parse the logs.

Before any processing, the events that Flink reads from Kafka look like this:

{"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}

Then I use JSONDeserializationSchema to deserialize the events:

FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>("Firewall", new JSONDeserializationSchema(), properties);
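The consumer line above passes a properties object that the post does not show. A minimal sketch of building it with java.util.Properties, assuming a broker at localhost:9092 and a consumer group named firewall-parser (both placeholders; KafkaConsumerConfig is a hypothetical helper name):

```java
import java.util.Properties;

class KafkaConsumerConfig {
    // Builds the Properties passed to the FlinkKafkaConsumer011 constructor.
    // "localhost:9092" and "firewall-parser" are placeholder values (assumptions).
    static Properties firewallConsumerProperties() {
        Properties properties = new Properties();
        // Kafka brokers used to bootstrap the connection
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Consumer group id under which offsets are tracked
        properties.setProperty("group.id", "firewall-parser");
        return properties;
    }
}
```

For the 0.11 consumer, bootstrap.servers and group.id are the settings usually needed; other Kafka consumer settings can be added the same way.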

I take the value of the "message" key:

public String map(ObjectNode value) throws Exception {
    String message = value.get("message").asText(); // raw log line
    return message;
}

Then I parse it with a Java regex and put each match group into a String, int, etc.:

action : accept
service_id : domain-udp
src_ip : 1.1.1.1
dst_ip : 2.2.2.2
.....
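The regex step could look roughly like this. It is a sketch: the pattern is inferred from the single sample line in this post (real Check Point logs may omit or reorder fields), and all values are kept as strings; Integer.parseInt could be applied to service and s_port where an int is wanted. FirewallLogParser and its field names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FirewallLogParser {
    // Hypothetical pattern inferred from the one sample line in this thread
    private static final Pattern LINE = Pattern.compile(
        "^(\\w+) service_id: ([^;]+); src: ([^;]+); dst: ([^;]+); "
        + "proto: ([^;]+); product: ([^;]+); service: (\\d+); s_port: (\\d+)$");

    // Output field names, in the same order as the capture groups above
    private static final String[] FIELDS = {
        "action", "service_id", "src_ip", "dst_ip", "proto", "product", "service", "s_port"
    };

    // Returns the match groups keyed by field name, or an empty map if the line does not match
    static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = LINE.matcher(message);
        if (m.matches()) {
            for (int i = 0; i < FIELDS.length; i++) {
                fields.put(FIELDS[i], m.group(i + 1));
            }
        }
        return fields;
    }
}
```

Returning an empty map for non-matching lines lets the caller decide whether to drop the event or forward it unparsed.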

Now I want to rename the "message" key to "rawMessage" and put each match group into the JSON object, to obtain this final result:

{"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769",
"@timestamp":"2018-04-20T14:47:35.285Z",
"host":"FW",
"type":"firewall",
"tags":["Checkpoint"],
"action":"accept",
"service_id":"domain-udp",
"src_ip":"1.1.1.1",
"dst_ip":"2.2.2.2",
...}
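One way to assemble that result, sketched in plain Java so it runs without dependencies: copy the incoming fields, rename message to rawMessage, and append the parsed fields. EventEnricher is a hypothetical name, and the hand-written JSON writer only handles string values and lists of strings; in the actual job the same steps could probably be done directly on the Jackson ObjectNode returned by JSONDeserializationSchema.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EventEnricher {
    // Copies the input event, renaming "message" to "rawMessage" (placed first),
    // then appends the parsed fields. Key order follows insertion order.
    static Map<String, Object> enrich(Map<String, Object> event, Map<String, String> parsed) {
        Map<String, Object> out = new LinkedHashMap<>();
        if (event.containsKey("message")) {
            out.put("rawMessage", event.get("message"));
        }
        for (Map.Entry<String, Object> e : event.entrySet()) {
            if (!e.getKey().equals("message")) {
                out.put(e.getKey(), e.getValue());
            }
        }
        out.putAll(parsed);
        return out;
    }

    // Minimal JSON writer for String values and lists of Strings only (an assumption
    // of this sketch; a real job would use a JSON library).
    static String toJson(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : map.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(quote(e.getKey())).append(':');
            Object v = e.getValue();
            if (v instanceof List) {
                List<?> items = (List<?>) v;
                sb.append('[');
                for (int i = 0; i < items.size(); i++) {
                    if (i > 0) sb.append(',');
                    sb.append(quote(String.valueOf(items.get(i))));
                }
                sb.append(']');
            } else {
                sb.append(quote(String.valueOf(v)));
            }
        }
        return sb.append('}').toString();
    }

    // Wraps a string in quotes, escaping quotes, backslashes and control characters
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') sb.append('\\').append(c);
            else if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
            else sb.append(c);
        }
        return sb.append('"').toString();
    }
}
```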

I'm new to stream processing technologies and to Flink, and for the moment I'm still discovering how it works and what features it offers. While looking for a way to obtain my final result, I came across KafkaJsonTableSource.

Does anyone think this could be a good solution for my use case?

If I understand correctly, I would be able to read the JSON from Kafka into a table, process the data, modify the table, and then send the data to another Kafka topic. Is that correct?

Regards,
Sebastien



Re: KafkaJsonTableSource purpose

miki haiat
Hi,
Assuming that you're looking at a streaming use case, I think this is a better approach:
  1. Send Avro from Logstash (better performance).
  2. Deserialize it into a POJO.
  3. Apply your logic.
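Steps 2 and 3 might look like the sketch below (the Avro part is omitted). Flink treats a class as a POJO when it is public, has a public no-argument constructor, and its fields are public or accessible through getters and setters. The class is package-private here only so the example compiles as one file; FirewallEvent, its fields, and the accepted-UDP filter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// POJO shape for a parsed firewall event; in a real job this class would be
// declared public in its own file so Flink can analyze it as a POJO.
class FirewallEvent {
    public String timestamp;
    public String host;
    public String action;
    public String serviceId;
    public String srcIp;
    public String dstIp;
    public String proto;
    public int service;
    public int sPort;

    public FirewallEvent() {} // public no-argument constructor
}

class FirewallLogic {
    // Example "logic" step (hypothetical): keep only accepted UDP events
    static List<FirewallEvent> acceptedUdp(List<FirewallEvent> events) {
        List<FirewallEvent> result = new ArrayList<>();
        for (FirewallEvent e : events) {
            if ("accept".equals(e.action) && "udp".equals(e.proto)) {
                result.add(e);
            }
        }
        return result;
    }
}
```

In a DataStream job, the same predicate would typically go into a filter(...) call on a DataStream<FirewallEvent>.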



On Mon, Apr 23, 2018 at 4:03 PM, Lehuede sebastien wrote:

Re: KafkaJsonTableSource purpose

Fabian Hueske-2
Hi Sebastien,

I think you can do that with Flink's Table API / SQL and the KafkaJsonTableSource.
Note that in Flink 1.4.2, the KafkaJsonTableSource does not support nested JSON yet.
You'd also need a table-valued UDF (a TableFunction) to parse the message and join the result with the original row. Depending on what you want to do, you might need additional UDFs.
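To illustrate what a table-valued UDF plus join does, here is a plain-Java simulation (not Flink code): the function may emit zero or more rows per input value, and each emitted row is appended to the original row. In Flink, the parser would instead be a class extending TableFunction with an eval(String) method that calls collect(...), registered on the table environment and used in SQL as FROM logs, LATERAL TABLE(parse(message)) AS T(...). The names parseMessage and lateralJoin, and the two-field output, are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LateralJoinSketch {
    // Hypothetical parser: extracts the action word and the src address
    private static final Pattern ACTION_AND_SRC = Pattern.compile("^(\\w+) .*?src: ([^;]+);");

    // Simulated table function: emits one (action, src) row for a matching
    // message and no rows otherwise
    static List<List<String>> parseMessage(String message) {
        Matcher m = ACTION_AND_SRC.matcher(message);
        if (!m.find()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(Arrays.asList(m.group(1), m.group(2)));
    }

    // Joins each input row with every row the function emits for the given column;
    // rows for which nothing is emitted are dropped (inner-join semantics)
    static List<List<String>> lateralJoin(List<List<String>> rows, int column,
                                          Function<String, List<List<String>>> tableFunction) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : rows) {
            for (List<String> emitted : tableFunction.apply(row.get(column))) {
                List<String> joined = new ArrayList<>(row);
                joined.addAll(emitted);
                out.add(joined);
            }
        }
        return out;
    }
}
```

With this inner-join form, messages the parser cannot handle disappear from the result; Flink SQL also offers LEFT JOIN LATERAL TABLE(...) ON TRUE to keep such rows with null values.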

Best,
Fabian

2018-04-24 8:48 GMT+02:00 miki haiat: