使用CDC的Mulesoft与Salesforce Streaming API

问题描述

我正在研究Mule API流程,以测试Salesforce事件流。我已经设置好连接器并订阅了流媒体频道。

当我创建/更新/删除联系人记录,事件通过并通过将它们添加到另一个数据库进行处理时,此方法就很好了。

enter image description here

我对Border功能有些困惑。使用当前设置,我可以关闭Mule应用程序,在组织中创建联系人,然后当我使该应用程序重新联机时,它会通过从其上次中断的地方添加数据来恢复。完美。

但是,我正在尝试模拟如果在处理事件时the子应用程序崩溃会发生什么情况。

我运行了一些APEX来创建100条随机联系记录。一旦看到它记录了我应用程序中的第一个流,我就杀死了kill子应用程序。我在这里的假设是,当我恢复应用程序时,它会知道它从何处退出,就像在上一次测试中那样,在创建联系人之前是脱机的。

我注意到的是,在我关闭该应用程序之前,它仅处理通过它的几个联系人。

看来,事件可能在流输入中是如此之快,以至于它已经到达流中的最后<ListView x:Name="SearchFields" Grid.Row="0" Grid.Column="0" ItemsSource="{Binding CustomerSearchFields}" SelectedItem="{Binding selectedSearchField,UpdateSourceTrigger=LostFocus}" Style="{StaticResource MaterialDropShadowStyle}" BorderThickness="0,2,0" Padding="24,24,0" HorizontalContentAlignment="Stretch" helper:EnterKeyTraversal.IsEnabled="True" KeyboardNavigation.TabNavigation="Cycle" FontFamily="{StaticResource DefaultFontFamily}" Background="{StaticResource ColorLightGray2}"> <ListView.Resources> <DataTemplate DataType="{x:Type model:CustomerSeachFieldviewmodel}"> <Grid> <Grid.RowDeFinitions> <RowDeFinition Height="*" /> <RowDeFinition Height="48" /> </Grid.RowDeFinitions> <TextBlock Grid.Row="0" Text="{Binding Description}" FontSize="{StaticResource FontSizeSmall}" FontWeight="SemiBold" Foreground="{StaticResource ColorDarkGray}" Margin="0,4" /> <Border x:Name="PART_Border" Grid.Row="1" BorderThickness="1" BorderBrush="{StaticResource ColorGray}"> <Grid Background="White"> <Grid.ColumnDeFinitions> <ColumnDeFinition Width="*" /> <ColumnDeFinition Width="48" /> </Grid.ColumnDeFinitions> <TextBox Grid.Column="0" Text="{Binding SearchText}" VerticalAlignment="Stretch" HorizontalAlignment="Stretch" FontSize="{StaticResource FontSizenormal}" Padding="12,15" BorderThickness="0" /> <Border Grid.Column="1" Background="{StaticResource ColorLightGray2}" Margin="8"> <ctrl:IconViewBox IconData="{StaticResource IconPathSearch}" IconSize="16" IsTabStop="False" /> </Border> </Grid> </Border> </Grid> </DataTemplate> </ListView.Resources> <ListView.ItemContainerStyle> <Style targettype="ListViewItem"> <Setter Property="IsTabStop" Value="False" /> <Setter Property="Margin" Value="0,24" /> <Setter Property="Template"> <Setter.Value> <ControlTemplate> <ContentPresenter Content="{Binding}" /> </ControlTemplate> </Setter.Value> </Setter> <Style.Triggers> <Trigger Property="IsKeyboardFocusWithin" Value="True"> <Setter Property="IsSelected" Value="True" /> </Trigger> <Trigger Property="IsSelected" Value="True"> <Trigger.Setters> <Setter Property="BorderBrush" Value="Fuchsia" /> </Trigger.Setters> </Trigger> </Style.Triggers> </Style> </ListView.ItemContainerStyle> </ListView> 。但是,由于这些记录仍未添加到我的外部数据库中,因此我丢失了这些记录。流完成了应做的工作,但是由于该应用仍在处理大量工作,因此我的100条记录没有像replayId那样提交。

如何解决此问题,以免在应用程序崩溃前如果有大量数据流而不会丢失数据?我记得使用Kafka时,一旦将ID插入数据库,您就必须能够replayId,以便它知道您正式处理的最后一个ID。在Mule中有这样一个概念,我可以告诉它我正式退出并致力于DB的位置吗?

解决方法

协议(CometD)级别的可靠性意味着许多属性。其中最主要的是订户已接收到的消息的事务性ACK(通知)。 CometD支持ACK作为扩展。 Salesforce的CometD实施不支持ACK。即使这样做,您仍然有issues ...但是风险发生的频率/损失可能会更低。

对于您而言,您必须设计一种解决方案,该解决方案相当于查找和重放未提交给目标数据库的事件。您可以使用Mule中的自定义代码或接线适配器来执行此操作。重播ID值不能保证在连续事件中是连续的,但是将对其进行排序。重播ID为100的事件A之后是重播ID为200的事件B。

您将需要在数据库中存储重播ID值。然后,您可以在重新订阅(订户失败后)时使用它来从SF中检索数据库中缺少的事件。仅当故障窗口足够小时,这才起作用。对于标准平台事件许可证,Salesforce事件保留窗口当前为24小时。较高级别的许可证可以保留更长的时间。

根据数据量,事件的发生频率和其他过程参数,可以使用Heroku Connect来开箱即用。它的确暗示了有关Heroku的Postgres DB + HC的许可成本和运营成本,但是我们在类似情况下的大多数客户都认为值得。