We can get a stream of late records from the DataStream API via a side output, but it's hard to do the same thing with Flink SQL.

I have an idea about how to get the late records while using Flink SQL.

Assume we have a source table for the late records; then we can query the late records from it. Obviously it's not a real dynamic source table; it can be a virtual source.

After optimizing, we get a graph with some window aggregate nodes, which can produce late records, and another graph for handling late records with a virtual source node:

[scan] --> [group] --> [sink]

[virtual scan] --> [sink]

Then we can just "connect" these window nodes to the virtual source node. The "connect" can be done as follows:

1. A side output node from each window node;
2. A mapper node may be needed to encode the records from the window node to match the row type of the virtual source.

[scan] --> [group] --> [sink]
              \
               --> [side output] --> [mapper] --> [sink]

Does it make sense? Or is there other work in progress for a similar purpose?
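PS: For reference, the side output I mean on the DataStream side looks roughly like this. This is only a minimal sketch; the stream shape (String/Long tuples), the window size, and the watermark bound are made-up assumptions:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a real event-time source of (key, timestamp) pairs.
        DataStream<Tuple2<String, Long>> input = env
                .fromElements(Tuple2.of("a", 1000L), Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(
                                        Duration.ofSeconds(5))
                                .withTimestampAssigner((e, ts) -> e.f1));

        // Tag identifying the late-record side output.
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-records") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> aggregated = input
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag) // late records go to the side output
                .sum(1);

        // The late records become a regular stream for a separate pipeline.
        DataStream<Tuple2<String, Long>> lateStream = aggregated.getSideOutput(lateTag);
        lateStream.print();

        env.execute("late-data-side-output-sketch");
    }
}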
Hi,
your explanation makes sense, but I'm wondering what the implementation would look like. This would mean bigger changes in a Flink fork, right?

Late data handling in SQL is a frequently asked question. Currently, we don't have a good way of supporting it. Usually, we recommend using the DataStream API before the Table API for branching late events into a separate processing pipeline.

Another idea (not well thought through) could be a temporal join at a later stage with a LookupTableSource that contains the late events to perform the "connect".

Regards,
Timo
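PS: A rough sketch of the lookup join idea, in case it helps. All table names, schemas, and connector options below are made up for illustration; it assumes the late records were written to some external table that a LookupTableSource (here the JDBC connector) can serve:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LateEventsLookupJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical main pipeline output; proc_time is needed for the
        // processing-time temporal join below.
        tEnv.executeSql(
                "CREATE TABLE main_results (" +
                "  id STRING," +
                "  total BIGINT," +
                "  proc_time AS PROCTIME()" +
                ") WITH ('connector' = 'datagen')");

        // Hypothetical external table holding the late events; the JDBC
        // connector implements LookupTableSource (needs flink-connector-jdbc
        // and a reachable database).
        tEnv.executeSql(
                "CREATE TABLE late_events (" +
                "  id STRING," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/db'," +
                "  'table-name' = 'late_events')");

        // Temporal (lookup) join: enrich main results with the late events.
        tEnv.executeSql(
                "SELECT m.id, m.total, l.payload " +
                "FROM main_results AS m " +
                "JOIN late_events FOR SYSTEM_TIME AS OF m.proc_time AS l " +
                "ON m.id = l.id").print();
    }
}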
Thanks Timo.
The whole idea is also based on the side output and output tag. Let me explain it in more detail:

1. Introduce a VirtualTableScan (or SideOutputTableScan) which can be optimized into a physical RelNode. Then we can create a source catalog table which will be converted to a VirtualTableScan, and users can query it like a normal table.

2. Introduce a WithSideOutput interface, which announces that a node can expose one or more side outputs with specific output tags. E.g. StreamExecGroupWindowAggregate can implement it and expose a tag for late messages (see the sketch after this list).

3. After optimization, we get some nodes. We can split them into the normal part and the part with virtual scan nodes. In the normal part, some nodes can expose side outputs. For each output tag, we can derive a SideOutputExecNode with that node as input, then check whether each SideOutputExecNode is accepted by any virtual scan node.

4. As we may not know the side output type of a WithSideOutput in advance (and there may be more than one WithSideOutput), we may provide the encoded data from the virtual scan. If needed, the mapper (encoding) function will be provided by the virtual scan node. Then we can expand (replace) the virtual scan node with pairs of SideOutputExecNode and mapper node.

5. Now we have a complete ExecNode DAG and can translate it into a Transformation DAG.

The virtual source catalog table can be built in if the user enables this feature.
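To make step 2 concrete, here is a minimal Java sketch of the proposed interface. None of these types exist in Flink today; all names are placeholders for illustration:

import java.util.Collections;
import java.util.List;

import org.apache.flink.table.data.RowData;
import org.apache.flink.util.OutputTag;

// Sketch of step 2: a node implementing this interface announces the side
// outputs (e.g. late records) it can expose to the planner.
public interface WithSideOutput {

    /** Output tags for the side outputs this node can expose. */
    List<OutputTag<RowData>> getSideOutputTags();
}

// How a window aggregate exec node might implement it. This is illustration
// only; the real StreamExecGroupWindowAggregate has no such hook today.
class WindowAggregateNodeSketch implements WithSideOutput {

    private final OutputTag<RowData> lateRecordsTag =
            new OutputTag<RowData>("late-records") {};

    @Override
    public List<OutputTag<RowData>> getSideOutputTags() {
        return Collections.singletonList(lateRecordsTag);
    }
}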