Thanks, using ctx.output() inside the process method solved my problem. But does this mean my custom flatmap function has to be retired?
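For readers following the thread, here is a minimal sketch of what the ctx.output() approach might look like. It assumes a Flink 1.11-era DataStream API with Gson on the classpath. The `Event` POJO, the `invalid-json` tag name, the `parseOrNull` helper and the in-memory test source are all hypothetical stand-ins, since the actual job code is not shown in this thread. Because a ProcessFunction's Collector can emit zero, one, or many records per input, the validation logic from a flatmap function can usually move into processElement unchanged.

```java
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class InvalidJsonSideOutput {

    /** Hypothetical POJO; the real message class is not shown in the thread. */
    public static class Event {
        public String id;
        public long ts;
    }

    // Anonymous subclass ({}) so that Flink can capture the String type parameter.
    static final OutputTag<String> INVALID_JSON = new OutputTag<String>("invalid-json") {};

    /** Returns the parsed POJO, or null when the text cannot be turned into one. */
    static Event parseOrNull(Gson gson, String json) {
        try {
            return gson.fromJson(json, Event.class);
        } catch (JsonSyntaxException e) {
            return null;
        }
    }

    public static class ValidateJson extends ProcessFunction<String, Event> {
        // Created lazily on the task instead of being serialized with the function.
        private transient Gson gson;

        @Override
        public void processElement(String json, Context ctx, Collector<Event> out) {
            if (gson == null) {
                gson = new Gson();
            }
            Event event = parseOrNull(gson, json);
            if (event != null) {
                out.collect(event);             // main output: valid POJOs
            } else {
                ctx.output(INVALID_JSON, json); // side output: the original string
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source: one valid and one invalid message.
        DataStream<String> raw = env.fromElements("{\"id\":\"a\",\"ts\":1}", "not json");

        SingleOutputStreamOperator<Event> valid = raw.process(new ValidateJson());
        DataStream<String> invalid = valid.getSideOutput(INVALID_JSON);

        // Replace print() with the two real sinks (main table and error table).
        valid.print("valid");
        invalid.print("invalid");

        env.execute("invalid-json-side-output-sketch");
    }
}
```

The side output has its own type (String here), independent of the main output type, which is what gets around the single collect type of a flatmap function.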
From: Yun Tang [mailto:[hidden email]]
Sent: Tuesday, August 25, 2020 10:58
To: 范超 <[hidden email]>; user <[hidden email]>
Subject: Re: How to sink invalid data from flatmap
Hi Chao
I think side output [1] might meet your requirements.
Best
Yun Tang
From: 范超 <[hidden email]>
Sent: Tuesday, August 25, 2020 10:54
To: user <[hidden email]>
Subject: How to sink invalid data from flatmap
Hi,
I’m using a custom flatmap function to validate JSON string messages from Kafka. If a message can be parsed into a POJO (using GSON), it goes on to the next sink step.
If it cannot be parsed into a POJO, GSON throws a “com.google.gson.JsonSyntaxException”. My custom flatmap function just catches this exception and moves on, so the invalid JSON message is silently dropped.
Now I want to keep the invalid messages and sink the original Kafka JSON string to another table, but I don’t know how to implement this in my custom flatmap function, because the rich map function limits the type that can be collected.
Could someone give me some advice please?
Thanks in advance!
Chao Fan