tabEnv.connect(new Elasticsearch()
.version("6")
.host("localhost",9092,"http")
.index("sensor")
.documentType("test")
)
.inUpsertMode()
.withFormat(new Json())
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("cnt",DataTypes.DOUBLE())
.field("temp",DataTypes.DOUBLE())
).createTemporaryTable("es_output_table")