Issue with Flink - unrelated executeSql causes other executeSqls to fail.

20 messages

Dan
I'm writing a test for a batch job using MiniClusterResourceConfiguration.

Here's a simple description of my working test case:
1) I use TableEnvironment.executeSql(...) to create a source and a sink table backed by a temporary filesystem directory.
2) I use executeSql to insert some test data into the source table.
3) I use executeSql to select from the source and insert into the sink.
4) I use executeSql to select from the same source and insert into a different sink.

When I do these steps, it works. If I remove step 4, no data gets written to the sink. My actual code is more complex than this (it has a CREATE VIEW, a join, and more tables). This is a simplified description, but it highlights the weird error.

Has anyone hit issues like this? I'm assuming I have a small bug in my queries that's causing this. These queries appear to work in production, so I'm confused. Are there ways of viewing failed jobs or queries with MiniClusterResourceConfiguration?

Thanks!
- Dan
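One thing worth ruling out, offered as an assumption rather than a diagnosis from this thread: `TableEnvironment.executeSql()` submits an `INSERT` statement as a job and returns a `TableResult` without waiting for that job to finish. A test that inspects the sink right after step 3 can therefore race the job, and an extra statement such as step 4 may simply buy it enough time. In Flink 1.12 and later, `TableResult.await()` blocks until the job completes; on 1.11 the same wait can be done through the `JobClient` returned by `TableResult.getJobClient()`. The JDK-only sketch below illustrates the pattern; `AwaitInsertSketch` and `submitInsert` are hypothetical stand-ins, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Minimal sketch, JDK only. submitInsert() is a hypothetical stand-in for
// tEnv.executeSql("INSERT INTO sink SELECT ... FROM source"): it returns
// immediately while the "job" keeps running in the background.
final class AwaitInsertSketch {

    static CompletableFuture<Void> submitInsert(List<String> source, List<String> sink) {
        return CompletableFuture.runAsync(() -> {
            sleepQuietly(100); // pretend the job needs time to schedule and run
            sink.addAll(source);
        });
    }

    // Step 3 done safely: wait for the job before looking at the sink.
    // With Flink 1.12+, the analogous call would be tableResult.await().
    static List<String> insertAndWait(List<String> source) {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        submitInsert(source, sink).join();
        return new ArrayList<>(sink); // safe to copy: the writer has finished
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Reading `sink` before `join()` returns would likely see an empty list, which would match the "no data gets written" symptom; whether that is what actually happens in this test is not confirmed here.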

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Aljoscha Krettek
Hi Dan,

there were some bugs and quirks in the MiniCluster that we recently fixed:

  - https://issues.apache.org/jira/browse/FLINK-19123
  - https://issues.apache.org/jira/browse/FLINK-19264

But I think they are probably unrelated to your case. Could you enable
logging and see from the logs whether the 2) and 3) jobs execute
correctly on the MiniCluster?

Best,
Aljoscha

On 06.10.20 08:08, Dan Hill wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
I've tried to enable additional logging for a few hours today. I think something with junit5 is swallowing the logs. I'm using Bazel and junit5, and I set up MiniClusterResourceConfiguration using a custom extension. Are there any known issues with Flink and junit5? I can try switching to junit4.

Binary-searching the issue, I found that the failure happens if my query in step 3 has a join in it. If I remove the join, I can remove step 4 and the code still works. I've also renamed a bunch of my tables, and the problem still exists.

On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
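I don't think any of the gotchas at the bottom of this page apply to me:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
I'm assuming that for a batch job I don't have to do anything about this one: "You can implement a custom parallel source function for emitting watermarks if your job uses event time timers."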

On Tue, Oct 6, 2020 at 2:42 PM Dan Hill <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

austin.ce
Hey Dan,

We use JUnit 5 and Bazel to run Flink SQL tests on a MiniCluster and haven’t had issues, though we’re only testing streaming jobs.

Happy to help set up logging with that if you’d like.

Best,
Austin

On Tue, Oct 6, 2020 at 6:02 PM Dan Hill <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

austin.ce
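Unless it's related to this issue [1], which involved my JOIN and time characteristics, though I'm not sure that applies to batch.

Best,
Austin

[1]: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html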

On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

austin.ce
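Oops, this is actually the JOIN issue thread [1]. I guess I should revise my previous "haven't had issues" statement, hah. Sorry for the spam!

[1]: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html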

On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
Thanks!

Great to know. I copied this junit5-jupiter-starter-bazel rule (https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel) into my repository (I don't think junit5 is supported directly by java_test yet). I tried a few ways of bundling `log4j.properties` into the jar and couldn't get any of them to work; those attempts failed with an error saying the log4j.properties file was not found. My current iteration hacks around this by passing the log4j.properties file as an absolute path. This route finds the file, but its settings are not applied to the Java logger.

Is there a better set of rules to use for junit5?

# build rule
java_junit5_test(
    name = "tests",
    srcs = glob(["*.java"]),
    test_package = "ai.promoted.logprocessor.batch",
    deps = [...],
    jvm_flags = ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
)

# log4j.properties
status = error
name = Log4j2PropertiesConfig
appenders = console
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = LogToConsole
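A possible reason the settings are ignored, stated as an assumption to verify: the file above uses Log4j 2 properties syntax (`appender.console.type`, `rootLogger.appenderRef.stdout.ref`), and Flink 1.11 and later log through Log4j 2, which takes an explicit config location from the `log4j.configurationFile` system property. `-Dlog4j.configuration` is the Log4j 1.x property, which a default Log4j 2 setup ignores; without an explicit property, Log4j 2 searches the classpath for names such as `log4j2-test.properties` or `log4j2.properties`, not `log4j.properties`. The JDK-only helper below, whose names are hypothetical, reports which property a test JVM actually received.

```java
import java.util.Properties;

// Hypothetical helper: reports which Log4j configuration property is set.
// Log4j 2 reads "log4j.configurationFile"; "log4j.configuration" is the
// Log4j 1.x property and is ignored by a default Log4j 2 setup.
final class Log4jConfigCheck {
    static final String LOG4J2_PROPERTY = "log4j.configurationFile";
    static final String LOG4J1_PROPERTY = "log4j.configuration";

    static String describe(Properties props) {
        String v2 = props.getProperty(LOG4J2_PROPERTY);
        if (v2 != null) {
            return "Log4j 2 config: " + v2;
        }
        String v1 = props.getProperty(LOG4J1_PROPERTY);
        if (v1 != null) {
            return "only the Log4j 1.x property is set (" + v1
                    + "); try -D" + LOG4J2_PROPERTY + " instead";
        }
        return "no explicit config; Log4j 2 will search the classpath";
    }
}
```

For example, printing `Log4jConfigCheck.describe(System.getProperties())` from a JUnit 5 `@BeforeAll` method would show the result; if the assumption holds, changing the rule's flag to `-Dlog4j.configurationFile=...` may be enough.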

On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
I'm trying to use the Table API for my job. I'll soon try to get a test working for my streaming job.
- I'll parameterize the job so I can have different sources and sinks for tests. How should I mock out a Kafka source? For my test, I was planning on reading the input from a temp file instead of Kafka.
- What's a good way of forcing a watermark using the Table API?
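For both questions, one approach (a sketch under assumptions, not a recipe confirmed in this thread) is to build the source DDL from a connector-options map, so a test can swap Kafka for a bounded `'connector' = 'filesystem'` table, and to declare the event-time attribute with a `WATERMARK FOR ... AS ...` clause. When a bounded source such as a file is exhausted, Flink emits a final watermark of `Long.MAX_VALUE`, which fires any pending event-time timers, so the test may not need to force watermarks by hand. The helper name, table name, columns, and path are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds a CREATE TABLE statement whose connector
// options come from the caller (Kafka in production, a file in tests).
// Option values are not escaped; keep them free of single quotes.
final class SourceDdl {

    static String createTable(String name, String columns, String watermark,
                              Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add("'" + e.getKey() + "' = '" + e.getValue() + "'");
        }
        return "CREATE TABLE " + name + " (\n  " + columns
                + ",\n  WATERMARK FOR " + watermark + "\n) " + with;
    }

    // Test variant: a bounded CSV directory instead of a Kafka topic.
    static Map<String, String> fileOptions(String path) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "filesystem");
        opts.put("path", path);
        opts.put("format", "csv");
        return opts;
    }
}
```

In a test, the result would be passed to `tEnv.executeSql(...)`, e.g. with columns `"id BIGINT, ts TIMESTAMP(3)"` and watermark `"ts AS ts - INTERVAL '5' SECOND"`; production code would pass Kafka options such as `connector`, `topic`, `properties.bootstrap.servers`, and `format` instead.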

On Tue, Oct 6, 2020 at 3:35 PM Dan Hill <[hidden email]> wrote:

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Aljoscha Krettek
Hi Dan,

to make the log properties file work, this should do it, assuming the
log4j.properties is in //src/main/resources: you will need a BUILD.bazel
in that directory that has only the line

    exports_files(["log4j.properties"])

Then you can reference it in your test via

    resources = ["//src/main/resources:log4j.properties"],

Of course, you also need to have the right log4j deps (or slf4j if you're
using that).

Hope that helps!

Aljoscha
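Building on the suggestion above, a quick way to confirm that the `resources` attribute really put the file on the test classpath is to look it up from inside the test. This JDK-only sketch assumes Bazel applies its usual Maven-layout handling and strips the `src/main/resources` prefix, so the file ends up at the jar root as `log4j.properties`; `ClasspathCheck` is a hypothetical name.

```java
import java.net.URL;
import java.util.Optional;

// Hypothetical helper: locates a resource on the test classpath using the
// context class loader, falling back to this class's own loader.
final class ClasspathCheck {
    static Optional<URL> find(String resourceName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClasspathCheck.class.getClassLoader();
        }
        return Optional.ofNullable(loader.getResource(resourceName));
    }
}
```

For example, `ClasspathCheck.find("log4j.properties").orElseThrow(() -> new IllegalStateException("log4j.properties is not on the test classpath"))` at the start of a test would fail fast if the wiring is wrong. Being on the classpath is necessary but may not be sufficient, since the logging backend still has to pick the file up.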

On 07.10.20 00:41, Dan Hill wrote:

> I'm trying to use Table API for my job.  I'll soon try to get a test
> working for my stream job.
> - I'll parameterize so I can have different sources and sink for tests.
> How should I mock out a Kafka source?  For my test, I was planning on
> changing the input to be from a temp file (instead of Kafka).
> - What's a good way of forcing a watermark using the Table API?
>
>
> On Tue, Oct 6, 2020 at 3:35 PM Dan Hill <[hidden email]> wrote:
>
>> Thanks!
>>
>> Great to know.  I copied this junit5-jupiter-starter-bazel
>> <https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel> rule
>> into my repository (I don't think junit5 is supported directly with
>> java_test yet).  I tried a few ways of bundling `log4j.properties` into the
>> jar and didn't get them to work.  My current iteration hacks the
>> log4j.properties file as an absolute path.  My failed attempts would spit
>> an error saying log4j.properties file was not found.  This route finds it
>> but the log properties are not used for the java logger.
>>
>> Are there a better set of rules to use for junit5?
>>
>> # build rule
>> java_junit5_test(
>>      name = "tests",
>>      srcs = glob(["*.java"]),
>>      test_package = "ai.promoted.logprocessor.batch",
>>      deps = [...],
>>      jvm_flags =
>> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>> )
>>
>> # log4j.properties
>> status = error
>> name = Log4j2PropertiesConfig
>> appenders = console
>> appender.console.type = Console
>> appender.console.name = LogToConsole
>> appender.console.layout.type = PatternLayout
>> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>> rootLogger.level = info
>> rootLogger.appenderRefs = stdout
>> rootLogger.appenderRef.stdout.ref = LogToConsole
>>
>> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>> [hidden email]> wrote:
>>
>>> Oops, this is actually the JOIN issue thread [1]. Guess I should revise
>>> my previous "haven't had issues" statement hah. Sorry for the spam!
>>>
>>> [1]:
>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>>>
>>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>>> [hidden email]> wrote:
>>>
>>>> Unless it's related to this issue[1], which was w/ my JOIN and time
>>>> characteristics, though not sure that applies for batch.
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> [1]:
>>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>>>
>>>>
>>>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>>>> [hidden email]> wrote:
>>>>
>>>>> Hey Dan,
>>>>>
>>>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
>>>>> haven’t had issues, though we’re only testing on streaming jobs.
>>>>>
>>>>> Happy to help setting up logging with that if you’d like.
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill <[hidden email]> wrote:
>>>>>
>>>>>> I don't think any of the gotchas apply to me (at the bottom of this
>>>>>> link).
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>>>>
>>>>>> I'm assuming for a batch job that I don't have to do anything for:
>>>>>> "You can implement a custom parallel source function for emitting
>>>>>> watermarks if your job uses event time timers."
>>>>>>
>>>>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill <[hidden email]> wrote:
>>>>>>
>>>>>>> I've tried to enable additional logging for a few hours today.  I
>>>>>>> think something with junit5 is swallowing the logs.  I'm using Bazel and
>>>>>>> junit5.  I setup MiniClusterResourceConfiguration using a custom
>>>>>>> extension.  Are there any known issues with Flink and junit5?  I can try
>>>>>>> switching to junit4.
>>>>>>>
>>>>>>> When I've binary searched this issue, this failure happens if my
>>>>>>> query in step 3 has a join it.  If I remove the join, I can remove step 4
>>>>>>> and the code still works.  I've renamed a bunch of my tables too and the
>>>>>>> problem still exists.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Dan,
>>>>>>>>
>>>>>>>> there were some bugs and quirks in the MiniCluster that we recently
>>>>>>>> fixed:
>>>>>>>>
>>>>>>>>    - https://issues.apache.org/jira/browse/FLINK-19123
>>>>>>>>    - https://issues.apache.org/jira/browse/FLINK-19264
>>>>>>>>
>>>>>>>> But I think they are probably unrelated to your case. Could you
>>>>>>>> enable
>>>>>>>> logging and see from the logs whether the 2) and 3) jobs execute
>>>>>>>> correctly on the MiniCluster?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 06.10.20 08:08, Dan Hill wrote:
>>>>>>>>> I'm writing a test for a batch job using MiniClusterResourceConfiguration.
>>>>>>>>>
>>>>>>>>> Here's a simple description of my working test case:
>>>>>>>>> 1) I use TableEnvironment.executeSql(...) to create a source and sink table
>>>>>>>>> using a tmp filesystem directory.
>>>>>>>>> 2) I use executeSql to insert some test data into the source table.
>>>>>>>>> 3) I use executeSql to select from source and insert into sink.
>>>>>>>>> 4) I use executeSql to insert from the same source into a different sink.
>>>>>>>>>
>>>>>>>>> When I do these steps, it works.  If I remove step 4, no data gets written
>>>>>>>>> to the sink.  My actual code is more complex than this (has create view,
>>>>>>>>> join and more tables).  This is a simplified description but highlights the
>>>>>>>>> weird error.
>>>>>>>>>
>>>>>>>>> Has anyone hit issues like this?  I'm assuming I have a small code bug in
>>>>>>>>> my queries that's causing issues.  These queries appear to work in
>>>>>>>>> production so I'm confused.  Are there ways of viewing failed jobs or
>>>>>>>>> queries with MiniClusterResourceConfiguration?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> - Dan
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>
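For the four steps described above, one possibility worth ruling out (an assumption, not something established in this thread) is a race with asynchronous job submission. For an INSERT statement, TableEnvironment.executeSql submits a job and returns a TableResult without waiting for that job to finish. A test that inspects the sink directory right away can therefore run before the data is written, and an extra statement like step 4 may simply change the timing. In Flink 1.11 the submitted job is reachable through TableResult.getJobClient(), whose job-execution-result future can be waited on; later releases add TableResult.await(). The stdlib-only Java model below is not Flink code. The class and method names are hypothetical, and it only shows why reading a sink before the writer finishes can see nothing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/**
 * Stdlib-only model (not Flink code) of an INSERT job that is submitted
 * asynchronously: the "sink" is only populated once the job finishes.
 */
public class AsyncSubmitModel {

    /** Submits a fake job that writes `rows` into `sink` once `gate` opens. */
    static CompletableFuture<Void> submit(List<String> sink, List<String> rows, CountDownLatch gate) {
        return CompletableFuture.runAsync(() -> {
            try {
                gate.await(); // the fake job cannot write before the gate opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            sink.addAll(rows);
        });
    }

    /** Reads the sink immediately after submission, then lets the job finish. */
    static int readWithoutWaiting(List<String> rows) {
        List<String> sink = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<Void> job = submit(sink, rows, gate);
        int seen = sink.size(); // job is still blocked on the gate, so nothing is written yet
        gate.countDown();
        job.join();
        return seen;
    }

    /** Waits for the job to complete before reading the sink. */
    static int readAfterWaiting(List<String> rows) {
        List<String> sink = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<Void> job = submit(sink, rows, gate);
        gate.countDown();
        job.join(); // analogous to blocking on the submitted job's result
        return sink.size();
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b", "c");
        System.out.println("without waiting: " + readWithoutWaiting(rows));
        System.out.println("after waiting:   " + readAfterWaiting(rows));
    }
}
```

Running main prints 0 for the read without waiting and 3 for the read after waiting, because the fake job cannot write until the latch is released.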


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

austin.ce
What Aljoscha suggested is what works for us!

On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek <[hidden email]> wrote:
Hi Dan,

To make the log properties file work, assuming the log4j.properties is in
//src/main/resources, this should do it: you will need a BUILD.bazel in
that directory that contains only the line
`exports_files(["log4j.properties"])`. Then you can reference it in your
test via `resources = ["//src/main/resources:log4j.properties"],`. Of
course you also need to have the right log4j deps (or slf4j if you're
using that).

Hope that helps!

Aljoscha
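When relying on a classpath resource as described above, it can help to check what the test JVM actually sees. As far as I know, Log4j 2's automatic configuration looks for names such as log4j2-test.properties and log4j2.properties, not log4j.properties (the Log4j 1.x name), so a file called log4j.properties is typically only picked up when it is passed explicitly through a system property. The class below is a hypothetical diagnostic, not part of Flink or Log4j; it only uses ClassLoader.getResource.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical diagnostic: which Log4j config files are visible on the test classpath? */
public class Log4jResourceCheck {

    // A subset of the names Log4j 2 auto-discovers (in priority order) when
    // log4j.configurationFile is not set. log4j.properties is not one of them.
    static final List<String> LOG4J2_DEFAULT_NAMES = List.of(
            "log4j2-test.properties", "log4j2-test.xml",
            "log4j2.properties", "log4j2.xml");

    /** Returns the auto-discovery names for which the predicate reports a resource. */
    static List<String> visibleLog4j2Files(Predicate<String> onClasspath) {
        List<String> found = new ArrayList<>();
        for (String name : LOG4J2_DEFAULT_NAMES) {
            if (onClasspath.test(name)) {
                found.add(name);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Predicate<String> onClasspath = name -> loader.getResource(name) != null;
        System.out.println("Log4j 2 auto-discovery files found: " + visibleLog4j2Files(onClasspath));
        System.out.println("log4j.properties on classpath: " + onClasspath.test("log4j.properties"));
        System.out.println("log4j.configurationFile = " + System.getProperty("log4j.configurationFile"));
    }
}
```

Running this from inside the test target (for example from a @BeforeAll method) would show whether the exported resource reached the classpath under a name Log4j 2 looks for.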

On 07.10.20 00:41, Dan Hill wrote:
> I'm trying to use the Table API for my job.  I'll soon try to get a test
> working for my stream job.
> - I'll parameterize so I can have different sources and sinks for tests.
> How should I mock out a Kafka source?  For my test, I was planning on
> changing the input to be from a temp file (instead of Kafka).
> - What's a good way of forcing a watermark using the Table API?
>
>
> On Tue, Oct 6, 2020 at 3:35 PM Dan Hill <[hidden email]> wrote:
>
>> Thanks!
>>
>> Great to know.  I copied this junit5-jupiter-starter-bazel
>> <https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel> rule
>> into my repository (I don't think junit5 is supported directly with
>> java_test yet).  I tried a few ways of bundling `log4j.properties` into the
>> jar and didn't get any of them to work.  My current iteration hard-codes the
>> log4j.properties file as an absolute path.  My failed attempts would spit out
>> an error saying the log4j.properties file was not found.  This route finds it,
>> but the log properties are not used for the java logger.
>>
>> Is there a better set of rules to use for junit5?
>>
>> # build rule
>> java_junit5_test(
>>      name = "tests",
>>      srcs = glob(["*.java"]),
>>      test_package = "ai.promoted.logprocessor.batch",
>>      deps = [...],
>>      jvm_flags = ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>> )
>>
>> # log4j.properties
>> status = error
>> name = Log4j2PropertiesConfig
>> appenders = console
>> appender.console.type = Console
>> appender.console.name = LogToConsole
>> appender.console.layout.type = PatternLayout
>> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>> rootLogger.level = info
>> rootLogger.appenderRefs = stdout
>> rootLogger.appenderRef.stdout.ref = LogToConsole
>>
>> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>> [hidden email]> wrote:
>>
>>> Oops, this is actually the JOIN issue thread [1]. Guess I should revise
>>> my previous "haven't had issues" statement hah. Sorry for the spam!
>>>
>>> [1]:
>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>>>
>>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>>> [hidden email]> wrote:
>>>
>>>> Unless it's related to this issue[1], which was w/ my JOIN and time
>>>> characteristics, though not sure that applies for batch.
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> [1]:
>>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>>>
>>>>
>>>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>>>> [hidden email]> wrote:
>>>>
>>>>> Hey Dan,
>>>>>
>>>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
>>>>> haven’t had issues, though we’re only testing on streaming jobs.
>>>>>
>>>>> Happy to help set up logging with that if you’d like.
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill <[hidden email]> wrote:
>>>>>
>>>>>> I don't think any of the gotchas apply to me (at the bottom of this
>>>>>> link).
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>>>>
>>>>>> I'm assuming for a batch job that I don't have to do anything for:
>>>>>> "You can implement a custom parallel source function for emitting
>>>>>> watermarks if your job uses event time timers."
>>>>>>
>>>>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill <[hidden email]> wrote:
>>>>>>
>>>>>>> I've tried to enable additional logging for a few hours today.  I
>>>>>>> think something with junit5 is swallowing the logs.  I'm using Bazel and
>>>>>>> junit5.  I set up MiniClusterResourceConfiguration using a custom
>>>>>>> extension.  Are there any known issues with Flink and junit5?  I can try
>>>>>>> switching to junit4.
>>>>>>>
>>>>>>> When I've binary searched this issue, this failure happens if my
>>>>>>> query in step 3 has a join in it.  If I remove the join, I can remove step 4
>>>>>>> and the code still works.  I've renamed a bunch of my tables too and the
>>>>>>> problem still exists.
>
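For the questions above about replacing the Kafka source with a temp file and forcing a watermark, one approach (a sketch, not something confirmed in this thread) is to keep the table schema shared and swap only the WITH options, using Kafka options in production and a filesystem path in tests, while declaring the watermark in the DDL with WATERMARK FOR ... AS .... The helper below only builds a SQL string with the JDK. The table name, columns, and 5-second delay are hypothetical, and the 'connector' = 'filesystem', 'path', and 'format' options should be checked against the Flink version in use. The resulting string would be passed to TableEnvironment.executeSql.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helper that builds Flink SQL DDL strings for tests. */
public class TestTableDdl {

    /** Filesystem-backed CSV table with an event-time watermark on `ts`. */
    static String filesystemSourceDdl(String table, Path dir, int watermarkDelaySeconds) {
        return String.join("\n",
                "CREATE TABLE " + table + " (",
                "  user_id STRING,",
                "  ts TIMESTAMP(3),",
                "  WATERMARK FOR ts AS ts - INTERVAL '" + watermarkDelaySeconds + "' SECOND",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = '" + dir.toUri() + "',",
                "  'format' = 'csv'",
                ")");
    }

    public static void main(String[] args) throws IOException {
        // Write two CSV rows into a temp directory that stands in for the Kafka topic.
        Path dir = Files.createTempDirectory("events");
        Files.write(dir.resolve("part-0.csv"), List.of(
                "u1,2020-10-06 00:00:01",
                "u2,2020-10-06 00:00:02"));
        System.out.println(filesystemSourceDdl("events", dir, 5));
    }
}
```

Keeping the column list in one place means the production and test tables cannot drift apart; only the connector section differs between them.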


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
@Aljoscha - Thanks!  That setup lets me fix the hacky absolute path reference.  However, the actual log calls are not printing to the console.  Only errors appear in my terminal window and the test logs.  Maybe the console logger does not work with this junit setup.  I'll see if the file version works.
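A plausible reading of "only errors appear" (hedged; later in the thread, switching to -Dlog4j.configurationFile is what fixed it): when Log4j 2 finds no configuration, it falls back to a default configuration that logs only ERROR events to the console. The properties file quoted earlier uses Log4j 2 syntax (appender.*, rootLogger.*), which Log4j 2 reads through -Dlog4j.configurationFile; -Dlog4j.configuration is the Log4j 1.x property. The helper below is a hypothetical heuristic that guesses which property a properties file is meant for; it is not part of Log4j.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical heuristic: which system property should point at this properties file? */
public class Log4jFlagGuess {

    static String systemPropertyFor(Properties props) {
        boolean log4j1 = false;
        boolean log4j2 = false;
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("log4j.")) {
                log4j1 = true; // e.g. log4j.rootLogger, log4j.appender.X (Log4j 1.x style)
            } else if (key.startsWith("rootLogger.") || key.startsWith("appender.")) {
                log4j2 = true; // Log4j 2 properties-format keys
            }
        }
        if (log4j2 && !log4j1) return "log4j.configurationFile";
        if (log4j1 && !log4j2) return "log4j.configuration";
        return "unknown";
    }

    public static void main(String[] args) throws IOException {
        // Abbreviated copy of the properties file quoted in this thread.
        String config = String.join("\n",
                "status = error",
                "appender.console.type = Console",
                "appender.console.name = LogToConsole",
                "rootLogger.level = info",
                "rootLogger.appenderRef.stdout.ref = LogToConsole");
        Properties props = new Properties();
        props.load(new StringReader(config));
        // The path below is a placeholder, not a real location.
        System.out.println("-D" + systemPropertyFor(props) + "=file:///path/to/log4j.properties");
    }
}
```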



Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
Switching to junit4 did not help.

If I make a request to the URL returned by MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(), I get {"errors":["Not found."]}.  I'm not sure if this is intentional.
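To check whether the INSERT jobs finished or failed (the original question about viewing failed jobs), the JSON REST API may be more useful than the root URL. My understanding, which should be verified, is that the root path only serves the web dashboard when flink-runtime-web is on the test classpath, which would explain the "Not found." response, while REST endpoints such as /jobs/overview are served either way. The hypothetical JobsOverview class below uses the JDK 11 HttpClient and takes the value returned by getWebInterfaceURL() as its first argument; the localhost:8081 fallback is only a placeholder, since a MiniCluster may bind a different port.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: list jobs known to a (mini) cluster via Flink's REST API. */
public class JobsOverview {

    /** Appends /jobs/overview to the base URL, tolerating a trailing slash. */
    static URI overviewUri(String webInterfaceUrl) {
        String base = webInterfaceUrl.endsWith("/")
                ? webInterfaceUrl.substring(0, webInterfaceUrl.length() - 1)
                : webInterfaceUrl;
        return URI.create(base + "/jobs/overview");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Pass the value of getWebInterfaceURL() as the first argument; the fallback is a placeholder.
        String base = args.length > 0 ? args[0] : "http://localhost:8081";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(base)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body should list each job with its name and state (e.g. FINISHED or FAILED).
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```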






Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
I was able to get finer-grained logs to show.  I switched from -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my larger test case, the log4j error was silent; when I created a small test case just to test logging, I received a log4j error.

Here is a tar with the info logs for:
- (test-nojoin.log) this one works as expected
- (test-join.log) this does not work as expected

I don't see an obvious issue just by scanning the logs.  I'll take a deeper look in 9 hours.
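Since the larger test case hit a log4j error that produced no message, a quick check on the properties file itself may help; raising `status = error` to `status = debug` should also make Log4j 2 print its own configuration diagnostics. The hypothetical helper below, which uses only java.util.Properties, checks one class of mistake in Log4j 2 properties files: a rootLogger.appenderRef.<id>.ref value that names no declared appender.<id>.name. It is not a full Log4j 2 validator.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/** Hypothetical check: do root logger appender refs point at declared appenders? */
public class AppenderRefCheck {

    static List<String> danglingRefs(Properties props) {
        Set<String> declared = new HashSet<>();
        List<String> refs = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key).trim();
            if (key.startsWith("appender.") && key.endsWith(".name")) {
                declared.add(value); // e.g. appender.console.name = LogToConsole
            } else if (key.startsWith("rootLogger.appenderRef.") && key.endsWith(".ref")) {
                refs.add(value); // e.g. rootLogger.appenderRef.stdout.ref = LogToConsole
            }
        }
        List<String> dangling = new ArrayList<>();
        for (String ref : refs) {
            if (!declared.contains(ref)) {
                dangling.add(ref);
            }
        }
        return dangling;
    }

    public static void main(String[] args) throws IOException {
        String config = String.join("\n",
                "appender.console.type = Console",
                "appender.console.name = LogToConsole",
                "rootLogger.level = info",
                "rootLogger.appenderRef.stdout.ref = LogToConsol"); // deliberate typo
        Properties props = new Properties();
        props.load(new StringReader(config));
        System.out.println("Unresolved appender refs: " + danglingRefs(props));
    }
}
```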






Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

austin.ce
Can't comment on the SQL issues, but here's our exact setup for Bazel and Junit5 w/ the resource files approach: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit

Best,
Austin

On Thu, Oct 8, 2020 at 2:41 AM Dan Hill <[hidden email]> wrote:
I was able to get finer grained logs showing.  I switched from -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my larger test case, I was hitting a silent log4j error.  When I created a small test case to just test logging, I received a log4j error.

Here is a tar with the info logs for:
- (test-nojoin.log) this one works as expected
- (test-join.log) this does not work as expected

I don't see an obvious issue just by scanning the logs.  I'll take a deeper in 9 hours.

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
I figured out the issue. The join delayed part of the job's execution. I added my own hacky wait condition to the test to make sure the join job finishes first, and now the test passes.

What common test utilities exist for Flink? I found flink/flink-test-utils-parent. I implemented a simple sleep loop to wait for jobs to finish; I'm guessing one of the existing utilities already does this.
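
The sleep-loop approach described above can be sketched as a small, hypothetical helper (`PollUntil` is not a Flink class) that re-checks a condition on a fixed interval until a deadline. It uses only the JDK so it runs on its own; against a MiniCluster the condition might be `() -> jobClient.getJobStatus().get().isGloballyTerminalState()`, using Flink's `JobClient#getJobStatus()`. A FAILED job is also terminal, so a test should still check the final status or result afterwards.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/** Hypothetical test helper: re-checks a condition until it holds or a deadline passes. */
public final class PollUntil {

    private PollUntil() {}

    /**
     * Calls {@code done} every {@code interval} until it returns true.
     * Throws TimeoutException if it is still false once {@code timeout} has elapsed.
     */
    public static void pollUntil(Callable<Boolean> done, Duration timeout, Duration interval)
            throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!done.call()) {
            if (System.nanoTime() - deadline >= 0) { // overflow-safe "now >= deadline"
                throw new TimeoutException("condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a job that reaches a terminal state about 200 ms from now.
        long finishAt = System.nanoTime() + Duration.ofMillis(200).toNanos();
        pollUntil(() -> System.nanoTime() - finishAt >= 0,
                Duration.ofSeconds(5), Duration.ofMillis(20));
        System.out.println("job reached a terminal state");
    }
}
```

Polling is simple but wastes up to one interval per check; the future-based wait suggested later in the thread avoids that.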

Are there any open source test examples?

How are watermarks usually sent with Table API in tests?
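
For the watermark question, one common approach with the Table API/SQL is to declare the watermark in the source table's DDL and pass that string to `executeSql`. The sketch below is a hypothetical helper; the table name, the `user_id`/`ts` columns, the CSV format and the delay are illustrative assumptions, while `WATERMARK FOR ... AS ...` and the `'connector' = 'filesystem'` options are Flink SQL syntax. Watermarks only matter for streaming execution; in a streaming test, once a bounded file source is exhausted Flink should advance the watermark to its maximum, which fires any pending event-time windows.

```java
/** Hypothetical helper that builds a CREATE TABLE statement with an event-time watermark. */
public final class WatermarkDdl {

    private WatermarkDdl() {}

    /**
     * Returns DDL for a CSV filesystem table whose column {@code ts} is the event-time
     * attribute, with watermarks trailing the largest seen timestamp by {@code delaySeconds}.
     */
    public static String createTable(String table, String path, int delaySeconds) {
        return String.join("\n",
                "CREATE TABLE " + table + " (",
                "  user_id STRING,",
                "  ts TIMESTAMP(3),",
                "  WATERMARK FOR ts AS ts - INTERVAL '" + delaySeconds + "' SECOND",
                ") WITH (",
                "  'connector' = 'filesystem',",
                "  'path' = '" + path + "',",
                "  'format' = 'csv'",
                ")");
    }

    public static void main(String[] args) {
        // In a test this string would be passed to tableEnv.executeSql(...).
        System.out.println(createTable("events", "/tmp/events", 5));
    }
}
```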

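After I collect some answers, I'm happy to update the Flink testing page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs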

On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <[hidden email]> wrote:
Can't comment on the SQL issues, but here's our exact setup for Bazel and JUnit 5 w/ the resource files approach: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit

Best,
Austin

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Aljoscha Krettek
Hi Dan,

did you try using the JobClient you can get from the TableResult to wait
for job completion? You can get a CompletableFuture for the job's result
(JobExecutionResult), which should help you.
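
The future-based wait can be sketched as a hypothetical helper (`AwaitJobs` is not a Flink class) that blocks on several result futures with one deadline. It uses only `java.util.concurrent`, so it runs without Flink; in a real test the futures would come from `tableResult.getJobClient().get().getJobExecutionResult()`, assuming Flink 1.12 or later (on 1.11 that method takes a user `ClassLoader` argument). Newer Flink releases also provide `TableResult#await()`, which does the blocking for a single statement.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical test helper: blocks until every future has completed, or a deadline passes. */
public final class AwaitJobs {

    private AwaitJobs() {}

    /**
     * Waits for all futures to complete within {@code timeout}. Once all are done,
     * a failure in any of them surfaces as an ExecutionException; if some are still
     * running at the deadline, a TimeoutException is thrown.
     */
    public static void awaitAll(List<? extends CompletableFuture<?>> futures, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        all.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for two jobs' result futures (e.g. the insert job and the join job).
        CompletableFuture<String> insertJob = CompletableFuture.supplyAsync(() -> "insert done");
        CompletableFuture<String> joinJob = CompletableFuture.supplyAsync(() -> "join done");
        awaitAll(List.of(insertJob, joinJob), Duration.ofSeconds(5));
        System.out.println(insertJob.join() + ", " + joinJob.join());
    }
}
```

Compared with polling, this returns as soon as the last job completes and reports failures instead of treating every terminal state as success.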

Best,
Aljoscha

On 08.10.20 23:55, Dan Hill wrote:

> I figured out the issue.  The join caused part of the job's execution to be
> delayed.  I added my own hacky wait condition into the test to make sure
> the join job finishes first and it's fine.
>
> What common test utilities exist for Flink?  I found
> flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
> for jobs to finish.  I'm guessing this can be done with one of the other
> utilities.
>
> Are there any open source test examples?
>
> How are watermarks usually sent with Table API in tests?
>
> After I collect some answers, I'm fine updating the Flink testing page.
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Dan
No, I hadn't. Thanks! I was using JobClient to call getJobStatus and sleeping while the status was not terminal. I'll switch to this.


On Sat, Oct 10, 2020 at 12:50 AM Aljoscha Krettek <[hidden email]> wrote:
Hi Dan,

did you try using the JobClient you can get from the TableResult to wait
for job completion? You can get a CompletableFuture for the JobResult
which should help you.

Best,
Aljoscha

On 08.10.20 23:55, Dan Hill wrote:
> I figured out the issue.  The join caused part of the job's execution to be
> delayed.  I added my own hacky wait condition into the test to make sure
> the join job finishes first and it's fine.
>
> What common test utilities exist for Flink?  I found
> flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
> for jobs to finish.  I'm guessing this can be done with one of the other
> utilities.
>
> Are there any open source test examples?
>
> How are watermarks usually sent with Table API in tests?
>
> After I collect some answers, I'm fine updating the Flink testing page.
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
>
> On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
> [hidden email]> wrote:
>
>> Can't comment on the SQL issues, but here's our exact setup for Bazel and
>> Junit5 w/ the resource files approach:
>> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
>>
>> Best,
>> Austin
>>
>> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill <[hidden email]> wrote:
>>
>>> I was able to get finer grained logs showing.  I switched from
>>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
>>> larger test case, I was hitting a silent log4j error.  When I created a
>>> small test case to just test logging, I received a log4j error.
>>>
>>> Here is a tar
>>> <https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
>>> with the info logs for:
>>> - (test-nojoin.log) this one works as expected
>>> - (test-join.log) this does not work as expected
>>>
>>> I don't see an obvious issue just by scanning the logs.  I'll take a
>>> deeper in 9 hours.
>>>
>>>
>>>
>>>
>>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill <[hidden email]> wrote:
>>>
>>>> Switching to junit4 did not help.
>>>>
>>>> If I make a request to the url returned from
>>>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>>>> I get
>>>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill <[hidden email]> wrote:
>>>>
>>>>> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
>>>>> reference.  However, the actual log calls are not printing to the console.
>>>>> Only errors appear in my terminal window and the test logs.  Maybe console
>>>>> logger does not work for this junit setup.  I'll see if the file version
>>>>> works.
>>>>>
>>>>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>>>>> [hidden email]> wrote:
>>>>>
>>>>>> What Aljoscha suggested is what works for us!
>>>>>>
>>>>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Dan,
>>>>>>>
>>>>>>> to make the log properties file work this should do it: assuming the
>>>>>>> log4j.properties is in //src/main/resources. You will need a
>>>>>>> BUILD.bazel
>>>>>>> in that directory that has only the line
>>>>>>> "exports_files(["log4j.properties"]). Then you can reference it in
>>>>>>> your
>>>>>>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>>>>>>> course you also need to have the right log4j deps (or slf4j if you're
>>>>>>> using that)
>>>>>>>
>>>>>>> Hope that helps!
>>>>>>>
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 07.10.20 00:41, Dan Hill wrote:
>>>>>>>> I'm trying to use Table API for my job.  I'll soon try to get a test
>>>>>>>> working for my stream job.
>>>>>>>> - I'll parameterize so I can have different sources and sink for
>>>>>>> tests.
>>>>>>>> How should I mock out a Kafka source?  For my test, I was planning
>>>>>>> on
>>>>>>>> changing the input to be from a temp file (instead of Kafka).
>>>>>>>> - What's a good way of forcing a watermark using the Table API?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2020 at 3:35 PM Dan Hill <[hidden email]>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> Great to know.  I copied this junit5-jupiter-starter-bazel rule
>>>>>>>>> <https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>>>>>>>>> into my repository (I don't think junit5 is supported directly with
>>>>>>>>> java_test yet).  I tried a few ways of bundling `log4j.properties`
>>>>>>>>> into the jar and couldn't get any of them to work.  My current
>>>>>>>>> iteration hacks in the log4j.properties file as an absolute path.  My
>>>>>>>>> failed attempts would spit out an error saying the log4j.properties
>>>>>>>>> file was not found.  This route finds it, but the log properties are
>>>>>>>>> not used for the Java logger.
>>>>>>>>>
>>>>>>>>> Is there a better set of rules to use for junit5?
>>>>>>>>>
>>>>>>>>> # build rule
>>>>>>>>> java_junit5_test(
>>>>>>>>>       name = "tests",
>>>>>>>>>       srcs = glob(["*.java"]),
>>>>>>>>>       test_package = "ai.promoted.logprocessor.batch",
>>>>>>>>>       deps = [...],
>>>>>>>>>       jvm_flags = ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>> # log4j.properties
>>>>>>>>> status = error
>>>>>>>>> name = Log4j2PropertiesConfig
>>>>>>>>> appenders = console
>>>>>>>>> appender.console.type = Console
>>>>>>>>> appender.console.name = LogToConsole
>>>>>>>>> appender.console.layout.type = PatternLayout
>>>>>>>>> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>>>>>>>>> rootLogger.level = info
>>>>>>>>> rootLogger.appenderRefs = stdout
>>>>>>>>> rootLogger.appenderRef.stdout.ref = LogToConsole
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Oops, this is actually the JOIN issue thread [1]. Guess I should
>>>>>>>>>> revise my previous "haven't had issues" statement, hah. Sorry for
>>>>>>>>>> the spam!
>>>>>>>>>>
>>>>>>>>>> [1]: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Unless it's related to this issue [1], which was with my JOIN and
>>>>>>>>>>> time characteristics, though I'm not sure that applies to batch.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Austin
>>>>>>>>>>>
>>>>>>>>>>> [1]: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Dan,
>>>>>>>>>>>>
>>>>>>>>>>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster
>>>>>>>>>>>> and haven’t had issues, though we’re only testing streaming jobs.
>>>>>>>>>>>>
>>>>>>>>>>>> Happy to help set up logging with that if you’d like.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Austin
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think any of the gotchas (at the bottom of this link) apply
>>>>>>>>>>>>> to me.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm assuming that for a batch job I don't have to do anything for:
>>>>>>>>>>>>> "You can implement a custom parallel source function for emitting
>>>>>>>>>>>>> watermarks if your job uses event time timers."
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've tried to enable additional logging for a few hours today.  I
>>>>>>>>>>>>>> think something with junit5 is swallowing the logs.  I'm using
>>>>>>>>>>>>>> Bazel and junit5.  I set up MiniClusterResourceConfiguration using
>>>>>>>>>>>>>> a custom extension.  Are there any known issues with Flink and
>>>>>>>>>>>>>> junit5?  I can try switching to junit4.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I binary searched this issue, I found the failure happens if
>>>>>>>>>>>>>> my query in step 3 has a join in it.  If I remove the join, I can
>>>>>>>>>>>>>> remove step 4 and the code still works.  I've renamed a bunch of
>>>>>>>>>>>>>> my tables too, and the problem still exists.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek <[hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> there were some bugs and quirks in the MiniCluster that we recently
>>>>>>>>>>>>>>> fixed:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     - https://issues.apache.org/jira/browse/FLINK-19123
>>>>>>>>>>>>>>>     - https://issues.apache.org/jira/browse/FLINK-19264
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But I think they are probably unrelated to your case. Could you
>>>>>>>>>>>>>>> enable logging and see from the logs whether the 2) and 3) jobs
>>>>>>>>>>>>>>> execute correctly on the MiniCluster?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 06.10.20 08:08, Dan Hill wrote:
>>>>>>>>>>>>>>>> I'm writing a test for a batch job using
>>>>>>>>>>>>>>>> MiniClusterResourceConfiguration.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here's a simple description of my working test case:
>>>>>>>>>>>>>>>> 1) I use TableEnvironment.executeSql(...) to create a source and
>>>>>>>>>>>>>>>> sink table using a tmp filesystem directory.
>>>>>>>>>>>>>>>> 2) I use executeSql to insert some test data into the source table.
>>>>>>>>>>>>>>>> 3) I use executeSql to select from source and insert into sink.
>>>>>>>>>>>>>>>> 4) I use executeSql from the same source to a different sink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When I do these steps, it works.  If I remove step 4, no data gets
>>>>>>>>>>>>>>>> written to the sink.  My actual code is more complex than this
>>>>>>>>>>>>>>>> (has create view, join and more tables).  This is a simplified
>>>>>>>>>>>>>>>> description but highlights the weird error.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone hit issues like this?  I'm assuming I have a small code
>>>>>>>>>>>>>>>> bug in my queries that's causing issues.  These queries appear to
>>>>>>>>>>>>>>>> work in production so I'm confused.  Are there ways of viewing
>>>>>>>>>>>>>>>> failed jobs or queries with MiniClusterResourceConfiguration?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>> - Dan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Till Rohrmann
Hi Dan,

I think it is a good idea to use an exponential backoff strategy in the RpcGatewayRetriever. So from my side you can open an issue and a PR for fixing it.

Cheers,
Till
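To make the proposal concrete, here is a rough sketch of an asynchronous retry with capped exponential backoff, using only `CompletableFuture` and a `ScheduledExecutorService`. It is not Flink's `RpcGatewayRetriever` (whose internals are not shown in this thread); `BackoffRetry` and its parameters are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not Flink's RpcGatewayRetriever): retries an async
 * operation, waiting delayMs before the next attempt and doubling the wait
 * after every failure, capped at maxDelayMs.
 */
final class BackoffRetry {
    private BackoffRetry() {}

    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMs,
            long maxDelayMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft <= 0) {
                result.completeExceptionally(error);
            } else {
                long nextDelayMs = Math.min(delayMs * 2, maxDelayMs);
                // Schedule the next attempt instead of blocking a thread with sleep().
                scheduler.schedule(() -> {
                    retry(operation, retriesLeft - 1, nextDelayMs, maxDelayMs, scheduler)
                        .whenComplete((v, e) -> {
                            if (e == null) {
                                result.complete(v);
                            } else {
                                result.completeExceptionally(e);
                            }
                        });
                }, delayMs, TimeUnit.MILLISECONDS);
            }
        });
        return result;
    }
}
```

Called with an initial delay of 2 ms and a cap of 20 ms, the waits between attempts follow the 2, 4, 8, 16, 20, 20 ms pattern Dan describes, so a gateway that becomes available quickly is found without paying a full 20 ms per retry.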

On Fri, Oct 16, 2020 at 7:24 PM Dan Hill <[hidden email]> wrote:
To be clear, I'd be fine coding this.

On Fri, Oct 16, 2020 at 9:35 AM Dan Hill <[hidden email]> wrote:
Makes sense.  Thanks for the details!

I just looked into it.  It's this code in this diff from ~2 years ago.

+Till - would you be fine if we change this?  Context: I was able to speed up my test by writing my own future.  I think 20ms retry is long when the test is simple.  One idea is to introduce an exponential backoff up to some max (e.g. start at 2ms, 4ms, 8ms, 16ms, 20ms, 20ms).  

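// From Flink's MiniCluster, as quoted in this thread. The last two constructor
// arguments are the number of retries and the fixed delay between them:
// up to 20 retries, each waiting 20 ms.
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
    commonRpcService,
    DispatcherGateway.class,
    DispatcherId::fromUuid,
    20,                       // retries
    Time.milliseconds(20L));  // delay between retries
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
    commonRpcService,
    ResourceManagerGateway.class,
    ResourceManagerId::fromUuid,
    20,                       // retries
    Time.milliseconds(20L));  // delay between retries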

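The schedule proposed above (2, 4, 8, 16, 20, 20 ms) is a capped exponential backoff. A small sketch with hypothetical names that generates such a schedule and compares its worst-case total wait with the current fixed setting of 20 retries at 20 ms each:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: capped exponential backoff delays, in milliseconds. */
final class BackoffSchedule {
    private BackoffSchedule() {}

    /** Returns the first {@code attempts} delays: initialMs * 2^n, capped at maxMs. */
    static List<Long> delays(long initialMs, long maxMs, int attempts) {
        List<Long> result = new ArrayList<>();
        long delay = initialMs;
        for (int i = 0; i < attempts; i++) {
            result.add(Math.min(delay, maxMs));
            // Stop doubling once the cap is reached so long schedules cannot overflow.
            delay = delay >= maxMs ? maxMs : delay * 2;
        }
        return result;
    }

    /** Sum of the delays: the worst-case time spent waiting if every retry fails. */
    static long totalWaitMs(List<Long> delays) {
        return delays.stream().mapToLong(Long::longValue).sum();
    }
}
```

With these numbers, six retries wait at most 70 ms in total versus up to 400 ms for 20 fixed 20 ms retries, and a gateway that becomes available right after the first failed lookup is picked up after 2 ms instead of 20 ms.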

On Fri, Oct 16, 2020 at 2:04 AM Aljoscha Krettek <[hidden email]> wrote:
I think it's because there are so many layers involved, and so many
futures that this passes through, that it takes some time. Plus, it's not
a critical path, so no one took the time to optimize it.

There's no single person I could point you towards, because it's a mix
of older and newer code that has grown over time, and different people
have worked on it.

On 15.10.20 01:56, Dan Hill wrote:
> I think it was MiniClusterJobClient.
>
> Each wait call slowed down by about 20-40ms.  It added about 3% to my total
> test runtime.  I ended up using getJobStatus because it's cleaner (better
> than optimizing for latency now).  I was curious what the future
> implementation did (e.g. does it just sleep?) and if it's configurable.
>
> On Tue, Oct 13, 2020 at 5:28 AM Aljoscha Krettek <[hidden email]>
> wrote:
>
>> That's interesting!
>>
>> Which JobClient implementation is being used underneath? You're using
>> the MiniCluster resource so it should be MiniClusterJobClient (or
>> PerJobMiniClusterClient as it's called prior to Flink 1.12), right?
>>
>> Also, what does that second mean percentage-wise? Is it more like a
>> 1-second improvement on a 60-second total runtime, or on a 10-second one?
>>
>>
>> On 11.10.20 22:17, Dan Hill wrote:
>>> -others.  Any idea who wrote this futures code?  I'm curious how it's
>>> implemented.  My sleep version seems to finish my tests a few tens of
>>> milliseconds faster per call (my overall test suite runs a second faster).
>>> I tried diving deeper into this last night but, once I got a few layers
>>> deeper, it made sense to ask about it.
>>>
>>> On Sat, Oct 10, 2020 at 10:37 AM Dan Hill <[hidden email]> wrote:
>>>
>>>> No, thanks!  I used JobClient to getJobStatus and sleep if it was not
>>>> terminal.  I'll switch to this.
>>>>
>>>>
>>>> On Sat, Oct 10, 2020 at 12:50 AM Aljoscha Krettek <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> did you try using the JobClient you can get from the TableResult to wait
>>>>> for job completion? You can get a CompletableFuture for the JobResult,
>>>>> which should help you.
>>>>>
>>>>> Best,
>>>>> Aljoscha
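To illustrate the difference between a sleep loop and the future-based wait suggested here, a sketch that runs without Flink. `JobHandle` is a hypothetical stand-in: `status()` plays the role of polling `JobClient.getJobStatus()`, and `completion()` the role of the job-result future. The exact `JobClient` method signatures differ between Flink releases, so check them for your version.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for Flink's JobClient so the sketch runs without Flink. */
interface JobHandle {
    /** Current status name, e.g. "RUNNING" or "FINISHED" (mirrors polling getJobStatus()). */
    CompletableFuture<String> status();

    /** Completes once the job has reached a terminal state (mirrors the job-result future). */
    CompletableFuture<Void> completion();
}

final class JobWaits {
    // Terminal states named after Flink's JobStatus values; the set is an assumption of this sketch.
    private static final Set<String> TERMINAL = Set.of("FINISHED", "FAILED", "CANCELED");

    private JobWaits() {}

    /** Sleep loop: checks the status every pollInterval until terminal or until timeout. */
    static void pollUntilTerminal(JobHandle job, Duration pollInterval, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!TERMINAL.contains(job.status().join())) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("job not finished after " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    /** Future-based: returns once completion() completes; copy() keeps the timeout local. */
    static void awaitCompletion(JobHandle job, Duration timeout) {
        job.completion().copy().orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS).join();
    }
}
```

The polling version can overshoot by up to one poll interval, while the future version returns as soon as the future completes. Whether that is faster in practice depends on what sits behind the future; Dan measures the opposite on the MiniCluster elsewhere in this thread.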
>>>>>
>>>>> On 08.10.20 23:55, Dan Hill wrote:
>>>>>> I figured out the issue.  The join caused part of the job's execution
>>>>>> to be delayed.  I added my own hacky wait condition into the test to
>>>>>> make sure the join job finishes first, and it's fine.
>>>>>>
>>>>>> What common test utilities exist for Flink?  I found
>>>>>> flink/flink-test-utils-parent.  I implemented a simple sleep loop to
>>>>>> wait for jobs to finish.  I'm guessing this can be done with one of the
>>>>>> other utilities.
>>>>>>
>>>>>> Are there any open source test examples?
>>>>>>
>>>>>> How are watermarks usually sent with the Table API in tests?
>>>>>>
>>>>>> After I collect some answers, I'm happy to update the Flink testing
>>>>>> page:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
>>>>>> [hidden email]> wrote:
>>>>>>
>>>>>>> Can't comment on the SQL issues, but here's our exact setup for Bazel
>>>>>>> and Junit5 with the resource files approach:
>>>>>>>
>>>>>>> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
>>>>>>>
>>>>>>> Best,
>>>>>>> Austin
>>>>>>>
>>>>>>> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill <[hidden email]> wrote:
>>>>>>>
>>>>>>>> I was able to get finer-grained logs to show up.  I switched from
>>>>>>>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.
>>>>>>>> With my larger test case, I was hitting a silent log4j error.  When I
>>>>>>>> created a small test case just to test logging, I received a log4j
>>>>>>>> error.
>>>>>>>>
>>>>>>>> Here is a tar
>>>>>>>> <https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
>>>>>>>> with the info logs for:
>>>>>>>> - (test-nojoin.log) this one works as expected
>>>>>>>> - (test-join.log) this does not work as expected
>>>>>>>>
>>>>>>>> I don't see an obvious issue just by scanning the logs.  I'll take a
>>>>>>>> deeper look in 9 hours.
>>>>>>>>
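As far as I know, the switch to -Dlog4j.configurationFile works because the log4j.properties file quoted in this thread uses Log4j 2 syntax: Log4j 2 reads its configuration location from `log4j.configurationFile`, while `log4j.configuration` is the Log4j 1.x property, which Log4j 2 does not read by default. In Bazel the flag belongs in `jvm_flags`; if you would rather set it from the test code, a sketch follows (`TestLogging` is a hypothetical helper, and it only takes effect if it runs before the first logger is created).

```java
/**
 * Hypothetical helper, assuming Log4j 2 is the logging backend. Log4j 2 reads the
 * "log4j.configurationFile" system property; "log4j.configuration" is the Log4j 1.x name.
 */
final class TestLogging {
    static final String LOG4J2_CONFIG_PROPERTY = "log4j.configurationFile";

    private TestLogging() {}

    /**
     * Points Log4j 2 at the given file path or URL unless a -D flag already set it.
     * Call it before any logger is created, e.g. from a static initializer in the test class.
     */
    static void useConfigIfUnset(String location) {
        if (System.getProperty(LOG4J2_CONFIG_PROPERTY) == null) {
            System.setProperty(LOG4J2_CONFIG_PROPERTY, location);
        }
    }
}
```

Keeping an explicit `-Dlog4j.configurationFile=...` in `jvm_flags` takes precedence here, because the helper never overwrites a value that is already set.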
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Switching to junit4 did not help.
>>>>>>>>>
>>>>>>>>> If I make a request to the URL returned from
>>>>>>>>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>>>>>>>>> I get {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>>>>>>>>
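On the "Not found." response: my understanding, not confirmed in this thread, is that the dashboard at the root path is only served when flink-runtime-web is on the test classpath, while the REST endpoints themselves still respond. A sketch that queries the `/jobs/overview` REST endpoint (which lists jobs and their states) under the URL returned by `getWebInterfaceURL()`; `RestProbe` is a hypothetical helper built on the JDK 11 `java.net.http` client.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper for inspecting a MiniCluster through Flink's REST API. */
final class RestProbe {
    private RestProbe() {}

    /** Builds <base>/jobs/overview, accepting a base URL with or without a trailing slash. */
    static URI jobsOverview(String webInterfaceUrl) {
        String base = webInterfaceUrl.endsWith("/") ? webInterfaceUrl : webInterfaceUrl + "/";
        return URI.create(base).resolve("jobs/overview");
    }

    /** Sends a GET request and returns the raw JSON body (job ids, names and states). */
    static String fetch(URI uri) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient().send(
            HttpRequest.newBuilder(uri).GET().build(),
            HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Printing `RestProbe.fetch(RestProbe.jobsOverview(url))` after each `executeSql` call would show, for example, whether the join job is still RUNNING when the test starts checking the sink.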
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> @Aljoscha - Thanks!  That setup lets me fix the hacky absolute path
>>>>>>>>>> reference.  However, the actual log calls are not printing to the
>>>>>>>>>> console.  Only errors appear in my terminal window and the test
>>>>>>>>>> logs.  Maybe the console logger does not work with this junit setup.
>>>>>>>>>> I'll see if the file version works.


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

Till Rohrmann
Alright. Thanks for letting me know. I will take a look at the PR.

Cheers,
Till

On Tue, Oct 20, 2020 at 8:00 PM Dan Hill <[hidden email]> wrote:
-others.

I can't add you as a watcher to the issue (I lack permissions).  https://issues.apache.org/jira/projects/FLINK/issues/FLINK-19721?filter=reportedbyme

I created a PR.



On Fri, Oct 16, 2020 at 10:40 AM Till Rohrmann <[hidden email]> wrote:
Hi Dan,

I think it is a good idea to use an exponential backoff strategy in the RpcGatewayRetriever. So from my side you can open an issue and a PR for fixing it.

Cheers,
Till

On Fri, Oct 16, 2020 at 7:24 PM Dan Hill <[hidden email]> wrote:
To be clear, I'd be fine coding this.

On Fri, Oct 16, 2020 at 9:35 AM Dan Hill <[hidden email]> wrote:
Makes sense.  Thanks for the details!

I just looked into it.  It's this code in this diff from ~2 years ago.

+Till - would you be fine if we change this?  Context: I was able to speed up my test by writing my own future.  I think 20ms retry is long when the test is simple.  One idea is to introduce an exponential backoff up to some max (e.g. start at 2ms, 4ms, 8ms, 16ms, 20ms, 20ms).  

dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
    commonRpcService,
    DispatcherGateway.class,
    DispatcherId::fromUuid,
    20,
    Time.milliseconds(20L));
resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
    commonRpcService,
    ResourceManagerGateway.class,
    ResourceManagerId::fromUuid,
    20,
    Time.milliseconds(20L));


On Fri, Oct 16, 2020 at 2:04 AM Aljoscha Krettek <[hidden email]> wrote:
I think it's because there's just so many layers involved and so many
futures that this passes through that it takes some time. Plus it's not
a critical path so no-one took the time to optimize it.

There's no one single person that I could point you towards because it's
a mix of older code and newer code that has grown over time and
different people have worked on it.

On 15.10.20 01:56, Dan Hill wrote:
> I think it was MiniClusterJobClient.
>
> Each wait call slowed down by about 20-40ms.  It added about 3% to my total
> test runtime.  I ended up using getJobStatus because it's cleaner (better
> than optimizing for latency now).  I was curious what the future
> implementation did (e.g. does it just sleep?) and if it's configurable.
>
> On Tue, Oct 13, 2020 at 5:28 AM Aljoscha Krettek <[hidden email]>
> wrote:
>
>> That's interesting!
>>
>> Which JobClient implementation is being used underneath? You're using
>> the MiniCluster resource so it should be MiniClusterJobClient (or
>> PerJobMiniClusterClient as it's called prior to Flink 1.12), right?
>>
>> Also, what does that second mean percentage-wise? Is it more a 1 second
>> improvement on a 60 second total runtime or 10 second total runtime.
>>
>>
>> On 11.10.20 22:17, Dan Hill wrote:
>>> -others.  Any idea who wrote this futures code?  I'm curious how it's
>>> implemented.  My sleep version seems to finish my tests a few tens of
>>> milliseconds faster per call (my overall test suite runs a second
>> faster).
>>> I tried diving deeper into this last night but, once I got a few layers
>>> deeper, it made sense to ask about it.
>>>
>>> On Sat, Oct 10, 2020 at 10:37 AM Dan Hill <[hidden email]> wrote:
>>>
>>>> No, thanks!  I used JobClient to getJobStatus and sleep if it was not
>>>> terminal.  I'll switch to this.
>>>>
>>>>
>>>> On Sat, Oct 10, 2020 at 12:50 AM Aljoscha Krettek <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> did you try using the JobClient you can get from the TableResult to
>> wait
>>>>> for job completion? You can get a CompletableFuture for the JobResult
>>>>> which should help you.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 08.10.20 23:55, Dan Hill wrote:
>>>>>> I figured out the issue.  The join caused part of the job's execution
>>>>> to be
>>>>>> delayed.  I added my own hacky wait condition into the test to make
>> sure
>>>>>> the join job finishes first and it's fine.
>>>>>>
>>>>>> What common test utilities exist for Flink?  I found
>>>>>> flink/flink-test-utils-parent.  I implemented a simple sleep loop to
>>>>> wait
>>>>>> for jobs to finish.  I'm guessing this can be done with one of the
>> other
>>>>>> utilities.
>>>>>>
>>>>>> Are there any open source test examples?
>>>>>>
>>>>>> How are watermarks usually sent with Table API in tests?
>>>>>>
>>>>>> After I collect some answers, I'm fine updating the Flink testing
>> page.
>>>>>>
>>>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
>>>>>> [hidden email]> wrote:
>>>>>>
>>>>>>> Can't comment on the SQL issues, but here's our exact setup for Bazel
>>>>> and
>>>>>>> Junit5 w/ the resource files approach:
>>>>>>>
>>>>>
>> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
>>>>>>>
>>>>>>> Best,
>>>>>>> Austin
>>>>>>>
>>>>>>> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill <[hidden email]>
>> wrote:
>>>>>>>
>>>>>>>> I was able to get finer grained logs showing.  I switched from
>>>>>>>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.
>>>>> With my
>>>>>>>> larger test case, I was hitting a silent log4j error.  When I
>> created
>>>>> a
>>>>>>>> small test case to just test logging, I received a log4j error.
>>>>>>>>
>>>>>>>> Here is a tar
>>>>>>>> <
>>>>>
>> https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing
>>>>>>
>>>>>>>> with the info logs for:
>>>>>>>> - (test-nojoin.log) this one works as expected
>>>>>>>> - (test-join.log) this does not work as expected
>>>>>>>>
>>>>>>>> I don't see an obvious issue just by scanning the logs.  I'll take a
>>>>>>>> deeper in 9 hours.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill <[hidden email]>
>>>>> wrote:
>>>>>>>>
>>>>>>>>> Switching to junit4 did not help.
>>>>>>>>>
>>>>>>>>> If I make a request to the url returned from
>>>>>>>>>
>>>>>
>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>>>>>>>>> I get
>>>>>>>>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill <[hidden email]>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute
>> path
>>>>>>>>>> reference.  However, the actual log calls are not printing to the
>>>>> console.
>>>>>>>>>> Only errors appear in my terminal window and the test logs.  Maybe
>>>>> console
>>>>>>>>>> logger does not work for this junit setup.  I'll see if the file
>>>>> version
>>>>>>>>>> works.
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> What Aljoscha suggested is what works for us!
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek <
>>>>> [hidden email]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>>
>>>>>>>>>>>> to make the log properties file work this should do it: assuming
>>>>> the
>>>>>>>>>>>> log4j.properties is in //src/main/resources. You will need a
>>>>>>>>>>>> BUILD.bazel
>>>>>>>>>>>> in that directory that has only the line
>>>>>>>>>>>> "exports_files(["log4j.properties"]). Then you can reference it
>> in
>>>>>>>>>>>> your
>>>>>>>>>>>> test via "resources =
>>>>> ["//src/main/resources:log4j.properties"],". Of
>>>>>>>>>>>> course you also need to have the right log4j deps (or slf4j if
>>>>> you're
>>>>>>>>>>>> using that)
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that helps!
>>>>>>>>>>>>
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>> On 07.10.20 00:41, Dan Hill wrote:
>>>>>>>>>>>>> I'm trying to use Table API for my job.  I'll soon try to get a
>>>>> test
>>>>>>>>>>>>> working for my stream job.
>>>>>>>>>>>>> - I'll parameterize so I can have different sources and sink
>> for
>>>>>>>>>>>> tests.
>>>>>>>>>>>>> How should I mock out a Kafka source?  For my test, I was
>>>>> planning
>>>>>>>>>>>> on
>>>>>>>>>>>>> changing the input to be from a temp file (instead of Kafka).
>>>>>>>>>>>>> - What's a good way of forcing a watermark using the Table API?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 3:35 PM Dan Hill <[hidden email]
>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Great to know.  I copied this junit5-jupiter-starter-bazel
>>>>>>>>>>>>>> <
>>>>>>>>>>>>
>>>>>
>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel
>>>>>>
>>>>>>>>>>>> rule
>>>>>>>>>>>>>> into my repository (I don't think junit5 is supported directly
>>>>> with
>>>>>>>>>>>>>> java_test yet).  I tried a few ways of bundling
>>>>> `log4j.properties`
>>>>>>>>>>>> into the
>>>>>>>>>>>>>> jar and didn't get them to work.  My current iteration hacks
>> the
>>>>>>>>>>>>>> log4j.properties file as an absolute path.  My failed attempts
>>>>>>>>>>>> would spit
>>>>>>>>>>>>>> an error saying log4j.properties file was not found.  This
>> route
>>>>>>>>>>>> finds it
>>>>>>>>>>>>>> but the log properties are not used for the java logger.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are there a better set of rules to use for junit5?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> # build rule
>>>>>>>>>>>>>> java_junit5_test(
>>>>>>>>>>>>>>         name = "tests",
>>>>>>>>>>>>>>         srcs = glob(["*.java"]),
>>>>>>>>>>>>>>         test_package = "ai.promoted.logprocessor.batch",
>>>>>>>>>>>>>>         deps = [...],
>>>>>>>>>>>>>>         jvm_flags = ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> # log4j.properties
>>>>>>>>>>>>>> status = error
>>>>>>>>>>>>>> name = Log4j2PropertiesConfig
>>>>>>>>>>>>>> appenders = console
>>>>>>>>>>>>>> appender.console.type = Console
>>>>>>>>>>>>>> appender.console.name = LogToConsole
>>>>>>>>>>>>>> appender.console.layout.type = PatternLayout
>>>>>>>>>>>>>> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>>>>>>>>>>>>>> rootLogger.level = info
>>>>>>>>>>>>>> rootLogger.appenderRefs = stdout
>>>>>>>>>>>>>> rootLogger.appenderRef.stdout.ref = LogToConsole
>>>>>>>>>>>>>>
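A likely reason the file is found but ignored: it is written in Log4j 2 syntax (`appender.console.type`, `rootLogger.appenderRef...`), while `-Dlog4j.configuration` is the Log4j 1.x system property. Log4j 2 reads `log4j.configurationFile` instead, and when that is unset it searches the classpath for names such as `log4j2-test.properties` rather than `log4j.properties`. The simplest change is probably the flag in `jvm_flags`; the hypothetical helper below (the name `Log4j2TestConfig` is an assumption, not a Flink or Log4j API) sketches the same idea from Java by setting the Log4j 2 property before any logger is created.

```java
/**
 * Hypothetical test helper (not a Log4j API). Log4j 2 takes its configuration
 * location from the "log4j.configurationFile" system property; the
 * "log4j.configuration" property is the Log4j 1.x name.
 */
final class Log4j2TestConfig {

    static final String LOG4J2_PROPERTY = "log4j.configurationFile";
    static final String LOG4J1_PROPERTY = "log4j.configuration";

    private Log4j2TestConfig() {}

    /**
     * Points Log4j 2 at a config location unless the JVM already set one.
     * If only the Log4j 1.x flag was passed (as in the build rule above), its
     * value is reused; otherwise the given default location is used.
     * Returns the location that is now in effect.
     */
    static String useConfigIfUnset(String defaultLocation) {
        String existing = System.getProperty(LOG4J2_PROPERTY);
        if (existing != null) {
            return existing;
        }
        String legacy = System.getProperty(LOG4J1_PROPERTY);
        String chosen = legacy != null ? legacy : defaultLocation;
        System.setProperty(LOG4J2_PROPERTY, chosen);
        return chosen;
    }
}
```

It has to run before Log4j 2 initializes (for example from a static initializer placed before any static `Logger` field in the test class), because the property is read when Log4j 2 first sets up its logger context.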
>>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Oops, this is actually the JOIN issue thread [1]. Guess I should
>>>>>>>>>>>>>>> revise my previous "haven't had issues" statement hah. Sorry for
>>>>>>>>>>>>>>> the spam!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Unless it's related to this issue [1], which was w/ my JOIN and
>>>>>>>>>>>>>>>> time characteristics, though not sure that applies for batch.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>>>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>>>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey Dan,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster
>>>>>>>>>>>>>>>>> and haven’t had issues, though we’re only testing on streaming jobs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Happy to help setting up logging with that if you’d like.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't think any of the gotchas (at the bottom of this link) apply
>>>>>>>>>>>>>>>>>> to me.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm assuming that for a batch job I don't have to do anything
>>>>>>>>>>>>>>>>>> for: "You can implement a custom parallel source function for
>>>>>>>>>>>>>>>>>> emitting watermarks if your job uses event time timers."
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I've tried to enable additional logging for a few hours today.  I
>>>>>>>>>>>>>>>>>>> think something with junit5 is swallowing the logs.  I'm using
>>>>>>>>>>>>>>>>>>> Bazel and junit5.  I set up MiniClusterResourceConfiguration using
>>>>>>>>>>>>>>>>>>> a custom extension.  Are there any known issues with Flink and
>>>>>>>>>>>>>>>>>>> junit5?  I can try switching to junit4.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When I binary-searched this issue, I found that the failure happens
>>>>>>>>>>>>>>>>>>> if my query in step 3 has a join in it.  If I remove the join, I
>>>>>>>>>>>>>>>>>>> can remove step 4 and the code still works.  I've renamed a bunch
>>>>>>>>>>>>>>>>>>> of my tables too, and the problem still exists.
>>>>>>>>>>>>>>>>>>>
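One thing worth ruling out here (it is not confirmed anywhere in this excerpt): `TableEnvironment.executeSql("INSERT ...")` submits the job and returns a `TableResult` without waiting for the job to finish. If the test reads the sink, or the MiniCluster is torn down, right after the call, a slower job (for example one with a join) may not have written anything yet, and an extra step 4 can mask that simply by taking time. In Flink 1.12+ `TableResult#await()` blocks until the job finishes; on 1.11 the job can be waited on through `tableResult.getJobClient()`. The stdlib-only analogy below (all names hypothetical, with a `CompletableFuture` standing in for the submitted job; this is not Flink code) shows the wait-before-asserting pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Stdlib-only analogy: a "job" is submitted asynchronously and writes rows to a
 * "sink" on another thread. Reading the sink right after submission can see 0
 * rows; waiting on the returned future first always sees the written rows.
 */
final class AsyncSubmitAnalogy {

    private AsyncSubmitAnalogy() {}

    /** Stands in for executeSql("INSERT ..."): returns at once, work continues in the background. */
    static CompletableFuture<Void> submitInsert(ExecutorService pool, AtomicInteger sink, int rows) {
        return CompletableFuture.runAsync(() -> {
            sleepQuietly(50); // pretend job startup plus a slow join
            sink.addAndGet(rows);
        }, pool);
    }

    /** Submits the "job", blocks until it completes, then reads the "sink". */
    static int insertAndAwait(int rows) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AtomicInteger sink = new AtomicInteger();
            CompletableFuture<Void> job = submitInsert(pool, sink, rows);
            job.join(); // the easy-to-forget wait; without it the read below is racy
            return sink.get();
        } finally {
            pool.shutdownNow();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the real test, the equivalent would be waiting on each INSERT's `TableResult` before checking the sink files.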
>>>>>>>>>>>>>>>>>>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> there were some bugs and quirks in the MiniCluster that we recently
>>>>>>>>>>>>>>>>>>>> fixed:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>   - https://issues.apache.org/jira/browse/FLINK-19123
>>>>>>>>>>>>>>>>>>>>   - https://issues.apache.org/jira/browse/FLINK-19264
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> But I think they are probably unrelated to your case. Could you
>>>>>>>>>>>>>>>>>>>> enable logging and see from the logs whether the 2) and 3) jobs
>>>>>>>>>>>>>>>>>>>> execute correctly on the MiniCluster?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 06.10.20 08:08, Dan Hill wrote:
>>>>>>>>>>>>>>>>>>>>> I'm writing a test for a batch job using
>>>>>>>>>>>>>>>>>>>>> MiniClusterResourceConfiguration.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Here's a simple description of my working test case:
>>>>>>>>>>>>>>>>>>>>> 1) I use TableEnvironment.executeSql(...) to create a source and a
>>>>>>>>>>>>>>>>>>>>>    sink table using a tmp filesystem directory.
>>>>>>>>>>>>>>>>>>>>> 2) I use executeSql to insert some test data into the source table.
>>>>>>>>>>>>>>>>>>>>> 3) I use executeSql to select from the source and insert into the
>>>>>>>>>>>>>>>>>>>>>    sink.
>>>>>>>>>>>>>>>>>>>>> 4) I use executeSql from the same source to a different sink.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When I do these steps, it works.  If I remove step 4, no data gets
>>>>>>>>>>>>>>>>>>>>> written to the sink.  My actual code is more complex than this (it
>>>>>>>>>>>>>>>>>>>>> has create view, join and more tables).  This is a simplified
>>>>>>>>>>>>>>>>>>>>> description, but it highlights the weird error.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Has anyone hit issues like this?  I'm assuming I have a small code
>>>>>>>>>>>>>>>>>>>>> bug in my queries that's causing issues.  These queries appear to
>>>>>>>>>>>>>>>>>>>>> work in production, so I'm confused.  Are there ways of viewing
>>>>>>>>>>>>>>>>>>>>> failed jobs or queries with MiniClusterResourceConfiguration?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>> - Dan
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>>
>>
>