How to sink invalid data from flatmap


How to sink invalid data from flatmap

范超

Hi,

I'm using a custom flatmap function to validate Kafka JSON string messages. If a message is valid, it is transformed into a POJO (using GSON) and passed on to the next sink step.

If it cannot be parsed as a POJO, GSON throws “com.google.gson.JsonSyntaxException”. In my custom flatmap function I just catch this exception and carry on, so the invalid JSON message is simply dropped.

Now I want to keep the invalid messages and sink the original Kafka JSON string to another table, but I don't know how to implement this in my custom flatmap function, because the rich flatmap function restricts the collect type.

Could someone give me some advice please?

Thanks in advance!

Chao Fan

 


Re: How to sink invalid data from flatmap

Yun Tang
Hi Chao

I think side output [1] might meet your requirements.
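
To illustrate the idea, here is a minimal plain-Java model of what a side output gives you: one pass over the input feeds two separately typed outputs. This is not Flink code. `SplitSketch`, `Split` and `split` are hypothetical names, and `parser` is an assumed stand-in for a call such as `gson.fromJson(raw, SomePojo.class)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the side-output idea (no Flink dependency):
// one pass over the input feeds two separately typed outputs.
final class SplitSketch {

    // Main output holds parsed records; the side output keeps rejected raw strings.
    static final class Split<T> {
        final List<T> main = new ArrayList<>();
        final List<String> invalid = new ArrayList<>();
    }

    // `parser` is a stand-in for a call such as gson.fromJson(raw, SomePojo.class).
    static <T> Split<T> split(List<String> rawMessages, Function<String, T> parser) {
        Split<T> result = new Split<>();
        for (String raw : rawMessages) {
            try {
                result.main.add(parser.apply(raw));
            } catch (RuntimeException e) {
                // JsonSyntaxException is unchecked, so a Gson failure would land here.
                // Keep the original string instead of dropping it.
                result.invalid.add(raw);
            }
        }
        return result;
    }
}
```

In a Flink job, `main` would roughly correspond to the stream returned by `process(...)` and `invalid` to the stream returned by `getSideOutput(tag)`, each of which can then get its own sink.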


Best
Yun Tang

From: 范超 <[hidden email]>
Sent: Tuesday, August 25, 2020 10:54
To: user <[hidden email]>
Subject: How to sink invalid data from flatmap
 


Re: How to sink invalid data from flatmap

Jake
In reply to this post by 范超
Hi fanchao

Use a side output; see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

Jake


Re: How to sink invalid data from flatmap

范超
In reply to this post by Yun Tang

Thanks. Using ctx.output() inside the process method solved my problem, but does my custom flatmap function have to be retired?

 

From: Yun Tang [mailto:[hidden email]]
Sent: Tuesday, August 25, 2020 10:58
To: 范超 <[hidden email]>; user <[hidden email]>
Subject: Re: How to sink invalid data from flatmap

 


Re: How to sink invalid data from flatmap

范超
In reply to this post by Jake

Thanks Jake. But can I implement the output tag in my flatmap function rather than in a process function? I checked the parameters of flatMap and there is no 'context', so does that mean I have to rewrite my flatmap function as a process function?

 

From: Jake [mailto:[hidden email]]
Sent: Tuesday, August 25, 2020 11:06
To: 范超 <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: How to sink invalid data from flatmap

 


Re: How to sink invalid data from flatmap

Jake
Hi fanchao

Yes. I suggest that.
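
A rough sketch of what that rewrite could look like. To keep it runnable without Flink on the classpath, the nested `OutputTag`, `Collector` and `Context` types are stand-ins that only copy the shape of Flink's `OutputTag`, `Collector` and `ProcessFunction.Context`. `Event`, `INVALID_JSON` and `parse` are hypothetical names, and `parse` is an assumed placeholder for your Gson call.

```java
// Runs without Flink: the nested types are stand-ins that copy the shape of
// Flink's OutputTag, Collector and ProcessFunction.Context.
final class ProcessRewriteSketch {

    static final class OutputTag<T> {
        final String id;
        OutputTag(String id) { this.id = id; }
    }

    interface Collector<T> { void collect(T record); }

    interface Context { <X> void output(OutputTag<X> tag, X value); }

    // Hypothetical POJO; the thread does not show the real one.
    static final class Event {
        final long id;
        Event(long id) { this.id = id; }
    }

    static final OutputTag<String> INVALID_JSON = new OutputTag<>("invalid-json");

    // Stand-in for gson.fromJson(raw, Event.class): throws an unchecked
    // exception on bad input, as Gson does with JsonSyntaxException.
    static Event parse(String raw) {
        return new Event(Long.parseLong(raw.trim()));
    }

    // Before: flatMap-style signature. The only way out is the typed collector,
    // so a message that fails to parse can only be dropped.
    static void flatMap(String raw, Collector<Event> out) {
        try {
            out.collect(parse(raw));
        } catch (RuntimeException e) {
            // Nothing to emit to: Collector<Event> cannot take a String.
        }
    }

    // After: processElement-style signature. Same body, plus ctx.output(...)
    // to route the original string to the side output.
    static void processElement(String raw, Context ctx, Collector<Event> out) {
        try {
            out.collect(parse(raw));
        } catch (RuntimeException e) {
            ctx.output(INVALID_JSON, raw);
        }
    }
}
```

So the body of the flatmap carries over almost unchanged; only the signature and the catch branch differ. In a real job the tag is normally created as an anonymous subclass, `new OutputTag<String>("invalid-json") {}`, so Flink can keep the type information, and the invalid messages are read with `getSideOutput(tag)` on the stream returned by `process(...)`.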

Jake

On Aug 25, 2020, at 11:20 AM, 范超 <[hidden email]> wrote:


Re: How to sink invalid data from flatmap

范超

Thanks a lot, Jake, for the quick response.

 

From: Jake [mailto:[hidden email]]
Sent: Tuesday, August 25, 2020 11:31
To: 范超 <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: How to sink invalid data from flatmap

 
