Streaming data from MongoDB using Flink


Pedro Monteiro
Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into Flink's source function to add in the addSource
method of the StreamExecutionEnvironment but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro

Re: Streaming data from MongoDB using Flink

Tzu-Li (Gordon) Tai
Hi Pedro!

This is definitely possible: simply write a Flink `SourceFunction` that uses a MongoDB client to fetch the data.
It should be straightforward, and it works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon



Re: Streaming data from MongoDB using Flink

Pedro Monteiro
Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a Java variable, env, which is my StreamExecutionEnvironment. When I go ahead and attempt to do:

env.addSource();

it requires an implementation of the SourceFunction interface:

env.addSource(new SourceFunction<Document>() {

    @Override
    public void run(SourceFunction.SourceContext<Document> ctx) throws Exception {
        // TODO
    }

    @Override
    public void cancel() {
        // TODO
    }
});

And this is where I am somewhat stuck. I do not understand how I should access my MongoDB cursor in any of these methods (I suppose the most adequate would be the "run" method) in a way that would allow me to emit a new MongoDB document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!​

Cumprimentos,

Pedro Lima Monteiro



Re: Streaming data from MongoDB using Flink

Tzu-Li (Gordon) Tai
I would recommend checking out the Flink RabbitMQ Source for examples:

For your case, you should extend `RichSourceFunction`, which additionally lets you override the `open()` lifecycle method.
In that method, you instantiate your MongoDB client connection and fetch the cursor. In the `run()` method, you should essentially have a while loop that polls the MongoDB cursor and emits the fetched documents using the `SourceContext`.
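A minimal sketch of that pattern, assuming the MongoDB Java driver (`com.mongodb.client`, 3.7+) and Flink's streaming API are on the classpath; the connection URI and the "mydb"/"events" names are placeholders, not anything from this thread:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.Document;

public class MongoSource extends RichSourceFunction<Document> {

    private transient MongoClient client;
    private transient MongoCollection<Document> collection;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        // Instantiate the client connection once per parallel source instance.
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("mydb").getCollection("events");
    }

    @Override
    public void run(SourceContext<Document> ctx) throws Exception {
        // Poll the cursor and emit each fetched document into the stream.
        try (MongoCursor<Document> cursor = collection.find().iterator()) {
            while (running && cursor.hasNext()) {
                ctx.collect(cursor.next());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}
```

Note that a plain `find()` only returns documents present at query time; to keep emitting documents as they arrive you would typically loop the poll, use a tailable cursor on a capped collection, or (on newer MongoDB versions) a change stream. The source is then registered with `env.addSource(new MongoSource())`.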

If you're also looking to implement a MongoDB source that works with Flink’s checkpointing for exactly-once, be sure to check out:

Cheers,
Gordon



Re: Streaming data from MongoDB using Flink

Pedro Monteiro
Thank you again for your prompt response.

I will give it a try and will come back to you.

Pedro Lima Monteiro




Re: Streaming data from MongoDB using Flink

Pedro Monteiro
Dear Gordon,

Thanks for your help, I think I am on the right track as of now.

On the other hand, I have another question: is it possible to add sources to an environment that is already executing? In the system I am currently developing, I need to add new sources as they arrive.

I will wait to hear from you!

Cumprimentos,

Pedro Lima Monteiro



Re: Streaming data from MongoDB using Flink

Tzu-Li (Gordon) Tai
Good to know!




Re: Streaming data from MongoDB using Flink

Till Rohrmann
Hi Pedro,

in order to add new sources, you first have to stop the job (taking a savepoint if you want to resume later on) and then restart the job with the changed topology.

Cheers,
Till




Re: Streaming data from MongoDB using Flink

Tzu-Li (Gordon) Tai
Sorry, I just realized I overlooked the second question in your last email when replying.
Thanks, Till, for answering it!




Re: Streaming data from MongoDB using Flink

Pedro Monteiro
Dear Gordon and Till,

Thank you so much for your helpful answers. I managed to solve my problem with your guidelines.

Much appreciated, keep up the good work!

Cheers

Cumprimentos,

Pedro Lima Monteiro
