问题描述
因此,我的 koa 服务器应用程序中有一个 /POST 路由处理程序,每当处理 post
请求时,我都想发出一个事件 ('TASK_HANDLED',...)
,其中包含有关该任务的其他数据。
在其他地方,在事件处理程序中,我想将任务数据写入 SSE 流,以便(特定的,非广播)客户端得到通知,他的任务请求已被处理。我阅读了 koa-sse-stream
的文档,但无法理解如何将 SSE 中间件和路由器结合在一起。我也无法在网络上找到任何可用的 koa-sse-stream
示例。许多示例都有 app.use(<sse code>)
,我觉得它们可以独立工作,但在将它与路由一起使用时并不清楚。
代码组织。
const Koa = require('koa')
const app = new Koa()
app
.use(errorHandler)
.use(session(sessionConfig,app))
.use(bodyParser)
.use(passport.initialize())
.use(passport.session())
.use(cors({ credentials: true }))
.use(setHeaders)
.use(shutdown(app,shutDownConfig))
.use(serve(__approot + '/public'))
require('./utils/passport.util') // Important: this must be done after passport.initialize()
applyApiMiddleware(app) // apply route middleware/s
module.exports = app
./index.js
const initDB = require('./utils/initMongoDB')
const { initUserStorage } = require('./utils/userStorage.util')
const { initSessionStorage } = require('./utils/sessionStorage.util')
const http = require('http')
const app = require('./app')
initUserStorage(app).then().catch() // code snipped
initSessionStorage(app).then().catch() // code snipped
initDB(app).then().catch() // code snipped
process.on('SIGINT',async () => {
await cleanup('SIGINT')
process.exit()
})
process.on('SIGTERM',async () => {
await cleanup('SIGTERM')
process.exit()
})
function cleanup (sig) {
return new Promise((resolve) => {
console.debug(`... received signal: ${sig}`)
setTimeout(() => {
dbClose()
console.debug('Connection to MongoDB closed.')
console.debug('cleanup finished.')
resolve()
},1500)
})
}
所有 api 路由都具有结构 (index,routes and controller)
。这里是 /sse
路由
./sse.routes.js
const controller = require ('./sse.controller')
module.exports = (Router) => {
const router = new Router({
prefix: '/sse'
})
router.get('/:userid',async ctx => {
await controller.sseHandler(ctx)
})
return router
}
客户端/浏览器将向 /sse/:userid
路由发起 sse 请求。
./sse.controller.js
const { Passthrough } = require('stream')
exports.sseHandler = async ctx => {
const userid = ctx.params.userid
ctx.request.socket.setTimeout(0)
ctx.req.socket.setNoDelay(true)
ctx.req.socket.setKeepAlive(true)
ctx.set({
"Content-Type": "text/event-stream","Cache-Control": "no-cache","Connection": "keep-alive",})
const stream = new Passthrough()
ctx.status = 200
ctx.body = stream
const listener = (data) => {
stream.write(data);
}
stream.on('close',() => {
clearInterval(interval)
})
}
/POST 路由是 ..(./tea.routes.js
)。此发布请求来自另一个应用程序(Huey 任务执行器),而不是客户端/浏览器。
const controller = require('./tea.controller')
module.exports = (Router) => {
const router = new Router({
prefix: '/tea'
})
/** other routes */
// Agent notifies after task is executed.
router.post('/task',async ctx => {
await controller.postTask(ctx)
})
return router
}
并且 /POST
处理程序是 (./tea.controller.js
)
const EventEmitter = require('events')
const events = new EventEmitter()
events.setMaxListeners(0)
.
.
exports.postTask = async ctx => {
if (ctx.request.body.taskid) {
console.log('DEBUG: /POST body = %O',ctx.request.body)
const storedTask = await fetchTaskId(ctx.request.body.taskid)
console.log('DEBUG: stored task %s - %O complete',ctx.request.body.taskid,storedTask)
events.emit('TASK_EVENT',storedTask)
// ctx.body = "OK"
}
}
如何/在哪里编写 TASK_EVENT
事件侦听器?或者一般来说,我如何连接任务完成发布请求并写入 SSE 流?
总的来说,我的 SEE 方法好吗?我也对不使用 koa-sse-stream
的解决方案持开放态度。
对不起,如果帖子有点长。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)