Bootstrapping the state

Bootstrapping the state

Henri Heiskanen
Hi,

I've been looking into how to initialise a large state, and in particular watched this presentation by Lyft that was also referenced in this group: https://www.youtube.com/watch?v=WdMcyN5QZZQ

In our use case we would like to load roughly 4 billion entries into this state, and I believe that loading the data from S3, creating a savepoint, and then restarting in streaming mode from that savepoint would work very well. From the presentation I got the impression that I could read from S3 and, once all the data has been read (without any custom termination detector etc.), simply create a savepoint by calling the REST API from the app. However, I've noticed that if I read data from files, the job auto-terminates once all data is read and no longer appears to be running, even if I add a sleep to the main program (a very simple app is attached below).

I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job from terminating and create the savepoint from outside the app, but that would require termination detection etc. and would make the solution less clean.

Does anyone have more details on how I could accomplish this?

Br,
Henkka
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = "--initFile init.csv".split(" ");
        }

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool params = ParameterTool.fromArgs(args);

        String initFile = params.get("initFile");
        if (initFile != null) {
            env.readTextFile(initFile)
                .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Tuple4<>(data[0], data[1], data[2], data[3]);
                    }
                })
                .keyBy(0, 1) // key by the first two tuple fields
                .map(new ProfileInitMapper()); // custom mapper that loads the entries into keyed state
        }

        // execute program (blocks until the file source has read all data and the job finishes)
        env.execute("Flink Streaming Java API Skeleton");

        // when all data is read, save the state
        // (note: by this point the job has already finished, so this sleep does not keep it alive)
        Thread.sleep(10000);
    }
}



Re: Bootstrapping the state

vino yang
Hi Henkka,

The behavior of the text file source meets expectations: Flink will not keep your source task thread alive once it exits from its invoke method. That means you have to keep the source task alive yourself, so you should implement a custom text file source (by implementing the SourceFunction interface).

For your requirement, the source can track a "no more data" idle timeout: once it expires, the source exits and the job stops.

You can also refer to the implementations of other source connectors.
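
A minimal sketch of such a source, assuming a plain local text file (the class name and details are illustrative, not a tested implementation):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

public class KeepAliveTextFileSource implements SourceFunction<String> {

    private final String path;
    private volatile boolean running = true;

    public KeepAliveTextFileSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit every line of the file
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
        // all data read: idle instead of returning, so the task stays RUNNING
        // and a savepoint can still be triggered from the outside
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

The job would then use env.addSource(new KeepAliveTextFileSource(initFile)) instead of env.readTextFile(initFile).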

Thanks, vino.





Re: Bootstrapping the state

Henri Heiskanen
Hi,

Thanks. Just to clarify: where would you then invoke the savepoint creation? I basically need to know when all data has been read, create a savepoint, and then exit. I think I could just as well use PROCESS_CONTINUOUSLY, monitor the stream from the outside, and then use the REST API to cancel with a savepoint.
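
For reference, the PROCESS_CONTINUOUSLY variant would look roughly like this inside the job above (the re-scan interval is an arbitrary choice):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path(initFile)),
        initFile,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60_000L); // re-scan the path every 60s; the source never terminates on its own

Once external monitoring shows that all records have been processed, the job can be stopped with a savepoint from the CLI, e.g. bin/flink cancel -s <targetDirectory> <jobId>.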

Are there any plans for a feature where I could have Flink create a savepoint on job exit? I am also keen to hear other ideas on how to bootstrap state. I was initially thinking of just reading the data from Cassandra if no state is available.

Br,
Henkka





Re: Re: Bootstrapping the state

vino yang
Hi Henkka,

If you want to customize the DataStream text source for your purpose, you could use a read counter: if the counter's value does not change within a given interval, you can assume that all the data has been read. That is just one idea; you may choose another solution.
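
A rough illustration of that counter check (class and method names are made up here, not Flink API):

import java.util.concurrent.atomic.AtomicLong;

public class ReadProgressWatchdog {

    private final AtomicLong readCount = new AtomicLong();

    // the source calls this for every record it emits
    public void recordRead() {
        readCount.incrementAndGet();
    }

    // blocks until the counter stops changing for one full interval,
    // i.e. no new data was read during that time
    public void awaitIdle(long intervalMillis) throws InterruptedException {
        long last = -1;
        while (true) {
            Thread.sleep(intervalMillis);
            long now = readCount.get();
            if (now == last) {
                return;
            }
            last = now;
        }
    }
}

The source (or an external monitor) could call awaitIdle(...) and then exit, trigger the savepoint, and so on.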

Creating a savepoint automatically on job exit sounds like a good idea. I don't know of any plan for this; I will try to bring the idea up with the community.

And about "how to bootstrap a state": what does that mean? Can you explain?

Thanks, vino






Re: Re: Bootstrapping the state

Henri Heiskanen
Hi,

By state bootstrapping I mean loading the state with initial data before starting the actual job. For example, in our case I would like to load information such as the registration dates of our users (>5 years of data) so that I can enrich our event data in streaming (5 days retention).

Before watching the presentation by Lyft, I was loading this data per key from the Cassandra DB in the mapper whenever the state was not found.
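
A sketch of that per-key fallback pattern, with made-up names (ProfileLookup stands in for the actual Cassandra client code; the mapper must run on a keyed stream):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

public class CassandraFallbackMapper
        extends RichMapFunction<Tuple4<String, String, String, String>,
                                Tuple4<String, String, String, String>> {

    private transient ValueState<String> registrationDate;

    @Override
    public void open(Configuration parameters) {
        registrationDate = getRuntimeContext().getState(
                new ValueStateDescriptor<>("registrationDate", String.class));
    }

    @Override
    public Tuple4<String, String, String, String> map(
            Tuple4<String, String, String, String> event) throws Exception {
        if (registrationDate.value() == null) {
            // state miss: fall back to Cassandra (ProfileLookup is hypothetical)
            registrationDate.update(
                    ProfileLookup.fetchRegistrationDate(event.f0, event.f1));
        }
        // enrich the event with the cached registration date
        return new Tuple4<>(event.f0, event.f1, event.f2, registrationDate.value());
    }
}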

Br,
Henkka





Re: Re: Bootstrapping the state

Fabian Hueske-2
Hi Henkka,

You might want to consider implementing a dedicated job for state bootstrapping that uses the same operator UIDs and state names. That might be easier than integrating the bootstrapping logic into your regular job.

I think you have to use the monitoring file source, because AFAIK it won't be possible to take a savepoint once a task has finished: Flink cannot inject a checkpoint/savepoint barrier into finished tasks.
Detecting that all data has been read is of course tricky, but you could monitor the processed-records count metrics and take a savepoint once they stop changing.
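
A rough sketch of that monitoring loop against the Flink 1.5 REST API (the vertex id and exact metric name have to be looked up for the concrete job, and the target directory is a placeholder; treat this as an outline, not a tested tool):

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SavepointWhenIdle {

    private static String httpGet(String url) throws Exception {
        try (InputStream in = new URL(url).openStream();
             Scanner s = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";
        String jobId = args[0];
        String sourceVertexId = args[1];
        String metricsUrl = base + "/jobs/" + jobId + "/vertices/" + sourceVertexId
                + "/metrics?get=0.numRecordsOut";

        // poll until the record count stops changing for a full interval
        String last = null;
        while (true) {
            Thread.sleep(30_000);
            String current = httpGet(metricsUrl);
            if (current.equals(last)) {
                break;
            }
            last = current;
        }

        // trigger a savepoint and cancel the job in one call
        HttpURLConnection conn = (HttpURLConnection)
                new URL(base + "/jobs/" + jobId + "/savepoints").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.getOutputStream().write(
                ("{\"target-directory\": \"s3://bucket/savepoints\","
                        + " \"cancel-job\": true}").getBytes(StandardCharsets.UTF_8));
        System.out.println("Savepoint trigger responded with " + conn.getResponseCode());
    }
}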

Best, Fabian
