 
	
					
		
	
Below is one line of my source. The body contains the user logs:

    {
        body: [
            "user1,url1,2018-10-23 00:00:00;user2,url2,2018-10-23 00:01:00;user3,url3,2018-10-23 00:02:00"
        ]
    }

I use LATERAL TABLE and a user-defined TableFunction to flatMap the source into a new table, log, and I want to group by the time and username. Here is my code:

    public class BodySplitFun extends TableFunction<Tuple3<String, String, Long>> {

        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Emits one (user, url, epoch millis) tuple per ';'-separated log line.
        public void eval(Row bodyRow) throws ParseException {
            String body = bodyRow.getField(0).toString();
            String[] lines = body.split(";");
            for (String line : lines) {
                String user = line.split(",")[0];
                String url = line.split(",")[1];
                String sTime = line.split(",")[2];
                collect(new Tuple3<>(user, url, sdf.parse(sTime).getTime()));
            }
        }
    }

    tblEnv.registerFunction("bodySplit", new BodySplitFun());
    tblEnv.sqlUpdate(
        "select count(username) "
        + "from ( "
        + "  SELECT username, url, sTime "
        + "  FROM mySource LEFT JOIN LATERAL TABLE(bodySplit(body)) as T(username, url, sTime) ON TRUE "
        + ") log "
        + "group by TUMBLE(log.sTime, INTERVAL '1' MINUTE), log.username");

When I run my program, I get this error message:

    Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL DAY>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)' 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
        ... 49 more

How can I group by sTime in the table log?
 
	
					
		
	
Hi,

The problem is that sTime is not a time attribute [1]. A time attribute has to be aligned with the watermark mechanism, and as far as I know you currently cannot create one from within a TableFunction.

What you could do instead is implement the splitting logic in the DataStream API and register a table with properly implemented watermarks in the TableEnvironment. You can then apply the windowing to a table prepared that way.

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#time-attributes
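
To illustrate the suggestion above, here is a minimal sketch of the splitting step moved out of the TableFunction and written as plain Java, so it does not depend on any Flink API. The names `LogEntry` and `BodySplitter` are hypothetical, and parsing the timestamps as UTC is an assumption (the original code used SimpleDateFormat with the JVM default time zone).

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type: one parsed log line with its event time in epoch millis.
final class LogEntry {
    final String username;
    final String url;
    final long timestampMillis;

    LogEntry(String username, String url, long timestampMillis) {
        this.username = username;
        this.url = url;
        this.timestampMillis = timestampMillis;
    }
}

// Hypothetical helper: splits one "body" string into timestamped entries.
final class BodySplitter {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static List<LogEntry> split(String body) {
        List<LogEntry> entries = new ArrayList<>();
        for (String line : body.split(";")) {
            String[] fields = line.split(",");
            if (fields.length != 3) {
                continue; // skip malformed lines instead of failing the whole record
            }
            // Assumption: timestamps in the log are UTC.
            long ts = LocalDateTime.parse(fields[2].trim(), FORMAT)
                    .toInstant(ZoneOffset.UTC)
                    .toEpochMilli();
            entries.add(new LogEntry(fields[0].trim(), fields[1].trim(), ts));
        }
        return entries;
    }
}
```

In a Flink 1.6 job, a helper like this could presumably be called from a flatMap on the source DataStream that emits Tuple3<String, String, Long>. The stream could then get timestamps and watermarks via assignTimestampsAndWatermarks (for example with a BoundedOutOfOrdernessTimestampExtractor), and be registered with a field expression such as "username, url, sTime.rowtime" so that sTime becomes a time attribute that TUMBLE accepts. Treat that wiring as an outline to check against the documentation in [1], not as tested code.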
On 23/10/2018 06:45, maidangdang44 wrote:
 
	
					
		
	
 
	

 
	
	
		
