Hey all, I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is reading from a few CSV files and joins some records across them into a couple of data streams (yes, this could be a batch job won't get into why we chose streams unless it's relevant). These joins are producing some duplicate records, one with the joined field present and one with the joined field as `null`, though this happens only ~25% of the time. Reading the docs on joins[1], I thought this could be caused by too strict Idle State Retention[2], so I increased that to min, max (15min, 24h) but that doesn't seem to have an effect, and the problem still occurs when testing on a subset of data that finishes processing in under a minute. The query roughly looks like: table_1 has fields a, b table_2 has fields b, c SELECT table_1.a, table_1.b, table_1.c FROM table_1 LEFT OUTER JOIN table_2 ON table_1.b = table_2.b; Correct result: Record(a = "data a 1", b = "data b 1", c = "data c 1") Record(a = "data a 2", b = "data b 2", c = "data c 2") Results seem to be anywhere between all possible dups and the correct result. Record(a = "data a 1", b = "data b 1", c = "data c 1") Record(a = "data a 1", b = null, c = "data c 1") Record(a = "data a 2", b = "data b 2", c = "data c 2") Record(a = "data a 2", b = null, c = "data c 2") The CSV files are registered as Flink Tables with the following: tableEnv.connect( I'm creating my table environment like so: EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance() Is there something I'm misconfiguring or have misunderstood the docs? Thanks, Austin |
oops, the example query should actually be: SELECT table_1.a, table_1.b, table_2.c FROM table_1 LEFT OUTER JOIN table_2 ON table_1.b = table_2.b; and duplicate results should actually be: Record(a = "data a 1", b = "data b 1", c = "data c 1") Record(a = "data a 1", b = "data b 1", c = null) Record(a = "data a 2", b = "data b 2", c = "data c 2") Record(a = "data a 2", b = "data b 2", c = null) On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <[hidden email]> wrote:
|
Ah, I think the "Result Updating" is what got me -- INNER joins do the job! On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <[hidden email]> wrote:
|
Hi Austin, Do I assume correctly, that you self-answered your question? If not, could you please update your current progress? Best, Arvid On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |
Hey Arvid, Yes, I was able to self-answer this one. Was just confused on the non-deterministic behavior of the FULL OUTER join statement. Thinking through it and took a harder read through the Dynamic Tables doc section[1] where "Result Updating" is hinted at, and the behavior makes total sense in a streaming env. Thanks, Austin On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |