在Node JS应用中使用Kafka Consumer来表示已进行计算

问题描述

所以我的问题可能涉及基于应用程序性质的一些头脑风暴。

我有一个Node JS应用程序,可以将消息发送到Kafka。例如,用户每次单击页面时,Kafka应用程序都会基于访问量运行计算。然后,在同一实例中,我想通过我的Kafka消息触发计算后检索该计算。到目前为止,此计算已存储在Cassandra数据库中。问题是,如果我们尝试在计算完成之前从Cassandra中读取数据,那么我们将不会从数据库中查询任何内容(尚未插入键),并且不会返回任何内容(错误),或者计算可能已过期。到目前为止,这是我的代码。

router.get('/:slug',async (req,res) =>{

Producer = kafka.Producer


KeyedMessage = kafka.KeyedMessage
  client = new kafka.KafkaClient()



producer = new Producer(client)



km = new KeyedMessage('key','message')
  kafka_message = JSON.stringify({ id: req.session.session_id.toString(),url: arbitrary_url })
  payloads = [
    { topic: 'MakeComputationTopic',messages: kafka_message}
  ]; 
const clientCass = new cassandra.Client({
contactPoints: ['127.0.0.1:9042'],localDataCenter: 'datacenter1',// here is the change required
keyspace: 'computation_space',authProvider: new auth.PlainTextAuthProvider('cassandra','cassandra')
});



const query = 'SELECT * FROM computation  WHERE id = ?';




clientCass.execute(query,[req.session.session_id],{ hints : ['int'] })
  .then(result => console.log('User with email %s',result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });


}

首先,我想到了异步和等待,但这是可以排除的,因为这不会停止过时的计算。

其次,我研究了让应用程序进入睡眠状态,但是这种方式似乎会使我的应用程序变慢。

我可能决定使用Kafka Consumer(在我的node-js中)消耗一条消息,该消息表明现在可以安全地查看Cassandra表。

例如(使用kafka-node)

consumer.on('message',function (message) {
    clientCass.execute(query,{ hints : ['int'] })
  .then(result => console.log('User with computation%s',result.rows[0].computations))
  .catch((message) => {
    console.log('Could not find key')
  });
}); 

这种方法似乎更好一些,因为每次用户单击页面时我都必须让消费者使用,而我只关心向它发送1条消息。

我想知道如何应对这一挑战?我可能错过了一个场景,还是有一种方法可以使用kafka-node解决此问题?我也在考虑做一个while循环,等待承诺成功,并且计算不是过时的(比较缓存中的值)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)