how to hold a stream until another stream is drained?


how to hold a stream until another stream is drained?

刘宇宝

I'm using JDBCInputFormat to read a snapshot of a MySQL table and FlinkKafkaConsumer to read the binlog, which Debezium writes to Kafka.

 

       DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
       DataStream snapshotStream = env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);

       // map() converts both streams into the same type: (action, fields…), where action is "insert", "update" or "delete". The action for snapshotStream is always "insert".
       DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));

       tableStream.print();
       env.execute("example");

 

  1. To make sure tableStream doesn't miss any row, binlogStream must connect to Kafka first, so that the binlog starts before the table snapshot. I can roughly achieve this with "myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() - 600*1000)".
  2. To make sure changes from binlogStream always overwrite the rows from snapshotStream, I need a way to hold binlogStream until snapshotStream is drained, so that all changes from binlogStream arrive after the rows from snapshotStream. How can I achieve this?
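
To illustrate why the ordering in point 2 matters, here is a minimal sketch in plain Java (no Flink involved). It replays events into a map keyed by primary key. The ChangeEvent and ReplayOrderSketch classes, the sample rows and the upsert semantics are assumptions made for this illustration only; they are not Debezium or Flink types. "Binlog first" is just one of the bad interleavings a plain union() could produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical change event (action, key, value); not a Debezium or Flink type. */
final class ChangeEvent {
    final String action; // "insert", "update" or "delete"
    final int key;       // primary key of the row
    final String value;  // row payload; unused for "delete"

    ChangeEvent(String action, int key, String value) {
        this.action = action;
        this.key = key;
        this.value = value;
    }
}

final class ReplayOrderSketch {
    /** Applies the events in list order to an empty table and returns the final state. */
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            if (e.action.equals("delete")) {
                table.remove(e.key);
            } else {
                table.put(e.key, e.value); // "insert" and "update" both act as an upsert
            }
        }
        return table;
    }

    /** Returns a new list holding all of first followed by all of second. */
    static List<ChangeEvent> concat(List<ChangeEvent> first, List<ChangeEvent> second) {
        List<ChangeEvent> all = new ArrayList<>(first);
        all.addAll(second);
        return all;
    }

    /** Sample snapshot rows, read before the binlog changes below happened. */
    static List<ChangeEvent> snapshot() {
        return Arrays.asList(new ChangeEvent("insert", 1, "a"), new ChangeEvent("insert", 2, "b"));
    }

    /** Sample binlog changes made after the snapshot was taken. */
    static List<ChangeEvent> binlog() {
        return Arrays.asList(new ChangeEvent("update", 1, "a2"), new ChangeEvent("delete", 2, null));
    }

    public static void main(String[] args) {
        // Snapshot first: the binlog changes win, which is the desired result.
        System.out.println(replay(concat(snapshot(), binlog()))); // prints {1=a2}
        // Binlog first: stale snapshot rows overwrite the newer changes.
        System.out.println(replay(concat(binlog(), snapshot()))); // prints {1=a, 2=b}
    }
}
```

In the second ordering the deleted row 2 reappears and row 1 falls back to its old value, which is the outcome that holding binlogStream is meant to prevent.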

 

I'm considering a wrapper SourceFunction that combines FlinkKafkaConsumer and JDBCInputFormat, but the two differ in parallelism and checkpointing. I'm not sure how to get the wrapper right, or even whether it is the right direction.

Any suggestions would be much appreciated!

 


Re: how to hold a stream until another stream is drained?

Fabian Hueske-2
Hi,

With Flink streaming operators

However, these parts are currently being reworked to enable a better integration of batch and streaming use cases (or hybrid use cases such as yours).
A while back, we wrote a blog post about these plans [1]:

> "Unified Stream Operators: Blink extends the Flink streaming runtime operator model to support selectively reading from different inputs, while keeping the push model for very low latency. This control over the inputs helps to now support algorithms like hybrid hash-joins on the same operator and threading model as continuous symmetric joins through RocksDB. These operators also form the basis for future features like “Side Inputs”."

I'm not familiar with the internal details here, but I found the InputSelectable [2] interface that looks like it would do what you are looking for.
Note that this interface is not used on the higher-level DataStream API level, but rather on the lower StreamOperator level.

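Best, Fabian

[1] https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java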

On Mon, 6 Apr 2020 at 12:49, 刘宇宝 <[hidden email]> wrote:

> I'm using JDBCInputFormat to read a snapshot of a MySQL table and FlinkKafkaConsumer to read the binlog, which Debezium writes to Kafka.
> [...]

 


Re: how to hold a stream until another stream is drained?

刘宇宝
Hi Fabian,

Thank you very much. I almost got it working with InputSelectable:

       DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
       DataStream snapshotStream = env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
       // connect() returns ConnectedStreams; input 1 is the snapshot, input 2 is the binlog
       ConnectedStreams tableStream = snapshotStream.connect(binlogStream);
       // transform() expects a TypeInformation, so the TypeHint is wrapped
       tableStream.transform("Concat", TypeInformation.of(new TypeHint<….>(){}), new SequentialReadingStreamOperator<>());

The “SequentialReadingStreamOperator” is basically copied from https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestSequentialReadingStreamOperator.java
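
For readers who have not seen that test class, the idea of sequential reading can be sketched in plain Java without any Flink dependency. Everything below is a toy model: Selection, SequentialReader and ToyRuntime are hypothetical names invented for this sketch, and only the method names nextSelection() and endInput() are chosen to echo Flink's InputSelectable and BoundedMultiInput interfaces. It is not the code from the linked file.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for an input-selection hint; this is not Flink's InputSelection class. */
enum Selection { FIRST, SECOND }

/** Toy two-input operator: it selects input 1 until that input ends, then input 2. */
final class SequentialReader {
    private Selection selection = Selection.FIRST;
    final List<String> output = new ArrayList<>();

    /** Tells the toy runtime which input it may read next. */
    Selection nextSelection() {
        return selection;
    }

    void processElement1(String row) {
        output.add("snapshot:" + row);
    }

    void processElement2(String change) {
        output.add("binlog:" + change);
    }

    /** Called by the toy runtime once the given input (1 or 2) is exhausted. */
    void endInput(int inputId) {
        if (inputId == 1) {
            selection = Selection.SECOND;
        }
    }
}

final class ToyRuntime {
    /** Feeds both inputs to the operator, reading only from the input it currently selects. */
    static List<String> run(List<String> snapshotRows, List<String> binlogChanges) {
        Deque<String> first = new ArrayDeque<>(snapshotRows);
        Deque<String> second = new ArrayDeque<>(binlogChanges);
        SequentialReader op = new SequentialReader();
        while (true) {
            if (op.nextSelection() == Selection.FIRST) {
                if (first.isEmpty()) {
                    op.endInput(1); // input 1 is drained; the operator switches to input 2
                } else {
                    op.processElement1(first.poll());
                }
            } else {
                if (second.isEmpty()) {
                    op.endInput(2);
                    break; // finite toy input; a real binlog stream would not end
                }
                op.processElement2(second.poll());
            }
        }
        return op.output;
    }

    public static void main(String[] args) {
        // prints [snapshot:r1, snapshot:r2, binlog:u1]
        System.out.println(run(Arrays.asList("r1", "r2"), Arrays.asList("u1")));
    }
}
```

Nothing is buffered inside the operator here: the second input is simply not read until the first one reports its end, which is the property the question asks for.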

But if I enable checkpointing with "streamEnv.enableCheckpointing(10000);", Flink throws the exception below. Any idea how to resolve that?

Caused by: java.lang.UnsupportedOperationException: Checkpointing is currently not supported for operators that implement InputSelectable:example.SequentialReadingStreamOperator
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:219)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
        at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
        at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
        at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
        at org.apache.flink.client.deployment.executors.ExecutorUtils.getJobGraph(ExecutorUtils.java:51)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:57)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at example.Main.main(Main.java:72)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:564)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 8 more

Regards,
Yubao Liu

------------------------------------------------------------------------------------------------------------------------
From: Fabian Hueske <[hidden email]>
Date: Tuesday, April 7, 2020 at 4:45 AM
To: 刘宇宝 <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: how to hold a stream until another stream is drained?

Hi,

With Flink streaming operators

However, these parts are currently being reworked to enable a better integration of batch and streaming use cases (or hybrid use cases such as yours).
A while back, we wrote a blog post about these plans [1]:

> "Unified Stream Operators: Blink extends the Flink streaming runtime operator model to support selectively reading from different inputs, while keeping the push model for very low latency. This control over the inputs helps to now support algorithms like hybrid hash-joins on the same operator and threading model as continuous symmetric joins through RocksDB. These operators also form the basis for future features like https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API."

I'm not familiar with the internal details here, but I found the InputSelectable [2] interface that looks like it would do what you are looking for.
Note that this interface is not used on the higher-level DataStream API level, but rather on the lower StreamOperator level.

Best, Fabian

[1] https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java






Re: how to hold a stream until another stream is drained?

刘宇宝
+Sun Haibo, who added that validation in https://issues.apache.org/jira/browse/FLINK-11879

Hi Haibo,

Any suggestions on how to enable checkpointing for InputSelectable and BoundedMultiInput?

Thanks,
Yubao Liu

On 2020/4/10, 10:21 PM, "刘宇宝" <[hidden email]> wrote:

    But if I enable checkpointing with "streamEnv.enableCheckpointing(10000);", Flink throws the exception below. Any idea how to resolve that?

    Caused by: java.lang.UnsupportedOperationException: Checkpointing is currently not supported for operators that implement InputSelectable:example.SequentialReadingStreamOperator
    [...]