Flink tableapi数据写入ES

tabEnv.connect(new Elasticsearch()
      .version("6")
      .host("localhost",9092,"http")
      .index("sensor")
      .documentType("test")
    )
      .inUpsertMode()
      .withFormat(new Json())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("cnt",DataTypes.DOUBLE())
        .field("temp",DataTypes.DOUBLE())
      ).createTemporaryTable("es_output_table")

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...