Maven 依赖
<!-- Flink Elasticsearch 6 connector (provides ElasticsearchSink / ElasticsearchSinkFunction used below).
     The "_2.11" suffix is the Scala binary version; NOTE(review): it should match the Scala version and
     the Flink version (1.9.2) used by the rest of the build - confirm.
     NOTE(review): the Scala code also imports org.apache.flink.streaming.api.scala._, which comes from
     flink-streaming-scala_2.11; presumably that is declared elsewhere in the pom - verify. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.9.2</version>
</dependency>
<!-- Elasticsearch core library (org.elasticsearch.client.Requests is imported by the example).
     NOTE(review): the connector above already brings in an Elasticsearch client transitively; confirm that
     pinning 6.6.0 here does not conflict with that version or with the target cluster's version. -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.6.0</version>
</dependency>
示例代码
package io.github.flink.test
import java.util
import java.util.Random

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
/**
 * Demo Flink streaming job: generates simulated speed readings for 100 cars and writes every
 * reading to Elasticsearch (index "carspeed", type "readingdata") via the Flink Elasticsearch 6
 * connector.
 *
 * Usage: `EsSinkTest1 [esHost]` - the optional first argument overrides the Elasticsearch host
 * (default 192.10.10.43); the port is fixed at 9200.
 */
object EsSinkTest1 {

  /**
   * One speed reading.
   *
   * @param id        car identifier, e.g. "car_1"
   * @param timestamp emission time in epoch milliseconds (taken from System.currentTimeMillis)
   * @param speed     simulated speed value (unit not specified)
   */
  case class CameraSpeedData(id: String, timestamp: Long, speed: Double)

  /**
   * Unbounded source simulating 100 cars ("car_1" .. "car_100").
   *
   * Each car starts at 66 + 20 * N(0, 1) and then follows a Gaussian random walk (one N(0, 1)
   * step per tick). On every tick one reading per car is emitted, all sharing the same
   * timestamp, followed by a 100 ms sleep.
   */
  class MyCameraSource extends SourceFunction[CameraSpeedData] {
    // cancel() is called by Flink from a different thread than run(); @volatile guarantees
    // that run() observes the update and leaves its loop.
    @volatile var running = true

    override def cancel(): Unit = {
      running = false
    }

    override def run(ctx: SourceFunction.SourceContext[CameraSpeedData]): Unit = {
      val random = new Random()
      // Current (carId, speed) pairs; replaced with a new collection on every tick.
      var carSpeed = (1 to 100).map(i => (s"car_$i", 66 + random.nextGaussian() * 20))
      while (running) {
        carSpeed = carSpeed.map { case (id, speed) => (id, speed + random.nextGaussian()) }
        val curTime = System.currentTimeMillis()
        carSpeed.foreach { case (id, speed) => ctx.collect(CameraSpeedData(id, curTime, speed)) }
        Thread.sleep(100)
      }
    }
  }

  /**
   * Builds and runs the job: simulated source -> Elasticsearch sink.
   *
   * @param args optional; `args(0)`, if present, is the Elasticsearch host (default 192.10.10.43)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Alternative input: read "id,timestamp,speed" lines from a text file instead.
    // val dataStream = env.readTextFile("*****.txt").map { line =>
    //   val fields = line.split(",")
    //   CameraSpeedData(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    // }

    // Read readings from the simulated source.
    val dataStream = env.addSource(new MyCameraSource)

    // Elasticsearch address(es) for the sink.
    val esHost = args.headOption.getOrElse("192.10.10.43")
    val hosts = new util.ArrayList[HttpHost]()
    hosts.add(new HttpHost(esHost, 9200))

    val esSinkBuilder = new ElasticsearchSink.Builder[CameraSpeedData](
      hosts,
      new ElasticsearchSinkFunction[CameraSpeedData] {
        override def process(data: CameraSpeedData, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          println("data is " + data)
          // Document body. speed/timestamp are stored as JSON numbers, not strings: with
          // Elasticsearch's default dynamic mapping, string values become text/keyword fields,
          // which rules out numeric range queries and aggregations on them.
          val json = new util.HashMap[String, AnyRef]()
          json.put("car_id", data.id)
          json.put("speed", Double.box(data.speed))
          json.put("timestamp", Long.box(data.timestamp))
          val request = Requests.indexRequest().index("carspeed").`type`("readingdata").source(json)
          // Hands the request to the connector, which batches requests into bulk calls; the
          // message below therefore means "queued", not "acknowledged by Elasticsearch".
          indexer.add(request)
          println("data post")
        }
      }
    )

    dataStream.addSink(esSinkBuilder.build())
    env.execute("EsSinkTest1")
  }
}
结果