Handle late messages with Flink SQL


Handle late messages with Flink SQL

Yi Tang
We can get a late-data stream from the DataStream API via a side output, but
it's hard to do the same thing with Flink SQL.
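For reference, the DataStream behaviour being contrasted here can be illustrated with a plain-Python sketch (not Flink code; the bounded-out-of-orderness watermark and the routing rule are simplified assumptions):

```python
def split_late(records, max_out_of_orderness=5):
    """Simulate watermark-based side-output routing.

    records: iterable of (event_time, value) in arrival order.
    Returns (main, late): records that arrive behind the current
    watermark go to `late`, mirroring what an OutputTag plus
    sideOutputLateData achieve in the DataStream API.
    """
    watermark = float("-inf")
    main, late = [], []
    for ts, value in records:
        if ts < watermark:  # behind the watermark -> side output
            late.append((ts, value))
        else:
            main.append((ts, value))
        # watermark trails the max timestamp by a bounded delay
        watermark = max(watermark, ts - max_out_of_orderness)
    return main, late

main, late = split_late([(1, "a"), (10, "b"), (3, "c"), (20, "d"), (2, "e")])
```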

I have an idea about how to get the late records while using Flink SQL.

Assume we have a source table for the late records; then we can query the late
records from it. Of course, it's not a real dynamic source table: it can be a
virtual source.

After optimization, we get a graph with some window aggregate nodes, which
can produce late records, and another graph for handling late records with
a virtual source node.

[scan] --> [group] --> [sink]

[virtual scan] --> [sink]

Then we can just "connect" these window nodes to the virtual source node.

The "connect" can be done by the following:

1. A side output node from each window node;
2. A mapper node may be needed to encode the records from the window node to
match the row type of the virtual source.

[scan] --> [group] --> [sink]
                 \
                   --> [side output] --> [mapper] --> [sink]
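The mapper in step 2 above can be sketched in plain Python (hypothetical names; the (source_tag, event_time, payload) row type of the virtual source is an assumption for illustration):

```python
import json

def make_mapper(source_tag):
    """Build an encoder from one window node's late records into the
    single row type of the virtual source. Each window node may emit a
    different schema, so fields are packed into a JSON payload;
    `source_tag` records which window produced the late record."""
    def mapper(record):
        event_time, fields = record
        return (source_tag, event_time, json.dumps(fields, sort_keys=True))
    return mapper

# One mapper per window node feeding the same virtual source:
orders_mapper = make_mapper("orders_window")
row = orders_mapper((42, {"user": "u1", "amount": 9.5}))
```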


Does it make sense? Or is there other work in progress for a similar
purpose?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Handle late messages with Flink SQL

Timo Walther
Hi,

your explanation makes sense, but I'm wondering what the implementation
would look like. This would mean bigger changes in a Flink fork, right?

Late data handling in SQL is a frequently asked question. Currently, we
don't have a good way of supporting it. Usually, we recommend using the
DataStream API before the Table API to branch late events into a
separate processing pipeline.

Another idea (not well thought through) could be a temporal join at a
later stage with a LookupTableSource that contains the late events, to
perform the "connect"?
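As a rough illustration of that idea (plain Python, not Flink SQL; the keying and the fold-into-count merge are invented assumptions), a later stage could probe a lookup table of late events and merge them into the main results:

```python
def lookup_join(main_rows, late_lookup):
    """For each main row (key, count), probe the late-events lookup
    table and fold any late records into the aggregate, roughly as a
    temporal/lookup join against a LookupTableSource might."""
    out = []
    for key, count in main_rows:
        late = late_lookup.get(key, [])
        out.append((key, count + len(late)))
    return out

late_table = {"k1": [("k1", 7)], "k2": []}
result = lookup_join([("k1", 3), ("k2", 5)], late_table)  # [("k1", 4), ("k2", 5)]
```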

Regards,
Timo


On 15.03.21 09:58, Yi Tang wrote:



Re: Handle late messages with Flink SQL

Yi Tang
Thanks Timo.

The whole idea is also based on side outputs and output tags. Let me
explain it in detail:

1. Introduce a VirtualTableScan (or SideOutputTableScan) that can be
optimized into a physical RelNode. Then we can create a source catalog table
which will be converted to a VirtualTableScan, and users can query it like a
normal table.
2. Introduce a WithSideOutput interface, which announces that a node can
expose one or more side outputs with specific output tags. E.g.
StreamExecGroupWindowAggregate can implement it and expose a tag for late
messages.
3. After optimization, we get a set of nodes. We can split them into the
normal part and the part containing virtual scan nodes. In the normal part,
some nodes can expose side outputs. For each output tag, we can derive a
SideOutputExecNode with that node as its input, then check whether each
SideOutputExecNode is accepted by any virtual scan node.
4. As we may not know the side output type of a WithSideOutput in advance
(and there may be more than one WithSideOutput), we may provide encoded data
from the VirtualScan. If needed, the mapper (encoding) function will be
provided by the virtual scan node. Then we can expand (replace) the virtual
scan node with pairs of SideOutputExecNode and mapper node.
5. Now we have a complete ExecNode DAG and can translate it into a
Transformation DAG.

The virtual source catalog table can be built in if the user enables this
feature.
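Steps 3 and 4 above can be sketched as a small graph rewrite in plain Python (the node representation and field names here are invented for illustration):

```python
def rewrite(nodes):
    """Given exec nodes, derive a side-output node per exposed tag and
    splice it (plus a mapper) in place of a matching virtual scan.

    nodes: list of dicts with 'name', optional 'tags' (side outputs a
    node exposes) and optional 'accepts' (tags a virtual scan accepts).
    """
    normal = [n for n in nodes if "accepts" not in n]
    virtual = [n for n in nodes if "accepts" in n]
    result = list(normal)
    for scan in virtual:
        # Replace each virtual scan with SideOutputExecNode + mapper pairs
        for node in normal:
            for tag in node.get("tags", []):
                if tag in scan["accepts"]:
                    result.append({"name": f"side_output[{tag}]",
                                   "input": node["name"]})
                    result.append({"name": f"mapper[{tag}]",
                                   "input": f"side_output[{tag}]"})
    return result

dag = [{"name": "window_agg", "tags": ["late"]},
       {"name": "virtual_scan", "accepts": ["late"]}]
expanded = rewrite(dag)
```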



