Problem connecting PyFlink to Elasticsearch 5.4

Problem connecting PyFlink to Elasticsearch 5.4

刘亚坤
I'm working from an example that uses PyFlink to write to Elasticsearch. My ES version is 5.4.1 and PyFlink is 1.10.1; the connector jar I'm using is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I also downloaded the Kafka and JSON connector jars, and connecting to Kafka tested successfully.
When connecting to ES, the job fails with "findAndCreateTableSink failed".
Could the ES connector jar be the cause? If anyone has run into a similar problem, please advise. Thanks.

Caused by: Could not find a suitable factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch



from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, Elasticsearch


def area_cnts():
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)

    # use blink table planner
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    # register source and sink
    register_rides_source(st_env)
    register_cnt_sink(st_env)

    # query
    st_env.from_path("source")\
        .group_by("taxiId")\
        .select("taxiId, count(1) as cnt")\
        .insert_into("sink")

    # execute
    st_env.execute("6-write_with_elasticsearch")


def register_rides_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
        Kafka()
            .version("universal")
            .topic("Rides")
            .start_from_earliest()
            .property("zookeeper.connect", "zookeeper:2181")
            .property("bootstrap.servers", "kafka:9092")) \
        .with_format(  # declare a format for this system
        Json()
            .fail_on_missing_field(True)
            .schema(DataTypes.ROW([
            DataTypes.FIELD("rideId", DataTypes.BIGINT()),
            DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
            DataTypes.FIELD("eventTime", DataTypes.TIMESTAMP()),
            DataTypes.FIELD("lon", DataTypes.FLOAT()),
            DataTypes.FIELD("lat", DataTypes.FLOAT()),
            DataTypes.FIELD("psgCnt", DataTypes.INT()),
            DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) \
        .with_schema(  # declare the schema of the table
        Schema()
            .field("rideId", DataTypes.BIGINT())
            .field("taxiId", DataTypes.BIGINT())
            .field("isStart", DataTypes.BOOLEAN())
            .field("lon", DataTypes.FLOAT())
            .field("lat", DataTypes.FLOAT())
            .field("psgCnt", DataTypes.INT())
            .field("rideTime", DataTypes.TIMESTAMP())
            .rowtime(
            Rowtime()
                .timestamps_from_field("eventTime")
                .watermarks_periodic_bounded(60000))) \
        .in_append_mode() \
        .register_table_source("source")


def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
            .version("6")
            .host("elasticsearch", 9200, "http")
            .index("taxiid-cnts")
            .document_type('taxiidcnt')
            .key_delimiter("$")) \
        .with_schema(
            Schema()
                .field("taxiId", DataTypes.BIGINT())
                .field("cnt", DataTypes.BIGINT())) \
        .with_format(
           Json()
               .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")


if __name__ == '__main__':
    area_cnts()


Re: Problem connecting PyFlink to Elasticsearch 5.4

Dian Fu
I guess it's because the ES version specified in the job is `6`, while the jar used is the `5` connector. The version declared by the descriptor has to match a table factory provided by a jar on the classpath; otherwise factory discovery fails with exactly this "Required context properties mismatch".

On Jun 16, 2020, at 1:47 PM, jack <[hidden email]> wrote:

> [original message quoted in full; trimmed]


Re: Re: Problem connecting PyFlink to Elasticsearch 5.4

刘亚坤
I have already changed the version to 5 locally, as below, and the error still occurred:
>>     st_env.connect(
>>         Elasticsearch()
>>             .version("5")
>>             .host("localhost", 9200, "http")
>>             .index("taxiid-cnts")
>>             .document_type('taxiidcnt')
>>             .key_delimiter("$")) \






On 2020-06-16 15:38:28, "Dian Fu" <[hidden email]> wrote:
> I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`.
> [rest of quoted thread trimmed]

Re: Problem connecting PyFlink to Elasticsearch 5.4

Dian Fu
Could you post the complete exception?

On Jun 16, 2020, at 3:45 PM, jack <[hidden email]> wrote:

> I have already changed the version to 5 locally, as below, and the error still occurred:
> [quoted code and earlier messages trimmed]


Re: Problem connecting PyFlink to Elasticsearch 5.4

Jark Wu-3
Hi,

As far as I know, Flink 1.10 does not officially ship an Elasticsearch 5.x SQL connector.

Best,
Jark
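
A minimal sketch of the supported route in 1.10, assuming an Elasticsearch 6.x cluster is reachable and flink-sql-connector-elasticsearch6_2.11-1.10.1.jar (not the elasticsearch5 jar) is on the classpath; the host and index names below are carried over from the original example, and the point is simply that the descriptor version, the connector jar, and the cluster major version must all agree:

def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
            .version("6")  # must agree with both the bundled SQL connector jar and the cluster major version
            .host("elasticsearch", 9200, "http")
            .index("taxiid-cnts")
            .document_type('taxiidcnt')
            .key_delimiter("$")) \
        .with_schema(
            Schema()
                .field("taxiId", DataTypes.BIGINT())
                .field("cnt", DataTypes.BIGINT())) \
        .with_format(
            Json()
                .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")

For an ES 5.4 cluster itself there is presumably no drop-in option on the Table API side in 1.10; that would mean either upgrading the cluster to 6.x/7.x or writing to it outside the Table API.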

On Tue, 16 Jun 2020 at 16:08, Dian Fu <[hidden email]> wrote:
> Could you post the complete exception?

> On Jun 16, 2020, at 3:45 PM, jack <[hidden email]> wrote:
> [rest of quoted thread trimmed]