问题描述
我有一个 Kinesis 消费者,他的工作是跟踪系统中的“当前活跃用户”。用户每分钟都会向 Kinesis 流发送一次心跳,而该系统只会保留它所看到的所有唯一用户 GUID 的列表,以及他们上次从该 GUID 收到心跳的时间。如果在 2 分钟内没有看到心跳,我们假设用户不再活跃并将他们从“当前活跃用户”列表中驱逐。很直接。
因为这个系统只关心当前活跃的用户,我们不需要回处理旧的消息。如果我们要关闭这个消费者 2 小时然后再打开它,我们希望从“最新”消息开始处理,而不是从我们停止的地方继续。
最后,这已根据 Amazon Kinesis Client NodeJS example 实现为 NodeJS 应用程序,使用 MultiLangDaemon 与 Kinesis 客户端库进行通信。
在正常使用下,我发现始终从“最新”恢复的最佳方法是永远不要使用 KCL 的检查点功能。例如,在我的 processRecords
方法的底部,我有以下内容:
// We don't checkpoint with kinrta,because if we crash for some reason we
// want to immediately catch back up to live instead of wasting time
// processing expired heartbeats
// processRecordsInput.checkpointer.checkpoint(sequenceNumber,// function(err,checkpointedSequenceNumber) {
completeCallback();
// }
// );
这样,每当我杀死使用者并重新启动它时,它都会查看 *.properties
文件并看到“initialPositionInStream”是“LATEST”,然后从那里开始处理。
无论如何
当我重新分片我的流(拆分分片或合并分片)时,我遇到了一个问题。当我重新分片新分片上的检查点时,不是 设置为“LATEST”,而是设置为“TRIM_HORIZON”。由于我从不重新检查点,这意味着如果我的消费者关闭并重新启动,我最终必须处理 24 小时 的数据。
我可以通过编辑 KCL 用于管理检查点的 Dynamo 表来手动修复此问题,但这显然不是可扩展的解决方案。我尝试使用检查指针并传递字符串“LATEST”而不是序列号,但这会引发序列号无效的错误。
我如何告诉 KCL,当我重新分片时,我想将新分片上的检查点设置为“最新”?
作为一个 hack-y 解决方案,我考虑过只使用 DynamoDB SDK 并修复 initialize
方法中的检查点。这很难看,但我认为它会起作用(假设亚马逊不改变他们管理 KCL 表的方式)
更新
根据所描述的“hack-y 解决方案”,我编写了以下小辅助方法:
/**
* Assumes the current shardId (available in the initialize method's
* `initializeInput.shardId`) is stored in the global "state" object,* accessible via the "state" import
*/
import { Kinesis,DynamoDB } from "aws-sdk";
import state from "../state";
import logger from "./logger";
const kinesis = new Kinesis();
const ddb = new DynamoDB.DocumentClient();
const log = logger().getLogger("recordProcessor");
const appName = process.env.APP_NAME;
export default async function (startingCheckpoint: string) {
// We can't update any Dynamo tables if we don't know which table to update
if (!appName) return;
// Compute the name of the shard JUST BEFORE ours
// Because Kinesis uses an "exclusive" start ID...
const shardIdNum = parseInt(state.shardId.split("-")[1]) - 1;
const startShardId = "shardId-" + ("000000000000" + shardIdNum).substr(-12);
// Pull data about our current shard
const kinesisResp = await kinesis.listShards({
StreamName: process.env.KINESIS_STREAM_NAME,MaxResults: 1,ExclusiveStartShardId: startShardId
}).promise();
const oldestSeqNumber = kinesisResp.Shards[0].SequenceNumberRange.StartingSequenceNumber;
// Pull data about our current checkpoint
const dynamoResp = await ddb.get({
TableName: appName,Key: {
leaseKey: state.shardId
}
}).promise();
const prevCheckpoint = dynamoResp.Item.checkpoint;
log.debug(`Oldest sequence number in Kinesis shard: ${oldestSeqNumber} vs checkpoint: ${prevCheckpoint}`);
// Determine if we need to "fix" anything
if (startingCheckpoint === "TRIM_HORIZON") {
// If our checkpoint is before the oldest sequence number,reset it to
// "TRIM_HORIZON" so we pull the oldest sequence number
if (prevCheckpoint < oldestSeqNumber) {
log.info("Updating checkpoint to TRIM_HORIZON");
await ddb.update({
TableName: appName,Key: {
leaseKey: state.shardId
},UpdateExpression: "SET #checkpoint = :value",ExpressionAttributeNames: {
"#checkpoint": "checkpoint"
},ExpressionAttributeValues: {
":value": "TRIM_HORIZON"
}
}).promise();
}
} else if (startingCheckpoint === "LATEST") {
if (prevCheckpoint !== "LATEST") {
log.info("Updating checkpoint to LATEST");
await ddb.update({
TableName: appName,ExpressionAttributeValues: {
":value": "LATEST"
}
}).promise();
}
} else {
log.warn("We can't 'fix' checkpoints that aren't TRIM_HORIZON or LATEST");
}
}
我进行了测试,这正确有效地更新了 DynamoDB 表,但它不会立即开始从新位置提取记录。看起来 KCL 在调用 initialize
方法之前读取了一次检查点,并且永远不会重新读取它。
此时,我正在寻找一种方法来告诉 KCL“开始使用新的检查点”,或者一种优雅地重新启动使用者以便它重新初始化所有内容的方法。我还没有,但我会继续研究。也许我可以在 MultiLangDaemon 文档中找到可以写入 STDOUT 的内容...
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)