Restore state from save point with add new flink sql


Restore state from save point with add new flink sql

James (Jian Wu) [FDS Data Platform]

Hi,

My application uses Flink SQL, and I want to add a new SQL query to it.

For example, the first version is:

// Consume payment-complete events from Kafka and convert them to AggregatedOrderItems
DataStream<AggregatedOrderItems> paymentCompleteStream = getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
        .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap()).assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
        .returns(TypeInformation.of(AggregatedOrderItems.class));

// Register the stream as a table (with event-time attribute "eventTs") and a UDF
tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream, concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));

tableEnv.registerFunction("group_concat", new GroupConcatFunction());

// Run sql1, post-process the results, and write detections to MySQL
Table resultTable = tableEnv.sqlQuery(sql1);
tableEnv.toAppendStream(resultTable, Row.class, qConfig)
        .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

Then in the second version, I add a new SQL query:

// Run the newly added sql2 and write its detections to MySQL as well
Table resultTable2 = tableEnv.sqlQuery(sql2);
tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
        .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

After restarting the job from a savepoint, can the original Flink SQL query's state be restored successfully? Will Flink assign a new UID to the original SQL operators? (I will not change the original SQL.)

 

Regards

 

James

 


Re: Restore state from save point with add new flink sql

Fabian Hueske-2
Hi,

At the moment (Flink 1.5.0), the operator UIDs depend on the overall application and not only on the query.
Hence, changing the application by adding another query might change the existing UIDs.

In general, you can only expect savepoint restarts to work if you don't change the application and stick to the same Flink version.
We are not yet ready to guarantee savepoint compatibility across Flink versions.

Best,
Fabian
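
For the DataStream operators that surround a SQL query (sources, flatMaps, sinks), an explicit UID can be pinned with uid(...), which keeps at least that state addressable when the application changes; the operators generated by the SQL planner cannot be pinned this way, so this does not remove the limitation described above. A minimal, self-contained sketch of explicit UID assignment (toy source and made-up UID strings, not the Kafka/SQL job from this thread):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidPinningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Toy source standing in for the Kafka stream used in this thread.
        DataStream<String> events = env.fromElements("a", "b", "c");

        events.map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("uppercase-map")   // stable, hand-assigned operator UID
                .print()
                .uid("print-sink");     // sinks can carry a UID as well

        env.execute("uid-pinning-sketch");
    }
}

In the job above, the same uid(...) calls would be attached to the flatMap, watermark, and sink operators around tableEnv.sqlQuery(...).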




Re: Restore state from save point with add new flink sql

Till Rohrmann
In reply to this post by James (Jian Wu) [FDS Data Platform]
Hi James,

As long as you do not change anything for `sql1`, it should work to recover from a savepoint if you pass the `-n`/`--allowNonRestoredState` option to the CLI when resuming your program from the savepoint. The reason is that an operator's generated UID depends on the operator and on its inputs.

I've also pulled in Fabian and Timo who will be able to tell you a little bit more about the job modification support for stream SQL.

Cheers,
Till
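
For reference, resuming with the flag mentioned here looks roughly like this on the command line (the savepoint path, main class, and jar name below are placeholders, not taken from this thread):

# -s / --fromSavepoint: the savepoint to restore from (placeholder path)
# -n / --allowNonRestoredState: start the job even if some savepoint state
#     cannot be mapped to an operator in the new job graph
flink run -s hdfs:///flink/savepoints/savepoint-XXXX -n -c com.example.DetectionJob detection-job.jar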



Re: Restore state from save point with add new flink sql

James (Jian Wu) [FDS Data Platform]

Hi Till,

Thanks for your answer. So if I just add the new SQL and do not modify the old SQL, and then restart the job with the `-n`/`--allowNonRestoredState` option, the old SQL state can be resumed from the savepoint?

 

Regards

 

James

 



Re: Restore state from save point with add new flink sql

Till Rohrmann
I think so. Maybe Fabian or Timo can correct me if I'm wrong here.



Re: Restore state from save point with add new flink sql

Hequn Cheng
Hi,

I'm not sure about the answer. I have a feeling that if we only add new code below the old code (i.e., append the new code after the old code), the UIDs will not change.
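
A compact, self-contained illustration of that append-only pattern (toy table and placeholder queries, and the Flink 1.5-era Table API factory method assumed from the versions discussed in this thread): the version 1 statements are left byte-for-byte identical and the new query is only appended after them. Whether the generated UIDs really stay stable in this case is exactly the open question here.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class AppendOnlyUpgradeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Toy input table standing in for the registered AggregatedOrderItems stream.
        tableEnv.registerDataStream("Orders",
                env.fromElements(Tuple2.of("o1", 10), Tuple2.of("o2", 20)), "id, amount");

        // ----- identical to "version 1": do not touch this part when upgrading -----
        Table q1 = tableEnv.sqlQuery("SELECT id, amount FROM Orders WHERE amount > 5");
        tableEnv.toAppendStream(q1, Row.class).print();

        // ----- "version 2" only appends the new query below the old code -----
        Table q2 = tableEnv.sqlQuery("SELECT id FROM Orders WHERE amount > 15");
        tableEnv.toAppendStream(q2, Row.class).print();

        env.execute("append-only-upgrade-sketch");
    }
}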



Re: Restore state from save point with add new flink sql

Till Rohrmann
As long as the inputs don't change, this should be correct.
