Flink SQL

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL

Steve Bistline
Hi,

I have a silly question about Flink SQL that I cannot seem to find a clear answer to. If I have the following code. Will the "result" from the sql SELECT statement only return and the data then be written to S3 if and only if the statement returns data that matches the criteria?

Does "nothing" happen otherwise ( ie no match to the sql statement.)?

tableEnv.registerDataStream("SENSORS",dataset,"t_deviceID, t_timeStamp, t_sKey, t_sValue");


// TEMEPERATURE
Table result = tableEnv.sql("SELECT 'AlertTEMEPERATURE ',t_sKey, t_deviceID, t_sValue FROM SENSORS WHERE t_sKey='TEMPERATURE' AND t_sValue > " + TEMPERATURE_THRESHOLD);
tableEnv.toAppendStream(result, Row.class).print();
// Write to S3 bucket
DataStream<Row> dsRow = tableEnv.toAppendStream(result, Row.class);
String fileNameTemp = sdf.format(new Date());
dsRow.writeAsText("s3://csv-ai/flink-alerts/"+fileNameTemp+"TEMPERATURE.txt");
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL

Dominik Wosiński
Hey,

Not exactly sure by what you mean by "nothing" but generally the concept is. The data is fed to the dynamic table and the result of the query creates another dynamic table. So, if the resulting query returns an empty table, no data will indeed be written to the S3. Not sure if this was what You are asking about.

Best Regards,
Dom.



pt., 30 lis 2018 o 08:24 Steve Bistline <[hidden email]> napisał(a):
Hi,

I have a silly question about Flink SQL that I cannot seem to find a clear answer to. If I have the following code. Will the "result" from the sql SELECT statement only return and the data then be written to S3 if and only if the statement returns data that matches the criteria?

Does "nothing" happen otherwise ( ie no match to the sql statement.)?

tableEnv.registerDataStream("SENSORS",dataset,"t_deviceID, t_timeStamp, t_sKey, t_sValue");


// TEMEPERATURE
Table result = tableEnv.sql("SELECT 'AlertTEMEPERATURE ',t_sKey, t_deviceID, t_sValue FROM SENSORS WHERE t_sKey='TEMPERATURE' AND t_sValue > " + TEMPERATURE_THRESHOLD);
tableEnv.toAppendStream(result, Row.class).print();
// Write to S3 bucket
DataStream<Row> dsRow = tableEnv.toAppendStream(result, Row.class);
String fileNameTemp = sdf.format(new Date());
dsRow.writeAsText("s3://csv-ai/flink-alerts/"+fileNameTemp+"TEMPERATURE.txt");