Hi all,
I registered two tables:
user_info
--------------------
user_id | varchar
name    | varchar
--------------------
user_order
--------------------
order_id | varchar
user_id  | varchar
price    | varchar
--------------------
Then I use the Flink Table & SQL API to join these tables:
"select user_info.user_id as user_id, name, price, order_id from user_info join user_order on user_order.user_id = user_info.user_id"
Finally, I emit the joined data to an Elasticsearch cluster.
Then I run my Flink program and insert two users into user_info and one order into user_order.
The result in Elasticsearch is:
My questions are:
1. How can I update the price? When I insert another record into user_order to update the price from 23.00 to 46.00 (order_id: 111), it does not work correctly: I end up with two records instead of one updated record.
It seems this program does not define the unique key fields, but I can't find information about this in the Flink documentation.
In the source code it says:
"If the table does not have a key and is append-only, the keys attribute is null."
However, this does not help in the join scenario.
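A workaround I am considering (just a sketch, not sure it is the intended approach): rewrite the query with a GROUP BY so the planner can derive a unique key for the upsert sink. The max() calls are only placeholder aggregates to pick one value per key; since price is varchar, max() compares strings, so this is not a real fix for arbitrary updates.

// Hypothetical rewrite: GROUP BY gives the result a unique key
// (user_id, order_id), which an upsert sink can use as the document key.
String upsertSql =
        "select user_info.user_id as user_id, max(name) as name, "
        + "max(price) as price, order_id "
        + "from user_info join user_order on user_order.user_id = user_info.user_id "
        + "group by user_info.user_id, order_id";
fsTableEnv.sqlUpdate("insert into output " + upsertSql);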
2. If the data comes from Kafka and is synced from the MySQL binlog:
I submit my Flink job at 2019-11-05 21:00:00. How can I join with the users that already exist in MySQL but never appear in the Kafka stream? (The Kafka offsets start from 2019-11-05 21:00:00, and I use kafkaConsumer011.setStartFromGroupOffsets(), not setStartFromEarliest().)
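One idea I have for bootstrapping (a sketch only, assuming JDBCInputFormat from the flink-jdbc dependency, in org.apache.flink.api.java.io.jdbc, fits here; all connection details are placeholders): read the current user_info snapshot from MySQL once at startup and union it with the stream coming from Kafka before registering the table.

// Sketch: bootstrap users that already exist in MySQL but will never
// appear in the Kafka topic. "kafkaUserStream" stands for the
// DataStream<Row> derived from the binlog topic (hypothetical name).
JDBCInputFormat snapshotFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.jdbc.Driver")        // placeholder driver
        .setDBUrl("jdbc:mysql://localhost:3306/test")  // placeholder URL
        .setUsername("user")
        .setPassword("password")
        .setQuery("SELECT user_id, name FROM user_info")
        .setRowTypeInfo(new RowTypeInfo(
                new TypeInformation[]{Types.STRING, Types.STRING},
                new String[]{"user_id", "name"}))
        .finish();

// One-shot snapshot stream unioned with the live stream; users present in
// both sources would appear twice and may need de-duplication.
DataStream<Row> mysqlUsers = fsEnv.createInput(snapshotFormat);
DataStream<Row> allUsers = mysqlUsers.union(kafkaUserStream);
fsTableEnv.registerTable("user_info", fsTableEnv.fromDataStream(allUsers));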
Thanks.
My code:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

public class TestTwoStreamJoin {

    @Test
    void testTwoStreamJoin() throws Exception {
        // Old-planner streaming table environment.
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // user_info rows arrive as comma-separated lines on port 9000.
        DataStream<String> d1 = fsEnv.socketTextStream("localhost", 9000);
        String[] fieldNames = new String[]{"user_id", "name"};
        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING};
        DataStream<Row> t1 = getRows(d1, fieldNames, types);

        // user_order rows arrive on port 9001.
        DataStream<String> d2 = fsEnv.socketTextStream("localhost", 9001);
        String[] field2 = new String[]{"order_id", "user_id", "price"};
        TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> t2 = getRows(d2, field2, types2);

        fsTableEnv.registerTable("user_info", fsTableEnv.fromDataStream(t1));
        fsTableEnv.registerTable("user_order", fsTableEnv.fromDataStream(t2));

        String joinSql = "select user_info.user_id as user_id, name, price, order_id"
                + " from user_info join user_order on user_order.user_id = user_info.user_id";
        Table t3 = fsTableEnv.sqlQuery(joinSql);
        fsTableEnv.toAppendStream(t3, Row.class).print();

        // Elasticsearch 6 sink, registered in upsert mode.
        String[] outputFields = new String[]{"user_id", "name", "price", "order_id"};
        TypeInformation[] outputTypes = new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        fsTableEnv.connect(new Elasticsearch()
                        .version("6")
                        .host("127.0.0.1", 9200, "http")
                        .index("test_index")
                        .documentType("user")
                        .failureHandlerIgnore()
                        .bulkFlushInterval(1000)
                        .bulkFlushMaxActions(1))
                .withSchema(new Schema().schema(new TableSchema(outputFields, outputTypes)))
                .withFormat(new Json().schema(new RowTypeInfo(outputTypes, outputFields)))
                .inUpsertMode()
                .registerTableSink("output");

        fsTableEnv.sqlUpdate("insert into output " + joinSql);
        fsTableEnv.execute("job");
    }

    // Parses a comma-separated line into a Row with the given field names/types.
    public DataStream<Row> getRows(DataStream<String> dataStream, String[] f, TypeInformation[] t) {
        return dataStream.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] v = value.split(",");
                Row r = new Row(v.length);
                for (int i = 0; i < v.length; i++) {
                    r.setField(i, v[i]);
                }
                return r;
            }
        }).returns(new RowTypeInfo(t, f));
    }
}
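For testing, I feed the two sockets with "nc -lk 9000" and "nc -lk 9001" and type comma-separated rows matching the field order above, e.g. a user line like 001,Jack on port 9000 and an order line like 111,001,23.00 on port 9001 (the user values here are only examples; 111 and 23.00 are from the order in my test).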