问题描述
我希望有一个Source.queue
(或类似的东西可以将其推送到物化图上),用来告诉我队列的当前饱和度。
我想这样做,而不(重新)实现QueueSource
图形阶段提供的功能。
我想出的一种可能的解决方法是:
object InstrumentedSource {
final class InstrumentedSourceQueueWithComplete[T](
delegate: SourceQueueWithComplete[T],bufferSize: Int,)(implicit executionContext: ExecutionContext)
extends SourceQueueWithComplete[T] {
override def complete(): Unit = delegate.complete()
override def fail(ex: Throwable): Unit = delegate.fail(ex)
override def watchCompletion(): Future[Done] = delegate.watchCompletion()
private val buffered = new AtomicLong(0)
private[InstrumentedSource] def onDequeue(): Unit = {
val _ = buffered.decrementAndGet()
}
object BufferSaturationRatioGauge extends RatioGauge {
override def getRatio: RatioGauge.Ratio = RatioGauge.Ratio.of(buffered.get(),bufferSize)
}
lazy val bufferSaturationGauge: RatioGauge = BufferSaturationRatioGauge
override def offer(elem: T): Future[QueueOfferResult] = {
val result = delegate.offer(elem)
result.foreach {
case QueueOfferResult.Enqueued =>
val _ = buffered.incrementAndGet()
case _ => // do nothing
}
result
}
}
def queue[T](bufferSize: Int,overflowStrategy: OverflowStrategy)(
implicit executionContext: ExecutionContext,materializer: Materializer,): Source[T,InstrumentedSourceQueueWithComplete[T]] = {
val (queue,source) = Source.queue[T](bufferSize,overflowStrategy).preMaterialize()
val instrumentedQueue = new InstrumentedSourceQueueWithComplete[T](queue,bufferSize)
source.mapMaterializedValue(_ => instrumentedQueue).map { item =>
instrumentedQueue.onDequeue()
item
}
}
}
这似乎主要是通过一些手动测试来实现的(除了buffered
最终最终与队列中的实际项目数保持一致,在我的情况下应该是可以的),但是我想知道是否有解决方案可以更好地利用我可能错过的内置功能。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)