问题描述
是否有一种方法可以建立一个主机,以使用nodejs通过tcp串流串行连接-我想将传感器数据从计算机上的IoT设备流式传输到连接的计算机再到Web服务器。原始数据流很好-远程计算机将对其进行处理。我一直在研究net
和serialport
npm软件包-但不确定如何将两者结合起来……
谢谢!
解决方法
准备
几乎每个供应商或设备都有自己的串行通信协议。通常这些设备也使用带有标头、校验和的数据包,但每个设备以不同的方式执行此操作。
第一个问题实际上是,您希望将数据包头和校验和信息转发到什么程度。您可能希望将传入的数据包转换为事件,或者可能已经转换为某种 JSON 消息。
假设您只想转发原始格式的数据而不进行任何预处理,确定数据包的开始和结束位置仍然很有价值。当您通过 TCP/IP 刷新数据时,最好不要在这些串行数据包之一中途这样做。
例如,您的设备可能是条码扫描仪。大多数条码扫描器在每次扫描结束时发送 CR(回车)。主动读取传入字节以寻找 CR 字符是有意义的。然后,每次注意到 CR 字符时,您都会刷新字节缓冲区。
但是,它并不总是 CR。一些设备在 STX (0x02) 和 ETX (0x03) 字符之间打包它们的数据。还有一些发送固定长度的包(例如每条消息 12 个字节)。
为了清楚起见,您最终可以每 100 字节发送一次数据,而消息实际上是 12 字节。那会破坏一些数据包。偶尔您的 TCP 接收器会收到一个不完整的数据包。说了这么多。您还可以在 TCP 接收端添加所有这些逻辑。当接收到不完整的数据包时,您可以将其保存在缓冲区中,假设下一个传入的数据包将包含丢失的字节。
考虑是否值得
请注意,您可以购买现成的商用 RS232 转以太网设备并进行配置(约 100 欧元),它们完全符合您的要求。通常在这种设备的设置中,您可以选择配置刷新字符。 (例如那个 CR)。 MOXA 可能是你能得到的最好的。 ADAM 也生产不错的设备。这些供应商制造此类设备已有大约 30 年的历史。
让你开始
但是为了锻炼,我们开始吧。 首先,您需要一些东西来与您的串行设备进行通信。 我用过这个:
npm install serialport@^9.1.0
您几乎可以盲目地复制以下代码。但显然您需要设置自己的 RS232 或 USB 端口设置。查看设备手册以确定波特率、数据位、停止位、奇偶校验和可选的 RTS/DTR
import SerialPort from "serialport";
export class RS232Port {
private port: SerialPort;
constructor(private listener: (buffer: Buffer) => any,private protocol) {
this.port = new SerialPort("/dev/ttyS0",{
baudRate: 38400,dataBits: 8,stopBits: 1,parity: "none",});
// check your RTS/DTR settings.
// this.port.on('open',() => {
// this.port.set({rts: true,dtr: false},() => {
// });
//});
const parser = this.port.pipe(this.protocol);
parser.on('data',(data) => {
console.log(`received packet:[${toHexString(data)}]`);
if (this.listener) {
this.listener(data);
}
});
}
sendBytes(buffer: Buffer) {
console.log(`write packet:[${toHexString(buffer)}]`);
this.port.write(buffer);
}
}
上面的代码不断从串行设备读取数据,并使用“协议”来确定消息的开始/结束位置。它有一个“侦听器”,这是一个回调。它还可以使用 sendBytes
函数发送字节。
这将我们带到了协议,正如前面解释的那样,在找到分隔符之前应该读取该协议。
因为我不知道你的分隔符是什么。我将向您展示另一种选择,它只是等待沉默。它假设在一段时间内没有传入数据时,消息将是完整的。
export class TimeoutProtocol extends Transform {
maxBufferSize: number;
currentPacket: [];
interval: number;
intervalID: any;
constructor(options: { interval: number,maxBufferSize: number }) {
super()
const _options = { maxBufferSize: 65536,...options }
if (!_options.interval) {
throw new TypeError('"interval" is required')
}
if (typeof _options.interval !== 'number' || Number.isNaN(_options.interval)) {
throw new TypeError('"interval" is not a number')
}
if (_options.interval < 1) {
throw new TypeError('"interval" is not greater than 0')
}
if (typeof _options.maxBufferSize !== 'number' || Number.isNaN(_options.maxBufferSize)) {
throw new TypeError('"maxBufferSize" is not a number')
}
if (_options.maxBufferSize < 1) {
throw new TypeError('"maxBufferSize" is not greater than 0')
}
this.maxBufferSize = _options.maxBufferSize
this.currentPacket = []
this.interval = _options.interval
this.intervalID = -1
}
_transform(chunk: [],encoding,cb) {
clearTimeout(this.intervalID)
for (let offset = 0; offset < chunk.length; offset++) {
this.currentPacket.push(chunk[offset])
if (this.currentPacket.length >= this.maxBufferSize) {
this.emitPacket()
}
}
this.intervalID = setTimeout(this.emitPacket.bind(this),this.interval)
cb()
}
emitPacket() {
clearTimeout(this.intervalID)
if (this.currentPacket.length > 0) {
this.push(Buffer.from(this.currentPacket))
}
this.currentPacket = []
}
_flush(cb) {
this.emitPacket()
cb()
}
}
最后一块拼图是 TCP/IP 连接。在这里,您必须确定哪一端是客户端,哪一端是服务器。我暂时跳过了,因为有很多教程和代码示例向您展示了如何设置 TCP/IP 客户端-服务器连接。
在上面的一些代码中,我使用函数 toHexString(Buffer)
将缓冲区的内容转换为十六进制格式,以便更容易地将其打印到控制台日志。
export function toHexString(byteArray: Buffer) {
let s = '0x';
byteArray.forEach(function (byte) {
s += ('0' + (byte & 0xFF).toString(16)).slice(-2);
});
return s;
}