Flink Streaming Counter

Vijayendra Yadav
Hi Team,

Could you provide a sample showing how to increment a Counter for the records passing through a Flink DataStream source and sink? I would then like to display the counter values in my local IDE.
I need a counter for each of steps #1 through #3:

1) DataStream<byte[]> messageStream = env.addSource(Kinesis Source);
2) DataStream<String> outputStream = messageStream.rebalance().map(new CustomMapFunction());
3) outputStream.addSink(Streaming File Sink);

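import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MyMapper extends RichMapFunction<String, String> {
  // Not serialized with the function; created per subtask in open().
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // Register the counter with this operator's metric group.
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Count each record, then pass it through unchanged.
    this.counter.inc();
    return value;
  }
}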

Thanks,
Vijay
Re: Flink Streaming Counter

Matthias
Hi Vijayendra,
Thanks for reaching out to the Flink community. What do you mean by displaying it in your local IDE? Would it be OK to log the information to stdout? If so, you might want to have a look at the docs on setting up an Slf4j metrics reporter [1].

Best,
Matthias
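
For readers following along, a reporter setup along the lines Matthias suggests could look like the fragment below. This is a sketch, not taken from the thread: it assumes the configuration keys documented for Flink releases of that period (around 1.12), that the flink-metrics-slf4j module is available to the cluster, and that the logging backend prints INFO messages. The 60-second interval is an arbitrary illustrative value, and newer Flink releases may use different keys.

```yaml
# flink-conf.yaml (assumed keys for Flink ~1.12; verify against your version's docs)
metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
metrics.reporter.slf4j.interval: 60 SECONDS
```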


On Tue, Mar 23, 2021 at 2:09 AM Vijayendra Yadav <[hidden email]> wrote:

Re: Flink Streaming Counter

Vijayendra Yadav
Hi Pohl,

Thanks for getting back to me so quickly. I am looking for an example that increments a counter at each stage, #1 through #3, of the DataStream pipeline.
I can then probably print the counters using slf4j.

Thanks,
Vijay
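
One way the per-stage counting asked for here might be approached is sketched below. It is not from the thread and makes several assumptions: the Flink 1.12-era DataStream API (where `open(Configuration)` is the lifecycle hook), a local environment configured with the Slf4j reporter through assumed configuration keys, and the flink-metrics-slf4j module plus an INFO-level logging backend on the IDE classpath. `StageCounters`, `CountingMap`, `reporterConfig`, and the metric names are hypothetical; `fromElements` and `print()` are stand-ins for the Kinesis source and the streaming file sink.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StageCounters {

  /** Hypothetical pass-through map that counts each record under a given metric name. */
  public static class CountingMap extends RichMapFunction<String, String> {
    private final String metricName;
    private transient Counter counter;

    public CountingMap(String metricName) {
      this.metricName = metricName;
    }

    @Override
    public void open(Configuration parameters) {
      // Register one counter per operator subtask.
      counter = getRuntimeContext().getMetricGroup().counter(metricName);
    }

    @Override
    public String map(String value) {
      counter.inc();
      return value;
    }
  }

  /** Reporter settings for a local run; keys are assumed from Flink ~1.12 docs. */
  public static Configuration reporterConfig() {
    Configuration conf = new Configuration();
    conf.setString("metrics.reporter.slf4j.factory.class",
        "org.apache.flink.metrics.slf4j.Slf4jReporterFactory");
    conf.setString("metrics.reporter.slf4j.interval", "5 SECONDS");
    return conf;
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, reporterConfig());

    // Stage 1: count records right after the source (stand-in for Kinesis).
    DataStream<String> messageStream = env
        .fromElements("a", "b", "c")
        .map(new CountingMap("sourceRecords"));

    // Stage 2: in a real job this counter would live inside the custom mapper.
    DataStream<String> outputStream = messageStream
        .rebalance()
        .map(new CountingMap("mappedRecords"));

    // Stage 3: count records just before they reach the sink (stand-in: print()).
    outputStream
        .map(new CountingMap("sinkRecords"))
        .print();

    env.execute("stage-counters");
  }
}
```

The counter cannot be registered inside a source or sink that you do not control, so this sketch places a counting map directly after the source and directly before the sink instead. A bounded stand-in source like the one above may finish before the first report interval elapses; with an unbounded source such as Kinesis the job keeps running and the reporter logs periodically. Flink also exposes built-in per-operator metrics such as numRecordsIn and numRecordsOut, which may already cover this need without custom counters.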

On Tue, Mar 23, 2021 at 6:35 AM Matthias Pohl <[hidden email]> wrote:

Re: Flink Streaming Counter

Matthias
Hi Vijayendra,
What about the example from the docs you already referred to [1]?

Best,
Matthias


On Tue, Mar 23, 2021 at 6:48 PM Vijayendra Yadav <[hidden email]> wrote: