如何实现将两个消费者合二为一的 fork 功能

问题描述

我正在尝试使用“更快的协程管道”一文中描述的抽象来构建一个流媒体库。我已经修改代码,以便它正确处理管道退出(而不是在发生这种情况时抛出错误):

-- | r: return type of the continuation,i: input stream type,o: output stream type,--   m: underlying monad,a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving
    ( Functor,applicative,Monad
    )
    via (Cont (Result r m i o))

type Result r m i o = InCont r m i -> OutCont r m o -> m r

newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok

suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok

emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok nothing emptyIk

await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)

yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)

(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  runPipe
    q
    (\a _ ok' -> k a emptyIk ok')
    (suspendIn (runPipe p (\() -> f)) ik)
    ok
  where
    f :: Result r m i e
    f _ ok = resumeOut ok nothing emptyIk

runcontPipe :: forall m a. applicative m => ContPipe a () Void m a -> m a
runcontPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik :: InCont a m ()
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok :: OutCont a m Void
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok

我想实现一个功能

fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a,b)

这将两个消费者流合二为一(类似于管道的 ZipSink)。它应该具有以下语义:

  1. 如果两个流都没有退出并且正在接受输入,则向两个流提供相同的输入值
  2. 如果一个流已经退出,则存储返回值,然后将输入输入到接受该值的流中
  3. 如果两个流都已退出,则退出并将两个流的返回值放入一个元组中。

这是我的尝试:

我们重用了论文中的 loop 函数,该函数一个 InCont r m i 连接到两个 OutCont r m i 并主动恢复延续。

loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'

使用 loop 我们可以将结果管道的输入同时连接到两个管道中,输出将在两个管道之间共享(这并不重要,因为您无法产生 Void ).

fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a,b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = _
        g :: b -> Result r m i Void
        g b ik' ok' = _
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok

现在我们只需要填写 fg 的延续,它们将在 pq 退出调用。 如果在调用 gf 已经被调用,这意味着 q 已经退出,那么 f 应该调用延续 k,如果 {{1} } 还没有被调用,那么 g 应该存储返回值 f 并恢复输入继续(通过丢弃所有传递的值) 在我看来,如果没有某种形式的共享状态,就不可能实现这一目标。我们可以尝试使用状态 monad 将状态存储在 a 中:

m

fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a,b) fork p q = MakePipe \k ik ok -> let f :: a -> Result r m i Void f a ik' ok' = do s <- get case s of nothing -> do put (Just (Left a)) resumeIn ik' sinkOk Just (Right b) -> do k (a,b) ik' ok' _ -> error "unexpected state" g :: b -> Result r m i Void g b ik' ok' = do s <- get case s of nothing -> do put (Just (Right b)) resumeIn ik' sinkOk Just (Left a) -> do k (a,b) ik' ok' _ -> error "unexpected state" in runPipe p f (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok) ok 是丢弃所有输入的输出延续:

sinkOk

我们现在可以添加一些用于测试的辅助函数

sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk

这在 print' :: Monadio m => Show i => ContPipe r i o m () print' = do m <- await case m of nothing -> pure () Just i -> do lift $ liftIO (print i) print' upfrom :: Int -> ContPipe r i Int m a upfrom i = do yield i upfrom (i + 1) take' :: Int -> ContPipe r i i m () take' n | n <= 0 = pure () | otherwise = do m <- await case m of nothing -> pure () Just i -> do yield i take' (n - 1) 早于 p 退出的情况下有效:

q

给出所需的输出

flip evalStateT nothing $ runcontPipe $ upfrom 1 .| take' 3 .| fork print' print'

但是当 1 1 2 2 3 3 ((),()) 早于 q 退出时,它会进入无限循环:

p

输出

flip evalStateT nothing $ runcontPipe $ upfrom 1 .| take' 3 .| fork print' (take 2 print')

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)