Haskell的类型系统可以强制执行数据管道阶段的正确排序吗?

问题描述

我使用质谱数据创建了许多数据处理管道,其中对来自仪器的数据进行了清理,转换,缩放,检查并最终进行了分析。我倾向于为此使用递归类型定义-这是一个大大简化的示例:

data Dataset = Initial { x::(Vector Double),y::(Vector Double) name::String}
             | Cleaned { x::(Vector Double),y::(Vector Double) name::String}
             | Transformed { x::(Vector Double),y::(Vector Double) name::String}

然后,典型的管道将只是一系列功能,这些功能以Dataset创建者开始,然后继续使用消耗Dataset类型的事物并产生{{1}类型的事物的功能}:

Dataset

因此,这可以确保管道中的处理步骤以正确的顺序进行,并且您可以使用合成来创建整个管道

createDataset :: Vector Double -> Vector Double -> String -> Dataset
createDataset x y name = Initial x y name

removeOutliers :: Dataset -> Dataset
removeOutliers (Initial x y n) = let
                         (new_x,new_y) = outlierRemovalFunction x y
                         in Cleaned new_x new_y (n ++"_outliersRemoved")
               (Cleaned x y n) = error "Already been cleaned"
               (Scaled x y n) = error "Scaled data should have already been cleaned"
               (Transformed x y n) = error "Transformed data should have already been cleaned"

