问题描述
我正在尝试使用“更快的协程管道”一文中描述的抽象来构建一个流媒体库。我已经修改了代码,以便它正确处理管道退出(而不是在发生这种情况时抛出错误):
-- | r: return type of the continuation,i: input stream type,o: output stream type,-- m: underlying monad,a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
deriving
( Functor,applicative,Monad
)
via (Cont (Result r m i o))
type Result r m i o = InCont r m i -> OutCont r m o -> m r
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}
newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}
suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok
suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok
emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok nothing emptyIk
await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)
yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)
(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
runPipe
q
(\a _ ok' -> k a emptyIk ok')
(suspendIn (runPipe p (\() -> f)) ik)
ok
where
f :: Result r m i e
f _ ok = resumeOut ok nothing emptyIk
runcontPipe :: forall m a. applicative m => ContPipe a () Void m a -> m a
runcontPipe p = runPipe p (\a _ _ -> pure a) ik ok
where
ik :: InCont a m ()
ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
ok :: OutCont a m Void
ok = MakeOutCont \_ ik' -> resumeIn ik' ok
fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a,b)
这将两个消费者流合二为一(类似于管道的 ZipSink
)。它应该具有以下语义:
这是我的尝试:
我们重用了论文中的 loop
函数,该函数将一个 InCont r m i
连接到两个 OutCont r m i
并主动恢复延续。
loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
resumeIn ik $ MakeOutCont \v ik' ->
resumeOut ok1 v $ MakeInCont \ok1' ->
resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'
使用 loop
我们可以将结果管道的输入同时连接到两个管道中,输出将在两个管道之间共享(这并不重要,因为您无法产生 Void
).
fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a,b)
fork p q =
MakePipe \k ik ok ->
let f :: a -> Result r m i Void
f a ik' ok' = _
g :: b -> Result r m i Void
g b ik' ok' = _
in runPipe
p
f
(MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
ok
现在我们只需要填写 f
和 g
的延续,它们将在 p
和 q
退出时调用。
如果在调用 g
时 f
已经被调用,这意味着 q
已经退出,那么 f
应该调用延续 k
,如果 {{1} } 还没有被调用,那么 g
应该存储返回值 f
并恢复输入继续(通过丢弃所有传递的值)
在我看来,如果没有某种形式的共享状态,就不可能实现这一目标。我们可以尝试使用状态 monad 将状态存储在 a
中:
m
fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a,b)
fork p q =
MakePipe \k ik ok ->
let f :: a -> Result r m i Void
f a ik' ok' = do
s <- get
case s of
nothing -> do
put (Just (Left a))
resumeIn ik' sinkOk
Just (Right b) -> do
k (a,b) ik' ok'
_ -> error "unexpected state"
g :: b -> Result r m i Void
g b ik' ok' = do
s <- get
case s of
nothing -> do
put (Just (Right b))
resumeIn ik' sinkOk
Just (Left a) -> do
k (a,b) ik' ok'
_ -> error "unexpected state"
in runPipe
p
f
(MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
ok
是丢弃所有输入的输出延续:
sinkOk
sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk
这在 print' :: Monadio m => Show i => ContPipe r i o m ()
print' = do
m <- await
case m of
nothing -> pure ()
Just i -> do
lift $ liftIO (print i)
print'
upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
yield i
upfrom (i + 1)
take' :: Int -> ContPipe r i i m ()
take' n
| n <= 0 = pure ()
| otherwise = do
m <- await
case m of
nothing -> pure ()
Just i -> do
yield i
take' (n - 1)
早于 p
退出的情况下有效:
q
给出所需的输出:
flip evalStateT nothing $ runcontPipe $ upfrom 1 .| take' 3 .| fork print' print'
但是当 1
1
2
2
3
3
((),())
早于 q
退出时,它会进入无限循环:
p
输出:
flip evalStateT nothing $ runcontPipe $ upfrom 1 .| take' 3 .| fork print' (take 2 print')
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)