Drools-存储多状态会话

问题描述

我们已经在平台上实现了Drools引擎,以便能够评估流中的规则。 在我们的用例中,我们有一个更改检测流,其中包含多个实体的更改。

需要在一段时间内针对流中的每个实体评估规则,并使其状态与其他实体分开(会话)。这些规则根据每个实体的状态生成警报。因此,实体应处于边界之内,因此一个实体的状态不会干扰其他实体。

为此,我们为每个实体ID创建一个会话作为Spring Bean,并将其存储在inMemory HashMap中。因此,每次实体到达时,我们都会尝试使用其ID在inMemory地图上找到它的会话。如果返回null,则创建它。

这似乎不是实现它的正确方法。因为它不提供灾难恢复策略,所以也不能提供出色的内存管理。

我们可以使用某种inMemory数据库,例如Redis或Memchached。但是我认为它无法准确恢复有状态会话。

有人知道如何以正确的方式使用具有多个会话的嵌入式Drools实现灾难恢复和良好的内存管理吗?该平台提供一些解决方案吗?

非常感谢您的关注和支持

解决方法

答案不是尝试持久化和重用会话,而是持久化对实体当前状态建模的对象。

您当前的工作流程是:

  1. 实体(从变更检测流或其他地方)到达您的应用程序
  2. 您可以在哈希图中进行查询,以获取存储了实体状态的会话
  3. 您触发规则,从而更新会话(可能还会更新实体)
  4. 您将会话保留在内存中。

您的工作流应该是什么:

  1. (相同)实体到达您的应用程序
  2. 您在外部数据源上查找实体状态-例如从数据库或数据存储中查找
  3. 您触发规则,并传入实体状态。您无需更新会话,而可以更新状态实例。
  4. 您将状态保持在外部数据源中。

如果添加适当的直写式缓存,则可以保证性能和一致性。如果您为数据源实施了适当的锁定/事务处理,这也将使您能够横向扩展应用程序。


这是一个玩具例子。

假设我们有一个为图书馆建模的应用程序,允许用户签出书籍。一个用户一次只能签出三本书。

我们收到的“事件”为图书签入或签出事件建模:

class BookBorrowEvent {
  int userId;
  int bookId;
  EventType eventType; // EventType.CHECK_IN or EventType.CHECK_OUT
}

在外部数据源中,我们维护一个UserState记录-可能作为传统RDBMS中的不同记录或聚合;我们如何存储它与示例无关。但是,假设从数据源返回的UserState记录如下所示:

class UserState {
  int userId;
  int[] borrowedBookIds;
}

当我们收到事件时,我们将首先从外部数据存储(或内部管理的直写式缓存)中检索用户状态,然后将UserState添加到规则输入中。当然,我们应该适当地处理会话(在使用后将其丢弃,根据需要使用会话池)。

public void handleBookBorrow(BookBorrowEvent event) {
  UserState state = getUserStateFromStore(event.getUserId());

  KieSession kieSession = ...; 
  kieSession.insert( event );
  kieSession.insert( state ); 
  kieSession.fireAllRules();

  persistUserStateToStore(state);
}

然后,您的规则将针对UserState实例进行工作,而不是将值存储在局部变量中。

一些示例规则:

rule "User borrows a book"
when
  BookBorrowEvent( eventType == EventType.CHECK_OUT,$bookId: bookId != null )
  $state: UserState( $checkedOutBooks: borrowedBookIds not contains $bookId )
  Integer( this < 3 ) from $checkedOutBooks.length
then
  modify( $state ) { ... }
end

rule "User returns a book"
when
  BookBorrowEvent( eventType == EventType.CHECK_IN,$bookId: bookId != null )
  $state: UserState( $checkedOutBooks: borrowedBookIds contains $bookId )
then
  modify( $state ) { ... }
end

显然是一个玩具示例,但是对于用户尝试检出一本重复的书本,用户尝试退回未检出的书本,如果用户返回错误的情况,您可以轻松添加其他规则超过了3本书的最大借阅限制,为允许的结帐时间添加了基于时间的逻辑,等等。

即使您使用的是基于流的处理,因此您可以利用时间运算符,该工作流仍然有效,因为您将在收到状态实例时将其传递给评估流。当然,在这种情况下,出于性能方面的考虑,正确实现直写式缓存将更为重要(除非您的临时运算符足够宽容,以允许某些数据源事务延迟)。您需要做的唯一更改是重新调整规则的重点,以将其数据持久性定位到状态对象而不是会话本身-通常不建议这样做,因为会话被设计为可处理的。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...