问题描述
我有一个非常自定义的非平凡同步的要求,可以使用公平的 reentrantlock
和 Phaser
来实现。在 fs2
和 cats.effect
上实现(没有重要的定制)似乎是不可能的。
由于需要将所有阻塞操作包装到 Blocker
中,因此代码如下:
private val l: reentrantlock = new reentrantlock(true)
private val c: Condition = l.newCondition
private val b: Blocker = //...
//F is declared on the class level
def lockedMutex(conditionPredicate: Int => Boolean): F[Unit] = blocker.blockOn {
Sync[F].delay(l.lock()).bracket(_ => Sync[F].delay{
while(!conditionPredicate(2)){
c.await()
}
})(_ => Sync[F].delay(l.unlock()))
}
问题:
是否保证包含c.await()
的代码会在获取/释放Thread
的同一个reentrantlock
中执行?
这是一个至关重要的部分,因为如果它不是 IllegalMonitorStateException
将被抛出。
解决方法
在使用诸如 cats-effect 之类的东西时,您确实不需要担心线程,而是可以在更高的层次上描述您的问题。
这应该得到与您想要的相同的行为,它将运行高优先级作业,直到没有更多可以选择低优先级作业为止。在完成一个低优先级的工作后,每根光纤都会首先检查是否有更多的高优先级工作,然后再尝试选择一个低优先级的工作:
import cats.effect.Async
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import scala.concurrent.ExecutionContext
object HighLowPriorityRunner {
final case class Config[F[_]](
highPriorityJobs: Queue[F,F[Unit]],lowPriorityJobs: Queue[F,customEC: Option[ExecutionContext]
)
def apply[F[_]](config: Config[F])
(implicit F: Async[F]): F[Unit] = {
val processOneJob =
config.highPriorityJobs.tryTake.flatMap {
case Some(hpJob) => hpJob
case None => config.lowPriorityJobs.tryTake.flatMap {
case Some(lpJob) => lpJob
case None => F.unit
}
}
val loop: F[Unit] = processOneJob.start.foreverM
config.customEC.fold(ifEmpty = loop)(ec => loop.evalOn(ec))
}
}
您可以使用 customEC
提供您自己的 ExecutionContext 来控制在后台运行您的 Fiber 的实际线程的数量。
代码可以这样使用:
import cats.effect.{Async,IO,IOApp,Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
Resource.make(IO(Executors.newFixedThreadPool(2)))(ec => IO.blocking(ec.shutdown())).use { ec =>
Program[IO](ExecutionContext.fromExecutor(ec))
}
}
object Program {
private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] =
F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}")) *>
F.delay(Thread.sleep(1.second.toMillis)) *> // Blocks the Fiber! - Only for testing,use F.sleep on real code.
F.delay(println(s"Finished job ${id}!"))
def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = for {
highPriorityJobs <- Queue.unbounded[F,F[Unit]]
lowPriorityJobs <- Queue.unbounded[F,F[Unit]]
runnerFiber <- HighLowPriorityRunner(HighLowPriorityRunner.Config(
highPriorityJobs,lowPriorityJobs,Some(customEC)
)).start
_ <- List.range(0,10).traverse_(id => highPriorityJobs.offer(createJob(id)))
_ <- List.range(10,15).traverse_(id => lowPriorityJobs.offer(createJob(id)))
_ <- F.sleep(5.seconds)
_ <- List.range(15,20).traverse_(id => highPriorityJobs.offer(createJob(id)))
_ <- runnerFiber.join.void
} yield ()
}
应该产生这样的输出:
Starting job 0 on thread pool-1-thread-1
Starting job 1 on thread pool-1-thread-2
Finished job 0!
Finished job 1!
Starting job 2 on thread pool-1-thread-1
Starting job 3 on thread pool-1-thread-2
Finished job 2!
Finished job 3!
Starting job 4 on thread pool-1-thread-1
Starting job 5 on thread pool-1-thread-2
Finished job 4!
Finished job 5!
Starting job 6 on thread pool-1-thread-1
Starting job 7 on thread pool-1-thread-2
Finished job 6!
Finished job 7!
Starting job 8 on thread pool-1-thread-1
Starting job 9 on thread pool-1-thread-2
Finished job 8!
Finished job 9!
Starting job 10 on thread pool-1-thread-1
Starting job 11 on thread pool-1-thread-2
Finished job 10!
Finished job 11!
Starting job 15 on thread pool-1-thread-1
Starting job 16 on thread pool-1-thread-2
Finished job 15!
Finished job 16!
Starting job 17 on thread pool-1-thread-1
Starting job 18 on thread pool-1-thread-2
Finished job 17!
Finished job 18!
Starting job 19 on thread pool-1-thread-1
Starting job 12 on thread pool-1-thread-2
Finished job 19!
Starting job 13 on thread pool-1-thread-1
Finished job 12!
Starting job 14 on thread pool-1-thread-2
Finished job 13!
Finished job 14!
感谢 Gavin Bisesi (@Daenyth) 将我最初的想法改进为这个!
完整代码可用here。