Here is an example of connecting PyFlink to Elasticsearch. I am using Elasticsearch 5.4.1 and PyFlink 1.10.1, with flink-sql-connector-elasticsearch5_2.11-1.10.1.jar as the connector jar. I also downloaded the Kafka and JSON connector jars, and the Kafka connection works fine. However, connecting to Elasticsearch fails with "findAndCreateTableSink failed". Could this be caused by the Elasticsearch connector jar? If anyone has run into a similar problem, please advise. Thanks.

Caused by: Could not find a suitable factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch

    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
    from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch


    def area_cnts():
        s_env = StreamExecutionEnvironment.get_execution_environment()
        s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        s_env.set_parallelism(1)

        # use blink table planner
        st_env = StreamTableEnvironment \
            .create(s_env, environment_settings=EnvironmentSettings
                    .new_instance()
                    .in_streaming_mode()
                    .use_blink_planner().build())

        # register source and sink
        register_rides_source(st_env)
        register_cnt_sink(st_env)

        # query
        st_env.from_path("source") \
            .group_by("taxiId") \
            .select("taxiId, count(1) as cnt") \
            .insert_into("sink")

        # execute
        st_env.execute("6-write_with_elasticsearch")


    def register_rides_source(st_env):
        st_env \
            .connect(  # declare the external system to connect to
                Kafka()
                .version("universal")
                .topic("Rides")
                .start_from_earliest()
                .property("zookeeper.connect", "zookeeper:2181")
                .property("bootstrap.servers", "kafka:9092")) \
            .with_format(  # declare a format for this system
                Json()
                .fail_on_missing_field(True)
                .schema(DataTypes.ROW([
                    DataTypes.FIELD("rideId", DataTypes.BIGINT()),
                    DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
                    DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
                    DataTypes.FIELD("lon", DataTypes.FLOAT()),
                    DataTypes.FIELD("lat", DataTypes.FLOAT()),
                    DataTypes.FIELD("psgCnt", DataTypes.INT()),
                    DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
            .with_schema(  # declare the schema of the table
                Schema()
                .field("rideId", DataTypes.BIGINT())
                .field("taxiId", DataTypes.BIGINT())
                .field("isStart", DataTypes.BOOLEAN())
                .field("lon", DataTypes.FLOAT())
                .field("lat", DataTypes.FLOAT())
                .field("psgCnt", DataTypes.INT())
                .field("rideTime", DataTypes.TIMESTAMP())
                .rowtime(
                    Rowtime()
                    .timestamps_from_field("eventTime")
                    .watermarks_periodic_bounded(60000))) \
            .in_append_mode() \
            .register_table_source("source")


    def register_cnt_sink(st_env):
        st_env.connect(
            Elasticsearch()
            .version("6")
            .host("elasticsearch", 9200, "http")
            .index("taxiid-cnts")
            .document_type('taxiidcnt')
            .key_delimiter("$")) \
            .with_schema(
                Schema()
                .field("taxiId", DataTypes.BIGINT())
                .field("cnt", DataTypes.BIGINT())) \
            .with_format(
                Json()
                .derive_schema()) \
            .in_upsert_mode() \
            .register_table_sink("sink")


    if __name__ == '__main__':
        area_cnts()
I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`.
I have already changed the connector version to 5 locally, and now the error below occurs:

>> st_env.connect(
>>     Elasticsearch()
>>     .version("5")
>>     .host("localhost", 9200, "http")
>>     .index("taxiid-cnts")
>>     .document_type('taxiidcnt')
>>     .key_delimiter("$")) \
Could you share the full exception?
Hi, as far as I know, Flink 1.10 does not ship an official SQL connector for Elasticsearch 5.x. Best, Jark On Tue, 16 Jun 2020 at 16:08, Dian Fu <[hidden email]> wrote:
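For reference, Flink 1.10 only ships SQL connectors for Elasticsearch 6 and 7. If upgrading the Elasticsearch cluster to 6.x is an option, the sink in the job above could be declared with the flink-sql-connector-elasticsearch6 jar, for example via DDL. A sketch, reusing the index, document type, and field names from the job above (the host is an assumption matching the original descriptor):

```sql
CREATE TABLE sink (
  taxiId BIGINT,
  cnt BIGINT
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://elasticsearch:9200',
  'connector.index' = 'taxiid-cnts',
  'connector.document-type' = 'taxiidcnt',
  'connector.key-delimiter' = '$',
  'update-mode' = 'upsert',
  'format.type' = 'json'
)
```

This could be registered with st_env.sql_update(ddl) instead of the Elasticsearch descriptor, keeping the rest of the job unchanged.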