How can I update data with flink-connector-elasticsearch6_2.11 in a join scenario


淘宝龙安
Hi all,

I registered two tables:

user_info
-------------------------
user_id  |  varchar
name     |  varchar
-------------------------

user_order
-------------------------
order_id |  varchar
user_id  |  varchar
price    |  varchar
-------------------------


Then I use the Flink Table & SQL API to join these tables:

"select user_info.user_id as user_id, name, price, order_id from user_info join user_order on user_order.user_id = user_info.user_id"


Finally, I emit the joined data to an Elasticsearch cluster.

Then I run my Flink program and insert two users into user_info and one order into user_order.


[screenshots: the rows inserted into user_info and user_order]
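
The raw lines I feed into the two socket sources look roughly like this (the exact values here are stand-ins; the format is comma-separated in the field order above, see the code at the end):

port 9000 (user_info):   0001,userA   and   0002,userB
port 9001 (user_order):  111,0001,23.00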


The result in Elasticsearch is:

[screenshot: the joined documents in Elasticsearch, including order_id 111 with price 23.00]

My questions are:

1. How can I update the price? When I insert another record into user_order that changes the price from 23.00 to 46.00 (order_id: 111), it does not work correctly.

[screenshot: the new user_order record with order_id 111 and price 46.00]

Then I get two documents in Elasticsearch instead of one updated document.

[screenshot: two documents in Elasticsearch for order_id 111]

It seems this program does not define the unique key fields, but I cannot find information about this in the Flink documentation. The source code says: "If the table does not have a key and is append-only, the keys attribute is null." However, this does not work in the join scenario.
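
Do I need to rewrite the query so that the planner can derive a unique key, for example by grouping on order_id? A rough sketch of what I mean (MAX is only a placeholder to turn the non-key columns into aggregates; it is not the "take the latest value" semantics I actually want):

"select max(user_info.user_id) as user_id, max(name) as name, max(price) as price, user_order.order_id as order_id from user_info join user_order on user_order.user_id = user_info.user_id group by user_order.order_id"

Would a query like this make the Elasticsearch sink run in upsert mode with order_id as the key, or is there another way to declare the key fields?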


2. If the data comes from Kafka and is synced from the MySQL binlog: I submit my Flink job at 2019-11-05 21:00:00, and the Kafka offsets start from that time (kafkaConsumer011.setStartFromGroupOffsets(), not setStartFromEarliest()). How can I join with the users that already exist in MySQL but never appear in the Kafka stream?
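
So far I have only considered changing the consumer start position, e.g.:

// replay the whole binlog topic instead of the committed group offsets
kafkaConsumer011.setStartFromEarliest();
// or start from a fixed point in time (epoch milliseconds)
kafkaConsumer011.setStartFromTimestamp(startMillis);

But if the topic does not retain the full history, the old users are still missing. Is there a recommended way to bootstrap the user_info table from MySQL directly and then continue with the binlog stream from Kafka?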




Thanks.

My code:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

public class TestTwoStreamJoin {

    @Test
    void testTwoStreamJoin() throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // user_info: "user_id,name" lines from a local socket
        DataStream<String> d1 = fsEnv.socketTextStream("localhost", 9000);
        String[] fieldNames = new String[]{"user_id", "name"};
        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING};
        DataStream<Row> t1 = getRows(d1, fieldNames, types);

        // user_order: "order_id,user_id,price" lines from a second socket
        DataStream<String> d2 = fsEnv.socketTextStream("localhost", 9001);
        String[] field2 = new String[]{"order_id", "user_id", "price"};
        TypeInformation[] types2 = new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> t2 = getRows(d2, field2, types2);

        fsTableEnv.registerTable("user_info", fsTableEnv.fromDataStream(t1));
        fsTableEnv.registerTable("user_order", fsTableEnv.fromDataStream(t2));

        String joinSql = "select user_info.user_id as user_id, name, price, order_id from user_info join user_order on user_order.user_id = user_info.user_id";
        Table t3 = fsTableEnv.sqlQuery(joinSql);
        fsTableEnv.toAppendStream(t3, Row.class).print();

        // Elasticsearch 6 table sink registered in upsert mode
        String[] outputFields = new String[]{"user_id", "name", "price", "order_id"};
        TypeInformation[] outputTypes = new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        fsTableEnv.connect(new Elasticsearch()
                        .version("6")
                        .host("127.0.0.1", 9200, "http")
                        .index("test_index")
                        .documentType("user")
                        .failureHandlerIgnore()
                        .bulkFlushInterval(1000)
                        .bulkFlushMaxActions(1))
                .withSchema(new Schema().schema(new TableSchema(outputFields, outputTypes)))
                .withFormat(new Json().schema(new RowTypeInfo(outputTypes, outputFields)))
                .inUpsertMode()
                .registerTableSink("output");

        fsTableEnv.sqlUpdate("insert into output " + joinSql);
        fsTableEnv.execute("job");
    }

    // splits each comma-separated line into a Row with the given field names and types
    public DataStream<Row> getRows(DataStream<String> dataStream, String[] f, TypeInformation[] t) {
        return dataStream.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] v = value.split(",");
                Row r = new Row(v.length);
                for (int i = 0; i < v.length; i++) {
                    r.setField(i, v[i]);
                }
                return r;
            }
        }).returns(new RowTypeInfo(t, f));
    }
}
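
For completeness: before starting the job I open two local socket servers (for example with netcat listening on ports 9000 and 9001) and type the comma-separated rows shown above into them; that is how the data in the screenshots was produced.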