Convert BIGINT to TIMESTAMP in pyflink when using datastream api

Shilpa Shankar
Hello,

We are using PyFlink's DataStream API (v1.12.1) to consume from Kafka and want to use one of the fields as the "rowtime" attribute for windowing.
We realize we need to convert the BIGINT field to TIMESTAMP before we can use it as "rowtime", but we get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o91.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.

However, we are not sure where or how that conversion needs to be implemented.
Any help here would be really appreciated.

Thanks,
Shilpa

import os
from pyflink.table.expressions import lit, Expression
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream import CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSink, TableConfig
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
from pyflink.table.window import Slide

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)
    config = env.get_checkpoint_config()
    config.enable_externalized_checkpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    st_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    )

    register_kafka_source(st_env)
    register_transactions_sink_into_csv(st_env)

    # Sliding window aggregation over event time
    st_env.from_path("source") \
        .window(Slide.over(lit(2).minutes).every(lit(1).minutes).on("rowtime").alias("w")) \
        .group_by("customer_id, w") \
        .select("""customer_id as customer_id,
                 count(*) as total_count,
                 w.start as start_time,
                 w.end as end_time
                 """) \
        .insert_into("sink_into_csv")

def register_kafka_source(st_env):
    # Add Source
    st_env.connect(
        Kafka() \
            .version("universal") \
            .topic("topic1") \
            .property("group.id", "topic_consumer") \
            .property("security.protocol", "SASL_PLAINTEXT") \
            .property("sasl.mechanism", "PLAIN") \
            .property("bootstrap.servers", "<bootsptrap_servers>") \
            .property("sasl.jaas.config", "<user,password>") \
            .start_from_earliest()
    ).with_format(
        Json()
            .fail_on_missing_field(False)
            .schema(
            DataTypes.ROW([
                DataTypes.FIELD("customer_id", DataTypes.STRING()),
                DataTypes.FIELD("time_in_epoch_milliseconds", DataTypes.BIGINT())
            ])
        )
    ).with_schema(
        Schema()
            .field("customer_id", DataTypes.STRING())
            .field("rowtime", DataTypes.BIGINT())
            .rowtime(
            Rowtime()
                .timestamps_from_field("time_in_epoch_milliseconds")
                .watermarks_periodic_bounded(10)
        )
    ).in_append_mode(
    ).create_temporary_table(
        "source"
    )


def register_transactions_sink_into_csv(env):
    result_file = "/opt/examples/data/output/output_file.csv"
    if os.path.exists(result_file):
        os.remove(result_file)
    env.register_table_sink("sink_into_csv",
                            CsvTableSink(["customer_id",
                                          "total_count",
                                          "start_time",
                                          "end_time"],
                                         [DataTypes.STRING(),
                                          DataTypes.DOUBLE(),
                                          DataTypes.TIMESTAMP(3),
                                          DataTypes.TIMESTAMP(3)],
                                         result_file))

if __name__ == "__main__":
    main()

Re: Convert BIGINT to TIMESTAMP in pyflink when using datastream api

Shuiqiang Chen
Hi Shilpa,

There might be something wrong with how the rowtime field is defined via the connector descriptor. It is recommended to use SQL DDL to create the tables and then run the queries with the Table API.

Best,
Shuiqiang
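
For reference, a minimal sketch of what that could look like for the source above. This is an assumption-laden example, not a verified configuration: the topic, connector options, and credential placeholders are carried over from the original snippet, and the conversion uses the built-in FROM_UNIXTIME/TO_TIMESTAMP functions to turn the epoch-millisecond BIGINT into a TIMESTAMP(3) computed column, which the WATERMARK clause then marks as the event-time attribute.

# Possible SQL DDL replacement for register_kafka_source() (sketch only).
# The computed column converts the epoch-millisecond BIGINT to TIMESTAMP(3);
# the WATERMARK clause turns it into an event-time ("rowtime") attribute.
def register_kafka_source(st_env):
    st_env.execute_sql("""
        CREATE TABLE source (
            customer_id STRING,
            time_in_epoch_milliseconds BIGINT,
            -- integer division keeps second-level precision only
            rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(time_in_epoch_milliseconds / 1000)),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic1',
            'properties.bootstrap.servers' = '<bootstrap_servers>',
            'properties.group.id' = 'topic_consumer',
            'properties.security.protocol' = 'SASL_PLAINTEXT',
            'properties.sasl.mechanism' = 'PLAIN',
            'properties.sasl.jaas.config' = '<user,password>',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false'
        )
    """)

With the table declared this way, "rowtime" is a proper time attribute, and the sliding window in the original query can group on it directly.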

Re: Convert BIGINT to TIMESTAMP in pyflink when using datastream api

Timo Walther
Hi Shilpa,

Shuiqiang is right. Currently, we recommend using SQL DDL until the
connect API is updated. See here:

https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/create/#create-table

The WATERMARK section in particular shows how to declare a rowtime attribute.

Regards,
Timo
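
Building on the DDL sketch above, here is a hedged sketch of how the windowed query from the first message might then look in the expression-style Table API (PyFlink 1.12); the table and sink names are carried over from the original snippet:

from pyflink.table.expressions import col, lit
from pyflink.table.window import Slide

# Sketch only: the sliding window now runs on the DDL-declared "rowtime"
# time attribute instead of the raw BIGINT column.
st_env.from_path("source") \
    .window(Slide.over(lit(2).minutes).every(lit(1).minutes).on(col("rowtime")).alias("w")) \
    .group_by(col("customer_id"), col("w")) \
    .select(col("customer_id"),
            col("customer_id").count.alias("total_count"),
            col("w").start.alias("start_time"),
            col("w").end.alias("end_time")) \
    .execute_insert("sink_into_csv")

Note that the count produces a BIGINT, so the DOUBLE type declared for the count column in the CsvTableSink may also need to be aligned.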

