Batch with Flink Streaming API version 1.12.0


Robert Cullen
I have a Kafka source that I would like to run a batch job on. Since version 1.12.0 soft-deprecates the DataSet API in favor of the DataStream API, can someone show me an example of this (using DataStream)?

thanks
--
Robert Cullen
240-475-4490

Re: Batch with Flink Streaming API version 1.12.0

Arvid Heise-3
Hi Robert,

You basically just (re)write your application with the DataStream API, use the new KafkaSource, and then let the automatic batch detection mode do its work [1].
The most important part is that all of your sources need to be bounded. Assuming you just have a Kafka source, you need to use setBounded with the appropriate end offset/timestamp.
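
For example, a bounded Kafka source could look roughly like this (just a sketch; the bootstrap servers and topic are made-up placeholders, and you would plug in your own deserializer):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("my-kafka:9092")   // placeholder
        .setTopics(Arrays.asList("my-topic"))   // placeholder
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        // setBounded is what makes the source finite: here it stops at the latest
        // offsets at the time the job starts (or use an end timestamp, as mentioned above)
        .setBounded(OffsetsInitializer.latest())
        .build();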

Note that the rewritten Kafka source still has a couple of issues that should be addressed in the first bugfix release of 1.12 later this month. So while it's safe to use for development, I'd wait for 1.12.1 before rolling it out to production.

If you have specific questions on the migration from DataSet to DataStream, please let me know.




--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

Re: Batch with Flink Streaming API version 1.12.0

Robert Cullen

Arvid,

Thanks. Can you show me an example of how the source is tied to the ExecutionEnvironment?

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setUnbounded(OffsetsInitializer.latest())
        .build();

env.addSource(source);



--
Robert Cullen
240-475-4490

Re: Batch with Flink Streaming API version 1.12.0

Arvid Heise-3
Robert,

Here I modified your example; the key change is setBounded instead of setUnbounded:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setBounded(OffsetsInitializer.latest())
        .build();

env.addSource(source);
You can also explicitly set the batch runtime mode, but that shouldn't be necessary (and may make things more complicated once you also want to execute the application in streaming mode):
env.setRuntimeMode(RuntimeExecutionMode.BATCH);


Re: Batch with Flink Streaming API version 1.12.0

Robert Cullen

Arvid,

Thank you. Sorry, my last post was not clear. This line:

env.addSource(source)

does not compile, since addSource expects a SourceFunction, not a KafkaSource.




--
Robert Cullen
240-475-4490

Re: Batch with Flink Streaming API version 1.12.0

Arvid Heise-3
Sorry, Robert, for not checking the complete example. New sources are used with fromSource instead of addSource. It's not ideal, but hopefully we can remove the old way rather soon to avoid confusion.
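
For example, a minimal sketch of attaching the source from your snippet (use a real WatermarkStrategy instead of noWatermarks() if you rely on event time):

DataStream<String> stream = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "Kafka Source");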


Re: Batch with Flink Streaming API version 1.12.0

Robert Cullen

Arvid,

I’m hoping to get your input on a process I’m working on. Originally I was using a streaming solution, but noticed that the data in the sliding windows was getting too large over longer intervals and processing sometimes stopped altogether. In any case, the total counts should be a fixed number, so a batch process would be more suitable.

The use case is this: get counts per key over 30 minutes of data, take those totals along with a 30-second time slice of the same data (possibly consecutive time slices), and run the results through one function. Originally my code looked like this, using sliding time windows in streaming mode:

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<FluentdMessage> stream = env
                .addSource(getConsumer(properties))
                .name("Kafka Source");

        DataStream<Tuple2<String, Long>> keyedCounts  = stream
                .filter(value -> value.getGrokName() != null)
                .map(new MapFunction<FluentdMessage, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(FluentdMessage value) throws Exception {
                        return Tuple2.of(value.getGrokName(), 1L);
                    }
                })
                .keyBy(value -> value.f0)
                .window(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
                .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
                //.sum(2);
                .reduce((ReduceFunction<Tuple2<String, Long>>) (data1, data2) -> Tuple2.of(data1.f0, data1.f1 + data2.f1));

       keyedCounts
                .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
                .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
                .process(new ProcessAllWindowFunction<Tuple2<String, Long>, Tuple5<String, Long, Long, String, Long>, TimeWindow>() {

                    private ValueState<Long> currentCount;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        currentCount = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("count", Long.class));
                    }

                    @Override
                    public void process(Context context,
                                        Iterable<Tuple2<String, Long>> iterable,
                                        Collector<Tuple5<String, Long, Long, String, Long>> out) throws Exception {
                        long count = StreamSupport.stream(iterable.spliterator(), false).count();
                        if(currentCount.value() == null) {
                            currentCount.update(0L);
                        }
                        Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                        Map<String, Long> map = new HashMap<>();
                        Map<String, List<Long>> keyTotalMap = new HashMap<>();

                        if(currentCount.value() < count) {
                            while (iterator.hasNext()) {
                                Tuple2<String, Long> tuple = iterator.next();
                                map.put(tuple.f0, keyDifference(tuple.f0, iterable));
                                keyTotalMap.computeIfAbsent(tuple.f0, k -> new ArrayList<>()).add(tuple.f1);
                                //out.collect(Tuple3.of(tuple.f0, keyDifference(tuple.f0, iterable), sum(iterable)));
                            }

                            map.forEach((key, value) -> {
                                if(value > 0L) {
                                    out.collect(Tuple5.of(
                                            key,
                                            value,
                                            sum(key, keyTotalMap),
                                            getChiSqrLoggerScore(value, sumKeys(map), sum(key, keyTotalMap), sum(keyTotalMap)),
                                            System.currentTimeMillis()));
                                }});

                            //out.collect(Tuple5.of(null, null, null, null, null));
                            currentCount.update(count);
                        } else {
                            //This is currently the only way to force the job to end
                            throw new InterruptedException();
                        }
                    }
                })
               .addSink(new RichChiLoggerInputSink())
               .name("Postgres Sink");

        //globalCounts.writeAsText("s3://argo-workflow-bucket/output.txt");
        env.execute("Flink Kafka Chi Log Runner");

This does not work in batch mode, so I need some guidance. Thanks!




--
Robert Cullen
240-475-4490

Re: Batch with Flink Streaming API version 1.12.0

Arvid Heise-3
Hi Robert,

The most reliable way to use batch mode with the streaming API is to use event time [1]. Processing-time windows or ingestion time do not make a whole lot of sense if you want to do some kind of reprocessing (non-deterministic results and resource usage, because the timestamps of the records change with every execution).

For windows to work in event time, you usually need to define a watermark strategy [2]. Note that in your example you used the old source, which doesn't support batch execution mode.

Here is a sketch of how I'd modify it:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

KafkaSource<FluentdMessage> source = KafkaSource.<FluentdMessage>builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

DataStream<FluentdMessage> stream = env.fromSource(
        source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
Note that the specific watermark strategy depends on your data. I have chosen the most common strategy for Kafka, which assumes that within each partition the timestamps are (non-strictly) increasing. If you have some out-of-order events, you probably need forBoundedOutOfOrderness.
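
For example, a sketch of that strategy plus an event-time version of your sliding windows (the 30-second out-of-orderness bound and the use of the Kafka record timestamp are assumptions; adjust both to your data):

WatermarkStrategy<FluentdMessage> watermarks = WatermarkStrategy
        .<FluentdMessage>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        // uses the Kafka record timestamp; extract a field from the event instead if it carries its own time
        .withTimestampAssigner((event, recordTimestamp) -> recordTimestamp);

DataStream<FluentdMessage> stream = env.fromSource(source, watermarks, "Kafka Source");

stream
        .filter(value -> value.getGrokName() != null)
        .map(value -> Tuple2.of(value.getGrokName(), 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(value -> value.f0)
        .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));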

