Integrating Flink with Kafka: Using Flume to Monitor Changes to a Specified File

Scenario:

When a monitored file changes, forward the changed content to a Kafka topic, where a Flink job can consume it.

Step 1: Modify flume-conf.properties

# The core components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Set the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/server/flume/conf/eventlog.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = eventlog
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
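Note that the exec source simply runs the tail command, and tail -F keeps retrying until the file exists; creating the file up front (at the path used in the config above) avoids warnings at startup:

touch /export/server/flume/conf/eventlog.log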

Step 2: Start the Kafka cluster (run this on each broker node)

cd kafka/bin
kafka-server-start.sh -daemon ../config/server.properties
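If the brokers do not auto-create topics, the eventlog topic must exist before Flume can write to it. A minimal sketch (the partition and replication-factor values are assumptions; very old Kafka releases take --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create --topic eventlog --bootstrap-server node1:9092 --partitions 3 --replication-factor 2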

Step 3: Open a Kafka console consumer on the eventlog topic

./kafka-console-consumer.sh --topic eventlog --bootstrap-server node1:9092,node2:9092,node3:9092
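By default the console consumer only prints records that arrive after it starts; add --from-beginning to also replay what is already in the topic:

./kafka-console-consumer.sh --topic eventlog --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning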

Step 4: Start the Flume agent

./flume-ng agent -c ../conf/ -f ../conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

Step 5: Modify the eventlog.log file under /export/server/flume/conf/

mv eventlog.log eventlog.log2
mv eventlog.log2 eventlog.log

Renaming the file away and back makes tail -F treat it as a newly created file, so the entire file content is re-read and forwarded to the eventlog topic.
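Alternatively, appending a single line also triggers tail -F, without replaying the whole file (the event text here is made up):

echo "click,user1,1700000000000" >> /export/server/flume/conf/eventlog.log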

Result:

(screenshot: the console consumer prints the contents of eventlog.log forwarded through Flume)

Add a Flink program to read the data from Kafka
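The code below assumes the Flink Kafka connector and the Scala streaming API are on the classpath. A minimal sbt sketch (the Flink version is an assumption; match it to your cluster):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.12.7",
  "org.apache.flink" %% "flink-connector-kafka" % "1.12.7"
)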

SourceKafka.scala

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class SourceKafka {

  def getKafkaSource(topicName: String): FlinkKafkaConsumer[String] = {
    val props = new Properties()
    // Kafka broker addresses
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    // Kafka consumer group id
    props.setProperty("group.id", "test-consumer-group")
    // key deserializer
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // value deserializer
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Offset reset strategy:
    //  - none: if no previous offset exists for the group (none committed
    //    automatically or manually), throw an exception
    //  - earliest: resume from the committed offset per partition; if none
    //    exists, consume from the beginning
    //  - latest: resume from the committed offset per partition; if none
    //    exists, consume only newly produced records
    props.setProperty("auto.offset.reset", "latest")
    new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
  }
}

AdEventLog.scala

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object AdEventLog {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read the Flume-forwarded data from Kafka
    val kafkaSource: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("eventlog")
    val eventLogStream: DataStream[String] = env.addSource(kafkaSource)

    eventLogStream.print()

    env.execute()
  }
}
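One caveat: setStreamTimeCharacteristic(TimeCharacteristic.EventTime) only takes effect once timestamps and watermarks are assigned, which the job above never does. A minimal sketch of that step, assuming (hypothetically) that each log line ends with a comma-separated epoch-millisecond timestamp:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical line format: "click,user1,1700000000000" (timestamp last)
val withTimestamps: DataStream[String] = eventLogStream
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
      // Use the trailing field as the event-time timestamp (milliseconds),
      // tolerating up to 5 seconds of out-of-order records
      override def extractTimestamp(line: String): Long =
        line.split(",").last.trim.toLong
    })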

Result:

(screenshot: the Flink job prints the Kafka records to stdout)
