some basic questions


some basic questions

kant kodali
Hi All,

1) The documentation says FULL OUTER JOIN is supported; however, the code below just exits with status 1 and no error message.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Test {

    public static void main(String... args) throws Exception {

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test-topic1"),
                new SimpleStringSchema(),
                properties);
        FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test-topic2"),
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream1 = env.addSource(consumer1);
        DataStream<String> stream2 = env.addSource(consumer2);

        bsTableEnv.registerDataStream("sample1", stream1);
        bsTableEnv.registerDataStream("sample2", stream2);

        Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0");
        result.printSchema();

        bsTableEnv.toAppendStream(result, Row.class).print();
        bsTableEnv.execute("sample job");
    }
}

2) If I am using the blink planner, should I use TableEnvironment or StreamTableEnvironment?

3) Why does the current stable Flink documentation (1.9) recommend the old planner? Is there a rough timeline for when the blink planner will be production-ready? Perhaps 1.10 or 1.11?

Thanks!


Re: some basic questions

godfrey he
hi kant,

> 1) The documentation says FULL OUTER JOIN is supported; however, the code below just exits with status 1 and no error message.
If you have converted a Table to a DataStream, please execute the job with the StreamExecutionEnvironment (call env.execute("simple job")).

> 2) If I am using the blink planner, should I use TableEnvironment or StreamTableEnvironment?
For streaming jobs, both environments can be used. The differences are:
  TableEnvironment optimizes multiple queries into one DAG when executing, while StreamTableEnvironment optimizes each query independently.
  StreamTableEnvironment supports converting from/to DataStream, while TableEnvironment does not.
  StreamTableEnvironment supports registering TableFunction and AggregateFunction, while TableEnvironment does not yet.

For batch jobs, TableEnvironment is the only choice, because DataStream does not support batch execution yet.
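The distinction above can be sketched as follows. This is a minimal illustration, not a complete job: it assumes Flink 1.9/1.10 blink-planner artifacts on the classpath, and the class name EnvChoice is mine.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EnvChoice {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // Pure Table API / SQL job: TableEnvironment can optimize
        // multiple queries into a single DAG when executing.
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Mixed job: StreamTableEnvironment additionally supports
        // converting between Table and DataStream, and registering
        // TableFunction / AggregateFunction.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, settings);
    }
}
```

If the job never touches a DataStream, TableEnvironment gives the planner the most freedom; as soon as you need addSource/toRetractStream-style interop, use StreamTableEnvironment.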

> 3) Why does the current stable Flink documentation (1.9) recommend the old planner? Is there a rough timeline for when the blink planner will be production-ready? Perhaps 1.10 or 1.11?
1.9 is the blink planner's first version, and it is unstable. In 1.10 the blink planner is more stable, and we are switching it to be the default step by step [0].




Re: some basic questions

kant kodali
Hi Godfrey,

Thanks a lot for your response. I just tried it with env.execute("simple job"), but I still get the same failure.

Kant



Re: some basic questions

kant kodali
Hi Godfrey,

I was just clicking the run button in my IDE, which doesn't really show me errors, so I ran it from the command line with flink run <jar>, and that shows me what the error is. It tells me I need to change to toRetractStream(), and both StreamExecutionEnvironment.execute and StreamTableEnvironment.execute seem to work fine, although I am not sure which one is the correct usage.

Thanks!



Re: some basic questions

godfrey he
hi kant,
A "FULL OUTER JOIN" job generates retract messages, so toRetractStream is required to guarantee correctness.
I think it's better to use StreamExecutionEnvironment.execute, because you have converted the Table to a DataStream.
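Putting both suggestions together, the last lines of the original program would become something like the sketch below (same variable names as the original code; the Boolean in each Tuple2 marks accumulate vs. retract):

```java
// A FULL OUTER JOIN can update rows it has already emitted, so the table
// must be converted with toRetractStream rather than toAppendStream.
// Each element is a Tuple2<Boolean, Row>: true = add/accumulate, false = retract.
bsTableEnv.toRetractStream(result, Row.class).print();

// Since the Table was converted to a DataStream, trigger execution through
// the StreamExecutionEnvironment rather than the table environment.
env.execute("sample job");
```

With this change, updates to the join result print as paired retract/add records instead of failing the append-only conversion.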
