Hi,
We want to use batch execution mode [0] with historical data to build state for our streaming application. Because batch and streaming modes behave differently, we want to check the current execution mode inside a custom operator. Our questions are:

* Is there any API that lets a custom operator know the current execution mode (batch or streaming)?

* If we want to emit output after all elements of one specific key have been processed, can we just use a timer, since timers are triggered at the end of input [0]?

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
Hi,

1. I don't know of a built-in way to do this. You can always pass the information yourself when you start the job (via the operator's or function's constructor).

2. Yes, I think this should work.

Best,
Piotrek

On Tue, 25 May 2021 at 17:05, ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> Hi,
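Piotrek's first suggestion can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `ModeAwareLogic`, its `Mode` enum, and the `outputPolicy` strings are hypothetical names invented here, and the class has no Flink dependency. In a real job the same constructor argument would be added to the user function or operator, and Flink's own `RuntimeExecutionMode` enum could replace the local `Mode`.

```java
// Hypothetical helper, not part of Flink: it carries the execution mode that the
// job author chose when building the job, so the logic never has to ask the runtime.
final class ModeAwareLogic implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    // Local stand-in enum (assumption); a real job might use Flink's RuntimeExecutionMode.
    enum Mode { BATCH, STREAMING }

    private final Mode mode;

    // The mode is fixed at construction time, i.e. when the job graph is assembled.
    ModeAwareLogic(Mode mode) {
        this.mode = java.util.Objects.requireNonNull(mode, "mode");
    }

    boolean isBatch() {
        return mode == Mode.BATCH;
    }

    // Illustrative branch only: the two policy names are made up for this sketch.
    String outputPolicy() {
        return isBatch() ? "emit-at-end-of-key" : "emit-per-element";
    }

    public static void main(String[] args) {
        ModeAwareLogic logic = new ModeAwareLogic(Mode.BATCH);
        System.out.println(logic.outputPolicy());
    }
}
```

Because the field is final and the class is serializable, the value set on the client side travels with the function instance when it is shipped to the workers, which is what makes the constructor approach workable.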
In reply to this post by ChangZhuo Chen (陳昌倬)
Hi,
No, there is no API in the operator to tell which mode it is running in. We aim to have separate operators for the two modes where required. You can check, for example, how we do it in KeyedBroadcastStateTransformationTranslator [1].

Yes, it should be possible to register a timer for Long.MAX_WATERMARK if you want to apply a transformation at the end of each key. You could also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

A side note: I don't fully get what you mean by "build state for our streaming application". Bear in mind that you cannot take a savepoint from a job running in BATCH execution mode. Moreover, BATCH mode uses a different kind of StateBackend: a dummy one that just imitates a real state backend.

Best,
Dawid

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/KeyedBroadcastStateTransformationTranslator.java
On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> Hi,
>
> No there is no API in the operator to know which mode it works in. We
> aim to have separate operators for both modes if required. You can check
> e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1].

Thanks for the information. We implemented this according to Piotrek's suggestion.

> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> you want to apply a transformation at the end of each key. You could
> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

According to [0], the timer's timestamp is irrelevant, since the timer will be triggered at the end of time, right? If that is the case, we can use the same code for both streaming and batch mode.

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/

> A side note, I don't fully get what you mean by "build state for our
> streaming application". Bear in mind though you cannot take a savepoint
> from a job running in the BATCH execution mode. Moreover it uses a
> different kind of StateBackend. Actually a dummy one, which just
> imitates a real state backend.

What we plan to do here is:

1. Load configuration from broadcast events (custom source backed by a REST API).
2. Load historical events as batch-mode input (from GCS).
3. Use a timer to trigger output so that the following happens:
   a. Serialize keyed states into JSON.
   b. Output to Kafka.
   c. The streaming application consumes the data from Kafka and updates its keyed states accordingly.

We hope that in this way we can rebuild our state with almost the same code as in streaming.

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
>> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
>> you want to apply a transformation at the end of each key. You could
>> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.
>
> According to [0], timer time is irrelevant since timer will be triggered
> at the end of time right? If that is the case, we can use the same code
> for both streaming and batch mode.

Yes, timers will fire regardless of their value. However, what I believe Dawid meant is that if you pick a value not very far in the future, you risk the timer firing while your job is still running. Picking MAX_WATERMARK prevents that from happening.

> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application(...)
> We hope that in this way, we can rebuild our states with almost the same code in streaming.

If that's your main purpose, you can also consider using the State Processor API [1] to bootstrap the state of your job. That is, after all, the main purpose of the State Processor API.

Piotrek

On Wed, 26 May 2021 at 14:04, ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
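The point about timer values can be illustrated with a toy model in plain Java. This is a sketch under assumptions, not Flink's timer service: `ToyEventTimeTimers` and all of its methods are hypothetical, a timer is assumed to fire once the watermark reaches its timestamp, and end of input is modelled as a final watermark of `Long.MAX_VALUE`. The constant written as `Long.MAX_WATERMARK` in the thread is taken here to mean that maximum timestamp, which is itself an assumption about what was intended.

```java
// Toy model of event-time timers, for illustration only; not Flink's implementation.
final class ToyEventTimeTimers {
    // Pending timers: timestamp -> keys that registered a timer at that timestamp.
    private final java.util.TreeMap<Long, java.util.List<String>> pending =
            new java.util.TreeMap<>();
    // Keys whose timers have fired, in firing order.
    private final java.util.List<String> fired = new java.util.ArrayList<>();

    void register(String key, long timestamp) {
        pending.computeIfAbsent(timestamp, t -> new java.util.ArrayList<>()).add(key);
    }

    // Fire (and remove) every timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        java.util.Iterator<java.util.Map.Entry<Long, java.util.List<String>>> it =
                pending.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            fired.addAll(it.next().getValue());
            it.remove();
        }
    }

    // Assumption of this model: end of input is a final watermark of Long.MAX_VALUE.
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    java.util.List<String> fired() {
        return java.util.Collections.unmodifiableList(fired);
    }

    public static void main(String[] args) {
        ToyEventTimeTimers timers = new ToyEventTimeTimers();
        timers.register("near-future", 1_000L);
        timers.register("end-of-input", Long.MAX_VALUE);
        timers.advanceWatermark(5_000L); // the job is still running here
        System.out.println("fired mid-job: " + timers.fired());
        timers.endOfInput();
        System.out.println("fired after end of input: " + timers.fired());
    }
}
```

In this model the timer registered at a small timestamp fires as soon as the watermark passes it, while the one registered at the maximum timestamp can only fire at end of input. That is the behaviour a job needs if the same code is meant to emit one final result per key whether or not the watermark advances while the job runs.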