logTransform :: Dataset -> Dataset
logTransform (Initial x y n) = error "Need to clean first"
             (Cleaned x y n) = let
                         (new_x,new_y) = logTransformFunction x y
                         in Transformed new_x new_y (n ++ "_logTransformed)


但是由于多种原因,这种方法似乎极为有限。第一个原因是通过构造函数上的模式匹配检测到错误,因此对管道的添加和更改将要求在模式匹配的任何地方进行更改。想象一个更复杂的示例,其中包含几个清理操作和几个转换步骤-基本上每种可能的组合都将需要其自己的唯一构造函数,并且所有模式匹配都必须是非穷举性的,或者必须在各处重复进行。

似乎受到限制的第二个原因是,构造不正确的管道仅在运行时由故障检测到。我已经对所有处理步骤进行了排序,因此在管道中的每个点,我都确切知道数据发生了什么。类型系统应该能够使我避免一开始就将步骤错误地组合在一起,并且使用一个期望在未清理输入上清理数据的函数应该在编译时就可以检测到。

我考虑过要为管道中的每个阶段使用单独的 types ,然后将“ dataset”接口实现为类型类,例如:

(logTransform . removeOutliers . createDataset) init_y init_y "ourData"

然后您可以做一些事情(我认为...),例如:

class Dataset a where
    x :: a -> Vector Double
    y :: a -> Vector Double
    name :: a -> String

data Initial = Initial x y name
instance Dataset Initial where ...

data Cleaned a = Cleaned a
instance Dataset Cleaned where ...

data Transformed a = Transformed a
instance Dataset Transformed where ...

相信这种方法可以解决上面的问题1:现在,我们可以在编译时检测流水线错误,而不再需要用所有这些不同的构造函数来描述处理步骤了

但是,看来我只是将问题“一级升级”了。我现在正在处理类型变量和所有这些嵌套类型。现在,我不需要为每种类型的组合创建一个 removeOutliers :: (Dataset a) => a -> Cleaned a removeOutliers = ... logTransform :: (Dataset a) => Cleaned a -> Transformed Cleaned a logTransform = ... 实例,而不必为每个可能的管道步骤组合都使用Dataset构造函数!

我真正想要的是一种处理管道中的类型在其约束中既非常具体又非常笼统的方法。我想使用详细说明应用特定处理步骤的顺序的类型/约束,但我也希望类型/约束能够传达更通用的信息,即“除了其他不重要的步骤之外,异常值删除已完成”。因此,基本上消除了异常值的事物类型。

传达订购信息将是一项超级奖励-“除了其他不重要的步骤之外,还发生了异常值移除,并且在以后的某个时间点发生了日志转换”。在进行对数转换之前已经消除了异常值的事物的类型(并且不一定在此之前立即)。

使用Haskell的类型系统可以实现这种事情吗?

解决方法

您可以使用幻像类型以其类型存储有关数据集的信息,例如:

data Initial
data Cleaned
data Scaled

data Dataset a = Dataset { x :: Vector Double,y :: Vector Double,name :: String }

createDataset :: Vector Double -> Vector Double -> String -> Dataset Initial
createDataset x y name = Dataset x y name

removeOutliers :: Dataset Initial -> Dataset Cleaned
removeOutliers (Dataset x y n) =
    let (x',y') = clean x y
    in Dataset x' y' (n ++ "_clean")

使用一些GHC扩展,您可以将幻像类型限制为给定的状态类型,并避免显式声明空数据类型。例如:

{-# LANGUAGE DataKinds,KindSignatures #-}

data State = Initial | Cleaned | Scaled

data Dataset (a :: State) = Dataset { x :: Vector Double,name :: String }
,

是的,现代的Haskell类型系统可以处理此问题。但是,与通常的术语级编程相比,Haskell中的类型级编程仍然很难。语法和技术很复杂,并且缺少文档。往往情况是,对需求的相对较小的更改可能导致实现方面的重大更改(即,在您的实现中添加新的“功能”可能会级联为所有类型的主要重组),这可能会导致困难如果您仍然不确定自己的实际需求是什么,可以提出一个解决方案。

@JonPurdy的评论和@AtnNn的回答给出了一些可能的想法。这是一个尝试解决您的特定要求的解决方案。但是,除非您愿意坐下来教自己一些类型级别的编程知识,否则它可能很难使用(或者至少很难适应您的要求)。

无论如何,假设您感兴趣的是用一个已在其上执行的进程的类型级别列表标记一个固定的数据结构(即,始终相同的字段具有相同的类型)。流程列表与所需流程的有序子列表相对应。

我们将需要一些扩展:

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}

过程标签本身被定义为sum类型的构造函数,扩展名DataKinds将标签从术语级别提升到类型级别:

data Process = Cleaned | Transformed | Scaled | Inspected | Analyzed

然后使用一系列应用过程(其“管道”)标记数据结构:

data Dataset (pipeline :: [Process])
  = Dataset { x :: [Double],y :: [Double],name :: String }

注意::最方便的是将管道以相反的顺序进行,首先使用最近应用的Process

要允许我们要求pipeline具有特定顺序的进程子序列,我们需要一个类型级函数(即类型族)来检查子序列。这是一个版本:

type family a || b where
  True  || b = True
  False || b = b

type family Subseq xs ys where
  Subseq '[]      ys  = True
  Subseq nonempty '[] = False
  Subseq (x:xs) (x:ys) = Subseq xs ys || Subseq (x:xs) ys
  Subseq xs     (y:ys) = Subseq xs ys

我们可以在GHCi中测试此类型级别的功能:

λ> :kind! Subseq '[Inspected,Transformed] '[Analyzed,Inspected,Transformed,Cleaned]
Subseq '[Inspected,Cleaned] :: Bool
= 'True
λ> :kind! Subseq '[Inspected,Cleaned] :: Bool
= 'False
λ> :kind! Subseq '[Inspected,Transformed] '[Transformed,Inspected]
Subseq '[Inspected,Inspected] :: Bool
= 'False

如果您要编写一个函数,该函数需要先对数据集进行转换,然后清除异常值(按该顺序),并可能与其他不重要的步骤混合在一起,而函数本身应用了缩放步骤,则签名将看起来像这样:

-- remember: pipeline type is in reverse order
foo1 :: (Subseq [Cleaned,Transformed] pipeline ~ True)
     => Dataset pipeline -> Dataset (Scaled : pipeline)
foo1 = undefined

如果要防止双倍缩放,可以引入另一个类型级别的函数:

type family Member x xs where
  Member x '[] = 'False
  Member x (x:xs) = 'True
  Member x (y:xs) = Member x xs

并添加另一个约束:

foo2 :: ( Subseq [Cleaned,Transformed] pipeline ~ True,Member Scaled pipeline ~ False)
     => Dataset pipeline -> Dataset (Scaled : pipeline)
foo2 = undefined

然后:

> foo2 (Dataset [] [] "x" :: Dataset '[Transformed])
... Couldn't match type ‘'False’ with ‘'True’ ...
> foo2 (Dataset [] [] "x" :: Dataset '[Cleaned,Scaled,Transformed])
... Couldn't match type ‘'False’ with ‘'True’ ...
> foo2 (Dataset [] [] "x" :: Dataset '[Cleaned,Transformed])
-- typechecks okay
foo2 (Dataset [] [] "x" :: Dataset '[Cleaned,Transformed])
  :: Dataset '[ 'Scaled,'Cleaned,'Transformed]

就约束语法和错误消息而言,您可以使它们变得更加友好,并带有一些其他类型别名和类型族:

import Data.Kind
import GHC.TypeLits

type Require procs pipeline = Require1 (Subseq procs pipeline) procs pipeline
type family Require1 b procs pipeline :: Constraint where
  Require1 True procs pipeline = ()
  Require1 False procs pipeline
    = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                 Text " lacks required processing " :<>: ShowType procs)
type Forbid proc pipeline = Forbid1 (Member proc pipeline) proc pipeline
type family Forbid1 b proc pipeline :: Constraint where
  Forbid1 False proc pipeline = ()
  Forbid1 True proc pipeline
    = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                 Text " must not include " :<>: ShowType proc)

foo3 :: (Require [Cleaned,Transformed] pipeline,Forbid Scaled pipeline)
     => Dataset pipeline -> Dataset (Scaled : pipeline)
foo3 = undefined

给出:

> foo3 (Dataset [] [] "x" :: Dataset '[Transformed])
...The pipeline '[ 'Transformed] lacks required processing '[ 'Cleaned,'Transformed]...
> foo3 (Dataset [] [] "x" :: Dataset '[Cleaned,Transformed])
...The pipeline '[ 'Cleaned,'Scaled,'Transformed] must not include 'Scaled...
> foo3 (Dataset [] [] "x" :: Dataset '[Cleaned,Transformed])
-- typechecks okay
foo3 (Dataset [] [] "x" :: Dataset '[Cleaned,'Transformed]

完整的代码示例:

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}

import Data.Kind
import GHC.TypeLits

data Process = Cleaned | Transformed | Scaled | Inspected | Analyzed

data Dataset (pipeline :: [Process])
  = Dataset { x :: [Double],name :: String }

type family a || b where
  True  || b = True
  False || b = b

type family Subseq xs ys where
  Subseq '[]      ys  = True
  Subseq nonempty '[] = False
  Subseq (x:xs) (x:ys) = Subseq xs ys || Subseq (x:xs) ys
  Subseq xs     (y:ys) = Subseq xs ys

type family Member x xs where
  Member x '[] = False
  Member x (x:xs) = True
  Member x (y:xs) = Member x xs

type Require procs pipeline = Require1 (Subseq procs pipeline) procs pipeline
type family Require1 b procs pipeline :: Constraint where
  Require1 True procs pipeline = ()
  Require1 False procs pipeline
    = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                 Text " lacks required processing " :<>: ShowType procs)
type Forbid proc pipeline = Forbid1 (Member proc pipeline) proc pipeline
type family Forbid1 b proc pipeline :: Constraint where
  Forbid1 False proc pipeline = ()
  Forbid1 True proc pipeline
    = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                 Text " must not include " :<>: ShowType proc)


foo1 :: (Subseq [Cleaned,Transformed] pipeline ~ True)
     => Dataset pipeline -> Dataset (Scaled : pipeline)
foo1 = undefined

foo2 :: ( Subseq [Cleaned,Member Scaled pipeline ~ False)
     => Dataset pipeline -> Dataset (Scaled : pipeline)
foo2 = undefined

foo3 :: (Require [Cleaned,Forbid Scaled pipeline)
     => Dataset pipeline -> Dataset (Scaled : pipeline)
foo3 = undefined

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...