Custom operator in BATCH execution mode


Custom operator in BATCH execution mode

ChangZhuo Chen (陳昌倬)
Hi,

Currently, we want to use batch execution mode [0] and historical data
to build state for our streaming application. Due to the differences
between batch & streaming modes, we want to check the current execution
mode in our custom operator. So our questions are:


* Is there any API for a custom operator to know the current execution
  mode (batch or streaming)?

* If we want to output after all elements of one specific key are
  processed, can we just use a timer, since timers are triggered at the
  end of input [0]?


[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Custom operator in BATCH execution mode

Piotr Nowojski
Hi,

1. I don't know of a built-in way of doing it. You can always pass this information yourself when you assemble the job (via the operator's/function's constructor).
2. Yes, I think this should work.
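
For illustration, a minimal sketch of the constructor approach; the
Event type, the key selector, and the function name are placeholders,
not actual Flink APIs beyond the imports shown:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ModeAwareFunction extends KeyedProcessFunction<String, Event, String> {

        private final boolean isBatchMode;

        public ModeAwareFunction(boolean isBatchMode) {
            this.isBatchMode = isBatchMode;
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<String> out) {
            if (isBatchMode) {
                // batch-specific handling, e.g. defer output to an end-of-input timer
            } else {
                // streaming-specific handling, e.g. emit updates incrementally
            }
        }
    }

    // At job-assembly time you already know which mode you configured,
    // so you can pass it straight into the function ("env" and "events"
    // come from your own job setup):
    boolean batch = true;
    env.setRuntimeMode(batch ? RuntimeExecutionMode.BATCH : RuntimeExecutionMode.STREAMING);
    events.keyBy(Event::getKey).process(new ModeAwareFunction(batch));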

Best,
Piotrek


Re: Custom operator in BATCH execution mode

Dawid Wysakowicz
Hi,

No, there is no API in the operator to know which mode it works in. We
aim to have separate operators for the two modes where required. You can
check e.g. how we do it in KeyedBroadcastStateTransformationTranslator [1].

Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
you want to apply a transformation at the end of each key. You could
also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.
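
For example, a minimal sketch of the timer approach (the Event type and
the state handling are placeholders):

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class EndOfKeyFunction extends KeyedProcessFunction<String, Event, String> {

        @Override
        public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
            // ... accumulate the element into keyed state ...

            // Long.MAX_VALUE is the timestamp of MAX_WATERMARK. Timers are
            // deduplicated per key and timestamp, so registering on every
            // element is cheap; the timer fires once the final watermark
            // arrives, i.e. after all input for this key in BATCH mode.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // All elements of the current key have been processed.
            out.collect("result for key " + ctx.getCurrentKey());
        }
    }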

As a side note, I don't fully get what you mean by "build state for our
streaming application". Bear in mind, though, that you cannot take a
savepoint from a job running in BATCH execution mode. Moreover, such a
job uses a different kind of StateBackend; actually a dummy one, which
just imitates a real state backend.

Best,

Dawid


[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/KeyedBroadcastStateTransformationTranslator.java



Re: Custom operator in BATCH execution mode

ChangZhuo Chen (陳昌倬)
On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> Hi,
>
> No, there is no API in the operator to know which mode it works in. We
> aim to have separate operators for the two modes where required. You can
> check e.g. how we do it in KeyedBroadcastStateTransformationTranslator [1].

Thanks for the information. We will implement this according to
Piotrek's suggestion.

>
> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> you want to apply a transformation at the end of each key. You could
> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

According to [0], the timer's timestamp is irrelevant since all
registered timers fire at the end of input, right? If that is the case,
we can use the same code for both streaming and batch modes.

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/


>
> As a side note, I don't fully get what you mean by "build state for our
> streaming application". Bear in mind, though, that you cannot take a
> savepoint from a job running in BATCH execution mode. Moreover, such a
> job uses a different kind of StateBackend; actually a dummy one, which
> just imitates a real state backend.

What we plan to do here is:

1. Load configuration from a broadcast event (custom source backed by a
   REST API).
2. Load historical events as batch-mode input (from GCS).
3. Use a timer to trigger output so that the following happens:
   a. Serialize keyed state into JSON.
   b. Output it to Kafka.
   c. The streaming application consumes the data from Kafka and updates
      its keyed state accordingly.

We hope that in this way, we can rebuild our state with almost the same
code as in streaming. A rough sketch of step 3 follows below.
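
For illustration, a minimal sketch of step 3, assuming Jackson for the
JSON serialization and the Flink 1.13-era FlinkKafkaProducer; MyState,
Event, the topic name, and the broker address are placeholders:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.util.Collector;

    import java.util.Properties;

    public class StateExportFunction extends KeyedProcessFunction<String, Event, String> {

        private transient ValueState<MyState> state;
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", MyState.class));
            mapper = new ObjectMapper();
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
            // ... update keyed state from the historical event (step 2) ...
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // End of input for this key: serialize the state to JSON (step 3a).
            out.collect(mapper.writeValueAsString(state.value()));
        }
    }

    // Wiring: "historical" is the batch-mode DataStream<Event> loaded from GCS.
    DataStream<String> exported =
            historical.keyBy(Event::getKey).process(new StateExportFunction());

    // Step 3b: write the JSON records to Kafka for the streaming job to pick up.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
    exported.addSink(new FlinkKafkaProducer<>(
            "rebuilt-state", new SimpleStringSchema(), props));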


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: Custom operator in BATCH execution mode

Piotr Nowojski
>>
>> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
>> you want to apply a transformation at the end of each key. You could
>> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.
>
> According to [0], the timer's timestamp is irrelevant since all
> registered timers fire at the end of input, right? If that is the case,
> we can use the same code for both streaming and batch modes.

Yes, timers will fire regardless of their value. However, what I believe Dawid meant is that if you pick a value not very far in the future, you risk the timer firing while your job is still running. Picking MAX_WATERMARK prevents that from happening.
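
In code, the difference is only the registered timestamp (inside a
KeyedProcessFunction's processElement; the offset is illustrative):

    // Risky: fires as soon as the watermark passes this point, possibly mid-job.
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1000L);

    // Safe: fires only when the final MAX_WATERMARK arrives at end of input.
    ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);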

> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application
(...)
> We hope that in this way, we can rebuild our state with almost the same code as in streaming.

If that's your main purpose, you can also consider using the State Processor API [1] to bootstrap the state of your job. That is, after all, the main purpose of the State Processor API.
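
For reference, a minimal bootstrap sketch with the State Processor API
as it looks in Flink 1.13 (DataSet-based); Event, MyState, the loader,
the operator uid, and the paths are all placeholders:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

    public class BootstrapJob {

        // Writes one ValueState entry per key.
        public static class StateBootstrapper extends KeyedStateBootstrapFunction<String, Event> {

            private transient ValueState<MyState> state;

            @Override
            public void open(Configuration parameters) {
                state = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("state", MyState.class));
            }

            @Override
            public void processElement(Event event, Context ctx) throws Exception {
                state.update(MyState.from(event));  // placeholder conversion
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Event> historical =
                    env.readTextFile("gs://bucket/historical-events").map(Event::parse);

            BootstrapTransformation<Event> transformation = OperatorTransformation
                    .bootstrapWith(historical)
                    .keyBy(Event::getKey)
                    .transform(new StateBootstrapper());

            Savepoint.create(new MemoryStateBackend(), 128)  // 128 = max parallelism
                    // The uid must match the stateful operator in the streaming job.
                    .withOperator("my-operator-uid", transformation)
                    .write("gs://bucket/savepoints/bootstrap");
        }
    }

The streaming job can then be started from that savepoint instead of
replaying the JSON through Kafka.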

Piotrek

