Hey,
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source (in my case, Kafka), and every handler has its own filter function.
Each handler also has a Counter that is incremented every time it filters out an event (inside the FilterFunction).
The problem arises when I use that FilterFunction on the DataStreamSource - the handler's open() function hasn't been called, and thus the metrics have never been initialized.
Do I have a way of making this work? Or any other way of counting events that have been filtered out from the DataStreamSource?
Handler:
public abstract class Handler extends RichMapFunction<Event, String> {
And when I init the DataStreamSource:
Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
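[Editor's note: the reflective instantiation above can be sketched in plain Java like this; "java.util.ArrayList" is only a stand-in for the real handler class name.]

```java
public class ReflectDemo {
    public static void main(String[] args) throws Exception {
        // Load a class by its fully-qualified name and invoke its
        // no-arg constructor, as in the snippet above.
        Object handler = Class.forName("java.util.ArrayList")
                .getConstructor()
                .newInstance();
        System.out.println(handler.getClass().getName());
    }
}
```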
Any help would be much appreciated!
Thanks 🙂
|
Hi Sidney,

Firstly, the `open` method of a UDF instance is always invoked when the task thread starts to run.

From the second code snippet image that you provided, I guess you are trying to obtain a handler dynamically via reflection, and that you want a dynamic instance of a handler at runtime; correct me if I am wrong.

IMO, you may be confusing the program you write with the Task runtime: the program itself is only used to build the job graph, while the business logic in the UDFs runs later inside the tasks.

For your scenario, if many types of events exist in one topic, could you consume them all, group by type, and then count them?

Best,
Vino

On Mon, Dec 16, 2019 at 12:54 AM, Sidney Feiner <[hidden email]> wrote:
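[Editor's note: the "group by type, then count" suggestion could look roughly like this — a minimal, non-runnable sketch assuming an `Event` type with a `getType()` accessor and the Flink streaming runtime; not the poster's actual code.]

```java
// Consume everything from Kafka, key by event type, and keep a
// running count per type instead of filtering at the source.
DataStream<Event> events = env.addSource(kafkaSource);

events
    .map(e -> new Tuple2<>(e.getType(), 1L))          // (type, 1) pairs
    .returns(Types.TUPLE(Types.STRING, Types.LONG))    // help type inference for the lambda
    .keyBy(t -> t.f0)                                  // group by event type
    .sum(1)                                            // running count per type
    .print();
```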
|
You are right about everything you say!
The solution you propose is actually what I'm trying to avoid: I'd prefer not to consume messages I don't plan on actually handling.
But from what you say, it sounds like I have no other choice. Am I right? I MUST consume the messages, count those I want to filter out, and then simply not handle them?
Which means I must filter them in the task itself, and I have no way of filtering them directly at the data source?
Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
From: vino yang <[hidden email]>
Sent: Monday, December 16, 2019 7:56 AM
To: Sidney Feiner <[hidden email]>
Cc: [hidden email]
Subject: Re: Fw: Metrics based on data filtered from DataStreamSource
|
Hi Sidney,

>> I'd prefer not to consume messages I don't plan on actually handling.

It depends on your design. If you produce the different types into different partitions, then it's easy to filter types at the Kafka consumer (only consume some of the partitions). If you do not distinguish the types across the partitions of the Kafka topic, you can filter messages by type in the Flink job.

>> I MUST consume the messages, count those I want to filter out and then simply not handle them?

I did not say "you MUST", I said "you can". Actually, there are several solutions, e.g.:

1) the one I described in the last mail;
2) filter in the Flink source;
3) filter via a Flink filter transform function;
4) side output / split and select.

Choose the solution that suits your scene. The key point of my last mail was to describe the problem with your reflection approach.

Best,
Vino

On Mon, Dec 16, 2019 at 9:31 PM, Sidney Feiner <[hidden email]> wrote:
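[Editor's note: option 4 (side output) could be sketched as below. This assumes an `Event` type and a `shouldHandle` predicate, and needs the Flink runtime, so it is illustrative only.]

```java
// Route filtered-out events to a side output instead of dropping them,
// so they can be counted (or otherwise inspected) downstream.
final OutputTag<Event> filteredTag = new OutputTag<Event>("filtered") {};

SingleOutputStreamOperator<Event> handled = events
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) {
            if (shouldHandle(e)) {
                out.collect(e);             // keep for normal handling
            } else {
                ctx.output(filteredTag, e); // divert instead of silently dropping
            }
        }
    });

// The diverted stream; count it, log it, or write it elsewhere.
DataStream<Event> filteredOut = handled.getSideOutput(filteredTag);
```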
|
Hey Sidney,

For the .filter() call, you are passing a function without an open() method: the function that getFilter() returns has no open() method, so its metrics are never registered.

What could work is creating a `Handler extends AbstractRichFunction implements MapFunction, FilterFunction` and passing those instances into both the filter() and map() operators.

On Tue, Dec 17, 2019 at 3:18 AM, vino yang <[hidden email]> wrote:
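[Editor's note: concretely, one way to get open() invoked for the filter is a RichFilterFunction whose Counter is registered in open(), with the instance itself passed to filter(). A minimal sketch, assuming an `Event` type and a placeholder predicate; the rich-map handler would be built analogously.]

```java
// A rich filter: Flink calls open() before any elements flow through
// the operator, so the Counter is initialized before filter() runs.
public class CountingFilter extends RichFilterFunction<Event> {
    private transient Counter filteredCounter;

    @Override
    public void open(Configuration parameters) {
        filteredCounter = getRuntimeContext()
                .getMetricGroup()
                .counter("filteredEvents");
    }

    @Override
    public boolean filter(Event event) {
        boolean keep = shouldHandle(event);
        if (!keep) {
            filteredCounter.inc(); // safe: counter was set up in open()
        }
        return keep;
    }

    private boolean shouldHandle(Event event) {
        // placeholder predicate; the real logic lives in the handler
        return event != null;
    }
}

// usage: stream.filter(new CountingFilter()).map(handler)
```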
|