Communication between two queries


Mikael Gordani
Hello everyone!

Essentially, I have two identical queries (q1 and q2) running in parallel as streams.
I'm trying to activate the ingestion of data into q2 based on what is processed in q1.
E.g., say that we want to start ingesting data into q2 when a tuple with timestamp > 5000 appears in q1.

The queries are constructed as follows (they share the same source):
q1: Source -> Filter -> Aggregate -> Filter -> Sink
      |
      V
q2: Filter -> Filter -> Aggregate -> Filter -> Sink

My initial idea was to have a global variable shared between the two queries. When the tuple appears in q1, the first Filter operator of q1 sets the variable to true. In q2, the first Filter operator passes or drops tuples depending on the value of that variable.
When the variable is true, it lets data pass; when it is false, no data is ingested.

This works fine when all the tasks run on the same machine, but of course it becomes troublesome in distributed deployments (tasks on different nodes and such).

My second approach was to create some sort of "loop" in the query. Say the processing logic is placed in the last Filter operator of q1; when the "special" tuple appears, that operator communicates with the first Filter operator of q2 to allow data to be ingested.
I've tried playing around with IterativeStreams, but I can't really get it to work, and I feel like it's the wrong approach.

How can I achieve this sort of functionality?
I've been looking at the BroadcastState part of the DataStream API, but I'm confused about how to use it. Is it possible to broadcast from a downstream operator to an upstream one?
Suggestions would be much appreciated!

Best Regards,
Mikael Gordani

Re: Communication between two queries

Piotr Nowojski-3
Hi,

Could you explain a bit more what you are trying to achieve?

One problem that comes to mind is that currently in Flink streaming there is, in the general case, no reliable way to “not ingest” data (it is possible when processing bounded data), because an operator that stops reading might deadlock the upstream operator once the output buffers fill up. Instead, you can for example filter out/ignore records until some condition is met.
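The "filter out/ignore records until some condition is met" idea can be sketched in plain Java, without any Flink dependency. This is only an illustration: `ActivationGate` and `ActivationGateDemo` are made-up names, and the threshold of 5000 is taken from the example in the first post. The gate consumes every record (so nothing upstream is blocked) but forwards none until it has seen the trigger.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical gate: consumes every record but forwards none until a trigger is seen. */
class ActivationGate {
    private final long threshold;   // e.g. 5000, as in the example from the first post
    private boolean open = false;   // latched: stays true once it has been set

    ActivationGate(long threshold) {
        this.threshold = threshold;
    }

    /** Returns true if the record with this timestamp should be forwarded. */
    boolean accept(long timestamp) {
        if (!open && timestamp > threshold) {
            open = true;            // the trigger record itself is forwarded as well
        }
        return open;
    }
}

class ActivationGateDemo {
    /** Runs all timestamps through a fresh gate and returns the ones that passed. */
    static List<Long> run(long threshold, long... timestamps) {
        ActivationGate gate = new ActivationGate(threshold);
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            if (gate.accept(ts)) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // 1000 and 4000 are dropped; 6000 opens the gate; everything after it passes.
        System.out.println(run(5000, 1000, 4000, 6000, 3000, 7000)); // [6000, 3000, 7000]
    }
}
```

Because the gate only drops records and never stops reading, it does not create the back-pressure problem described above.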

BroadcastState works for a single operator (and its parallel instances); it doesn’t automatically communicate with any upstream or downstream operators. You have to wire/connect your operators and distribute the information yourself. For an example of how it works, take a look at this ITCase [1].

What you could do is create the following job topology using side outputs [2]:

Src1 -> OP1 -> broadcast_side_output
         |
         V
       Sink1

Then use a BroadcastProcessFunction to read both Src1 and broadcast_side_output:

Src1 + broadcast_side_output -> OP2 -> Sink2

But as I wrote before, you have to be careful in OP2. If both OP1 and OP2 read from the same data stream Src1 and OP2 stops reading records from it, you eventually deadlock Src1 itself. A solution would be to create a second instance of the Src1 operator that reads the records from the external system a second time:

Src1" +  broadcast_side_output -> OP2 -> Sink2   

Piotrek

 

On 12 Mar 2020, at 12:53, Mikael Gordani <[hidden email]> wrote:


Re: Communication between two queries

Mikael Gordani
Hi Piotr!
Thanks for your response, I'll try to explain what I'm trying to achieve in more detail:

Essentially, I have two queries that consist of the same operators and run in the same task, and I want to find some way of controlling the ingestion from a source into the respective queries so that only one of the queries receives data, based on a condition.
For more context, the second query (query2) is equipped with instrumented operators, i.e. standard operators extended with some extra functionality; in my case, they enrich the tuples with metadata.

Source --> Filter1 ---> rest of query1
   |
   v
   Filter2 ---> rest of query2

By placing filters in front of the queries, records are allowed to pass depending on a condition, let's say a global boolean variable (which is initially set to false).
If it's set to false, Filter1 will accept every record and Filter2 will discard every record.
If it's set to true, Filter2 will accept every record and Filter1 will discard every record.
So the filter operators look something like this:
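import org.apache.flink.api.common.functions.FilterFunction;

// Fragment: the field and both classes live inside the job class.
// Tuple stands for the record type of the stream.

// Shared flag. A static field is only shared within a single JVM.
static boolean global_var = false;

// Guards query1: passes records while the flag is false.
private static class Filter1 implements FilterFunction<Tuple> {
    @Override
    public boolean filter(Tuple t) throws Exception {
        return !global_var;
    }
}

// Guards query2: passes records while the flag is true.
private static class Filter2 implements FilterFunction<Tuple> {
    @Override
    public boolean filter(Tuple t) throws Exception {
        return global_var;
    }
}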

Later on, in the respective queries, some processing logic changes the value of the global variable, thus enabling and disabling the flow of data from the source into the respective queries.
The problem is that this global variable does not work in distributed deployments, and I'm having a hard time figuring out how to solve that.
Is it a bit clearer? =)

Re: Communication between two queries

Piotr Nowojski-3
Hi,

In that case you could try to implement your `FilterFunction` as a two-input operator with a broadcast control input that sets `global_var`. The broadcast control input can originate from some source or from some operator.
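As a rough illustration of that suggestion, here is a plain-Java model of a two-input filter, with no Flink dependency. In an actual job this role would presumably be played by the BroadcastProcessFunction mentioned earlier in the thread, with the flag kept in broadcast state. The names `GatedFilter`, `processControl` and `BroadcastControlDemo` are invented for this sketch. The point is that every parallel instance keeps its own copy of the flag and every control message is delivered to all instances, so no shared static variable is needed.

```java
import java.util.Arrays;
import java.util.List;

/** One parallel instance of a hypothetical two-input filter: a data input plus a control input. */
class GatedFilter {
    private final boolean passWhenEnabled; // false models Filter1 (!global_var), true models Filter2
    private boolean enabled = false;       // local copy of the flag, initially false

    GatedFilter(boolean passWhenEnabled) {
        this.passWhenEnabled = passWhenEnabled;
    }

    /** Control input: updates the local copy of the flag. */
    void processControl(boolean newValue) {
        enabled = newValue;
    }

    /** Data input: returns true if the record should be forwarded. */
    boolean processElement(Object record) {
        return enabled == passWhenEnabled;
    }
}

class BroadcastControlDemo {
    /** Delivers one control message to every instance, as a broadcast would. */
    static void broadcast(List<GatedFilter> instances, boolean value) {
        for (GatedFilter instance : instances) {
            instance.processControl(value);
        }
    }

    public static void main(String[] args) {
        GatedFilter filter1 = new GatedFilter(false);
        GatedFilter filter2 = new GatedFilter(true);
        List<GatedFilter> all = Arrays.asList(filter1, filter2);

        System.out.println(filter1.processElement("r") + " " + filter2.processElement("r")); // true false
        broadcast(all, true);
        System.out.println(filter1.processElement("r") + " " + filter2.processElement("r")); // false true
    }
}
```

In a distributed run the control message and the data records travel on different channels, so the exact record at which each instance flips is not deterministic unless the switch is tied to event time.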

Piotrek

On 13 Mar 2020, at 15:47, Mikael Gordani <[hidden email]> wrote:


Re: Communication between two queries

Mikael Gordani
Hi,
I'll try it out =)

Cheers!

On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <[hidden email]> wrote:

Re: Communication between two queries

Piotr Nowojski-3
Hi,

Let us know if something doesn’t work :)

Piotrek

On 16 Mar 2020, at 08:42, Mikael Gordani <[hidden email]> wrote:


Re: Communication between two queries

Mikael Gordani
Hi Piotr!

Continuing with my scenario: since both of the queries will share the same sink, I've realized that some issues will appear when I switch queries, especially with regard to stateful operators, e.g. aggregation.

Let me provide an example:
Say that both of the queries ingest a sequence of integers and compute the average of these integers over some time.
E.g., say that query1 ingests the sequence 1,2,3,4....
The windows for query1 will be [1,2,3] [2,3,4] [3,4].

If I later "activate" query2, both queries need to accept tuples for a while, so that the aggregation in query1 can finish before its input is cut off.
But there is a possibility that query2 only receives the tuples 3,4, which will result in the windows [3] [3,4] [3,4].
The output of the respective queries will then be:
Query1: 2, 3, 3.5
Query2: 3, 3.5, 3.5

As one can see, the first two outputs differ.
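To double-check the numbers, the plain arithmetic mean of each window listed above can be computed directly. This is only a sketch: `WindowAverages` is a made-up helper, and the window contents are copied from the example.

```java
import java.util.Arrays;

class WindowAverages {
    /** Returns the arithmetic mean of each window's contents. */
    static double[] averages(int[][] windows) {
        double[] result = new double[windows.length];
        for (int i = 0; i < windows.length; i++) {
            result[i] = Arrays.stream(windows[i]).average().orElse(Double.NaN);
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] query1 = {{1, 2, 3}, {2, 3, 4}, {3, 4}};
        int[][] query2 = {{3}, {3, 4}, {3, 4}};
        System.out.println("Query1: " + Arrays.toString(averages(query1))); // Query1: [2.0, 3.0, 3.5]
        System.out.println("Query2: " + Arrays.toString(averages(query2))); // Query2: [3.0, 3.5, 3.5]
    }
}
```

The queries disagree on every window for which query2 missed part of the input, which is why the switch has to line up with a window boundary.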
I'm thinking of using watermarks somehow to make sure that both queries have processed the same amount of data before writing to the sink, but I'm a bit unsure how to do it.
Do you have any suggestions or thoughts?
Cheers,

On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <[hidden email]> wrote:

Re: Communication between two queries

Piotr Nowojski-3
Hi,

Yes, you are looking in the right direction with the watermarks.

First of all you would have to use event time semantic for constant results. With processing time everything would be simpler, but it would be more difficult to reason about the results (your choice). Secondly, you would have to hook up the logic of enabling query1/query2 to the event time/watermarks. Thirdly, you need to somehow sync the input switching with the window boundaries. On top of that, watermarks express a lower bound of the event time that you can expect. However, in order to guarantee consistency of the windows, you would like to control the upper bound. For example:

1. If you want to enable Query2, you would need to check the largest/latest event time that was processed by the input splitter; let’s say that’s TS1.
2. That means records with event time <= TS1 have already been processed by Query1, starting some windows.
3. The earliest point at which you could enable Query2 is thus TS1 + 1.
4. You would have to move the Query2 start time to the start of the next time window; let’s say that would be TS2, the first window start at or after TS1 + 1.
5. The input splitter must now keep sending records with event time < TS2 to Query1, but should already redirect records with event time >= TS2 to Query2.
6. Once the watermark of the input splitter advances past TS2, it can finally stop sending records to Query1, and the Query1 logic can be considered “completed”.

So Query1 would be responsible for all of the data before TS2, and Query2 after TS2.
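The steps above can be sketched as a small routing class in plain Java. This is a sketch under stated assumptions, not a Flink operator: `InputSplitter`, `requestSwitch`, `route` and `onWatermark` are hypothetical names, and the windows are assumed to be tumbling windows that start at multiples of `windowSize`.

```java
/** Hypothetical input splitter that hands over from Query1 to Query2 on a window boundary. */
class InputSplitter {
    enum Target { QUERY1, QUERY2, NONE }

    private final long windowSize;              // assumed: tumbling windows at multiples of this
    private long maxEventTime = Long.MIN_VALUE; // largest event time routed so far (TS1)
    private long switchTime = Long.MAX_VALUE;   // TS2, only meaningful once a switch is requested
    private boolean switchRequested = false;
    private boolean query1Completed = false;

    InputSplitter(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Steps 1-4: take TS1, add 1, and round up to the next window start to get TS2. */
    long requestSwitch() {
        long earliest = maxEventTime + 1;
        switchTime = Math.floorDiv(earliest + windowSize - 1, windowSize) * windowSize;
        switchRequested = true;
        return switchTime;
    }

    /** Step 5: records before TS2 still go to Query1, records from TS2 on go to Query2. */
    Target route(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
        if (switchRequested && eventTime >= switchTime) {
            return Target.QUERY2;
        }
        return query1Completed ? Target.NONE : Target.QUERY1;
    }

    /** Step 6: once the watermark reaches TS2, Query1 receives no further input. */
    void onWatermark(long watermark) {
        if (switchRequested && watermark >= switchTime) {
            query1Completed = true;
        }
    }
}

class InputSplitterDemo {
    public static void main(String[] args) {
        InputSplitter splitter = new InputSplitter(1000);
        splitter.route(100);
        splitter.route(2500);                         // TS1 = 2500
        System.out.println(splitter.requestSwitch()); // 3000 (TS2)
        System.out.println(splitter.route(2700));     // QUERY1
        System.out.println(splitter.route(3100));     // QUERY2
        splitter.onWatermark(3000);
        System.out.println(splitter.route(2950));     // NONE (late record, Query1 is done)
    }
}
```

With sliding windows the rounding would have to account for windows that overlap TS2, which this sketch does not attempt.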

Alternatively, your input splitter could also buffer some records, so that you could enable Query2 faster, by re-sending the buffered records. But in that case, both Query1 and Query2 would be responsible for some portion of the data.
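One possible reading of the buffering alternative, again as a plain-Java sketch with invented names (`BufferingSplitter`, `observe`, `replay`): the splitter remembers the records of the newest window it has seen, so that Query2 can be started on that window right away by replaying them. Tumbling windows at multiples of `windowSize` are assumed, and event times stand in for whole records.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical splitter that remembers the records of the newest window it has seen. */
class BufferingSplitter {
    private final long windowSize;                       // assumed: tumbling windows at multiples of this
    private long bufferedWindowStart = Long.MIN_VALUE;   // start of the window currently buffered
    private final List<Long> buffer = new ArrayList<>(); // event times stand in for whole records

    BufferingSplitter(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Called for every record that is sent to Query1. */
    void observe(long eventTime) {
        long start = Math.floorDiv(eventTime, windowSize) * windowSize;
        if (start > bufferedWindowStart) {
            bufferedWindowStart = start; // a newer window has begun: drop the old buffer
            buffer.clear();
        }
        if (start == bufferedWindowStart) {
            buffer.add(eventTime);       // records of older windows are not kept
        }
    }

    /** Records to re-send to Query2 so that it can start with the buffered window. */
    List<Long> replay() {
        return new ArrayList<>(buffer);
    }

    long windowStart() {
        return bufferedWindowStart;
    }
}

class BufferingSplitterDemo {
    public static void main(String[] args) {
        BufferingSplitter splitter = new BufferingSplitter(1000);
        for (long ts : new long[] {100, 900, 1200, 1100, 950, 1800}) {
            splitter.observe(ts);
        }
        System.out.println(splitter.windowStart()); // 1000
        System.out.println(splitter.replay());      // [1200, 1100, 1800]
    }
}
```

Since Query1 has already seen the replayed records, both queries end up producing a result for that window, which matches the caveat that both would be responsible for some portion of the data.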

Piotrek

On 17 Mar 2020, at 10:35, Mikael Gordani <[hidden email]> wrote:


Re: Communication between two queries

Piotr Nowojski-3
Oops, sorry, there was a misleading typo/auto-correction in my previous e-mail. The second sentence should have been:

> First of all you would have to use event time semantic for consistent results

Piotrek

On 17 Mar 2020, at 14:43, Piotr Nowojski <[hidden email]> wrote:


Re: Communication between two queries

Mikael Gordani
No worries and great idea!
I will play around with it and see what I manage to do.
Cheers!

On Tue, 17 Mar 2020 at 15:59, Piotr Nowojski <[hidden email]> wrote: