Flink SQL Streaming Join Creates Duplicates

austin.ce
Hey all,

I've got a Flink 1.10 Streaming SQL job using the Blink Planner that reads from a few CSV files and joins some records across them into a couple of data streams (yes, this could be a batch job; I won't get into why we chose streams unless it's relevant). These joins are producing some duplicate records, one with the joined field present and one with the joined field as `null`, though this happens only ~25% of the time. Reading the docs on joins[1], I thought this could be caused by too strict an Idle State Retention[2], so I increased it to (min, max) = (15min, 24h), but that doesn't seem to have any effect, and the problem still occurs when testing on a subset of data that finishes processing in under a minute.

The query roughly looks like:

table_1 has fields a, b
table_2 has fields b, c

SELECT table_1.a, table_1.b, table_1.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

Correct result:
Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")

Actual results seem to fall anywhere between the correct result and all possible duplicates, e.g.:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = null, c = "data c 1")
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = null, c = "data c 2")

The CSV files are registered as Flink Tables with the following:
tableEnv.connect(
        new FileSystem()
            .path(path)
    )
    .withFormat(
        new Csv()
            .quoteCharacter('"')
            .ignoreParseErrors()
    )
    .withSchema(schema)
    .inAppendMode()
    .createTemporaryTable(tableName);

I'm creating my table environment like so:
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings);

TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));

Am I misconfiguring something, or have I misunderstood the docs?

Thanks,
Austin

Re: Flink SQL Streaming Join Creates Duplicates

austin.ce
oops, the example query should actually be:

SELECT table_1.a, table_1.b, table_2.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

and duplicate results should actually be:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = "data b 1", c = null)
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = "data b 2", c = null)

Re: Flink SQL Streaming Join Creates Duplicates

austin.ce
Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
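
For anyone finding this thread later, here is a minimal sketch of what "Result Updating" means for this query. It is plain Java, not Flink code: the class `LeftJoinChangelogSketch` and its methods are hypothetical names made up for illustration, and it only models the general behavior of a non-windowed streaming LEFT OUTER JOIN as I understand it. When a table_1 row arrives before its table_2 match, the join emits a null-padded row and then retracts it once the match shows up. A consumer that treats every emitted row as an append (an assumption about the setup here) would see both rows, and whether that happens depends on arrival order, which would fit the intermittent duplicates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a non-windowed streaming LEFT OUTER JOIN. Plain Java, not Flink code. */
final class LeftJoinChangelogSketch {
    private final Map<String, List<String>> leftByKey = new HashMap<>();  // b -> a values seen so far
    private final Map<String, List<String>> rightByKey = new HashMap<>(); // b -> c values seen so far
    private final List<String> changes = new ArrayList<>();               // emitted changelog

    /** A table_1 row (a, b) arrives. */
    void onLeft(String a, String b) {
        leftByKey.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
        List<String> matches = rightByKey.getOrDefault(b, Collections.<String>emptyList());
        if (matches.isEmpty()) {
            emit("+", a, b, null); // no match yet, so emit the null-padded row right away
        } else {
            for (String c : matches) {
                emit("+", a, b, c);
            }
        }
    }

    /** A table_2 row (b, c) arrives. */
    void onRight(String b, String c) {
        List<String> rights = rightByKey.computeIfAbsent(b, k -> new ArrayList<>());
        boolean firstMatchForKey = rights.isEmpty();
        rights.add(c);
        for (String a : leftByKey.getOrDefault(b, Collections.<String>emptyList())) {
            if (firstMatchForKey) {
                emit("-", a, b, null); // retract the earlier null-padded row
            }
            emit("+", a, b, c);
        }
    }

    /** Changelog emitted so far: "+" marks an add, "-" a retraction. */
    List<String> changes() {
        return changes;
    }

    private void emit(String sign, String a, String b, String c) {
        changes.add(sign + "(" + a + ", " + b + ", " + c + ")");
    }

    public static void main(String[] args) {
        LeftJoinChangelogSketch join = new LeftJoinChangelogSketch();
        join.onLeft("data a 1", "data b 1");  // left row first: null-padded row, retracted later
        join.onRight("data b 1", "data c 1");
        join.onRight("data b 2", "data c 2"); // right row first: only the joined row is emitted
        join.onLeft("data a 2", "data b 2");
        join.changes().forEach(System.out::println);
    }
}
```

With an INNER join nothing is emitted until both sides are present, so there is no null-padded row to retract in the first place.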

Re: Flink SQL Streaming Join Creates Duplicates

Arvid Heise-3
Hi Austin,

Do I assume correctly that you answered your own question? If not, could you please share your current progress?

Best,

Arvid

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Flink SQL Streaming Join Creates Duplicates

austin.ce
Hey Arvid,

Yes, I was able to self-answer this one. I was just confused by the non-deterministic behavior of the LEFT OUTER JOIN statement. After thinking it through and taking a harder read of the Dynamic Tables doc section[1], where "Result Updating" is hinted at, the behavior makes total sense in a streaming env.
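
A minimal sketch of why that is, assuming the join result is consumed as a stream of (flag, row) messages where `true` is an add and `false` is a retraction (the shape that, as far as I know, `StreamTableEnvironment.toRetractStream` produces). The class `RetractFoldSketch` and its helpers are hypothetical names, plain Java with no Flink dependency. Applying the retractions yields the correct result, while keeping only the adds reproduces the duplicates.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Toy consumer of a retract-style changelog. Plain Java, not Flink code. */
final class RetractFoldSketch {

    /** Builds one changelog message: add == true is an insert, false a retraction. */
    static Map.Entry<Boolean, String> msg(boolean add, String row) {
        return new SimpleEntry<>(add, row);
    }

    /** Applies adds and retractions in order and returns the rows that remain. */
    static List<String> materialize(List<Map.Entry<Boolean, String>> messages) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Boolean, String> m : messages) {
            if (m.getKey()) {
                result.add(m.getValue());
            } else {
                result.remove(m.getValue()); // drops one occurrence of the retracted row
            }
        }
        return result;
    }

    /** What an append-only consumer ends up with: retractions are ignored. */
    static List<String> addsOnly(List<Map.Entry<Boolean, String>> messages) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Boolean, String> m : messages) {
            if (m.getKey()) {
                result.add(m.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> messages = Arrays.asList(
            msg(true, "Record(a = data a 1, b = data b 1, c = null)"),
            msg(false, "Record(a = data a 1, b = data b 1, c = null)"),
            msg(true, "Record(a = data a 1, b = data b 1, c = data c 1)"));
        System.out.println("materialized: " + materialize(messages));
        System.out.println("adds only:    " + addsOnly(messages));
    }
}
```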

Thanks,
Austin

