问题描述
我有kafkaProducer演员:
class KafkaProducerActor @Inject()(
avroProducer: MyKafkaProducerAvro,jsonProducer: MyKafkaProducerJson,metrics: PrometheusMetricsService
)
extends Actor
{
def handleErrs(block: => Unit): Unit = {
try {
block
} catch {
case e: Exception =>
Logger.error(s"failed to produce kafka message,error: ${e.getMessage},cause: ${ExceptionUtils.getRootCause(e)},stacktrace: ${ExceptionUtils.getStackTrace(e)}")
metrics.incKafkaErrorCounter(e.getClass.getName)
}
}
override def receive: Receive = {
case rec: ProducerRecord[GenericRecord,GenericRecord] =>
handleErrs(avroProducer.produce(rec))
case ProducerRecordJson(topic,key,content) =>
handleErrs(jsonProducer.produce(new ProducerRecord[String,String](topic,content)))
}
}
此外,我正在尝试使用actorSystem来获取actorRef:
val kafka: ActorRef = actorSystem.actorOf(KafkaProducerActor.props,name = "kafkaProducerActor")
对于我在KafkaProducerActor中定义的内容:
object KafkaProducerActor {
def props: Props = Props(classOf[KafkaProducerActor])
}
警告以下内容:
找不到合适的actor构造函数,可能是动态调用 替换为构造函数调用
当用构造函数调用替换动态调用时(如编译器建议),即
object KafkaProducerActor {
def props: Props = Props(new KafkaProducerActor())
}
我收到编译错误:
未指定的值参数:avroProducer:MyKafkaProducerAvro,jsonProducer:MyKafkaProducerJson,指标:PrometheusMetricsService
在这种情况下启动道具的正确方法是什么?
解决方法
您应该在调用KafkaProducerActor.props
的上下文中注入服务,并将其作为参数传递。
或者只是将其手动注入到构造函数中,但是您需要为此进行静态全局注入。您可以使用这样的助手来实现它:
object InjectHelper {
lazy val injector: Injector = {
val moduleInstance: com.google.inject.Module = ??? // somehow get your guice module
Guice.createInjector(moduleInstance)
}
def inject[T](implicit mf: Manifest[T]): T =
InjectHelper.injector.getInstance(mf.runtimeClass).asInstanceOf[T]
}
object KafkaProducerActor {
def props: Props = Props(
new KafkaProducerActor(
InjectHelper.inject[MyKafkaProducerAvro],InjectHelper.inject[MyKafkaProducerJson],InjectHelper.inject[PrometheusMetricsService]
)
)
}