Add pings, debug logs, socket cleanup to Unit

At some point I need to stress-test this to see how long the connection can stay open, and what happens when it goes down.
This commit is contained in:
2025-01-04 12:05:29 -06:00
parent 5ae6cac549
commit ab4898c20b
4 changed files with 105 additions and 21 deletions

3
dist/unit.d.ts vendored
View File

@ -6,9 +6,12 @@ export declare class Unit extends EventEmitter {
port: number;
private client?;
private pingTimeout?;
private pingTimer?;
private pingInterval;
constructor(endpoint: string, port?: number);
connect(): Promise<void>;
close(): void;
private socketCleanup;
private heartbeat;
private onClientMessage;
send(request: ICRequest): Promise<ICResponse>;

53
dist/unit.js vendored
View File

@ -1,10 +1,14 @@
import { EventEmitter } from "events";
import { WebSocket } from "ws";
import debug from "debug";
const debugUnit = debug("ic:unit");
export class Unit extends EventEmitter {
endpoint;
port;
client;
pingTimeout;
pingTimer;
pingInterval = 60000;
constructor(endpoint, port = 6680) {
super();
this.endpoint = endpoint;
@ -16,42 +20,71 @@ export class Unit extends EventEmitter {
if (this.client) {
throw new Error("can't open a client that is already open");
}
debugUnit(`connecting to ws://${this.endpoint}:${this.port.toString()}`);
this.client = new WebSocket(`ws://${this.endpoint}:${this.port.toString()}`);
const { heartbeat, onClientMessage } = this;
this.client.on("error", console.error);
const { heartbeat, onClientMessage, socketCleanup } = this;
this.client.on("error", (evt) => {
// todo: emit event so we can reconnect? auto reconnect?
debugUnit("error in websocket: $o", evt);
socketCleanup();
});
this.client.on("open", heartbeat);
this.client.on("ping", heartbeat);
this.client.on("close", () => {
clearTimeout(this.pingTimeout);
this.client?.removeAllListeners();
this.client = undefined;
});
this.client.on("pong", heartbeat);
this.client.on("close", socketCleanup);
this.client.on("message", onClientMessage);
this.pingTimer = setInterval(() => {
debugUnit("sending ping");
this.client?.ping();
}, this.pingInterval);
await new Promise((resolve, reject) => {
this.client?.once("error", reject);
this.client?.once("open", resolve);
});
debugUnit("connected");
}
close() {
debugUnit("closing connection by request");
this.client?.close();
}
socketCleanup = () => {
debugUnit("socket cleanup");
this.client?.removeAllListeners();
this.client = undefined;
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
this.pingTimeout = undefined;
}
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = undefined;
}
};
heartbeat = () => {
debugUnit("received heartbeat");
clearTimeout(this.pingTimeout);
this.pingTimeout = setTimeout(() => {
this.close();
}, 30000 + 1000);
debugUnit("terminating connection due to heartbeat timeout");
this.client?.terminate();
this.socketCleanup();
}, this.pingInterval + 5000);
};
onClientMessage = (msg) => {
debugUnit("message received, length %d", msg.length);
const respObj = JSON.parse(msg.toString());
if (respObj.command.toLowerCase() === "notifylist") {
debugUnit(" it's a subscription confirmation or update");
this.emit(`notify`, respObj);
}
this.emit(`response-${respObj.messageID}`, respObj);
};
async send(request) {
this.client?.send(JSON.stringify(request));
const payload = JSON.stringify(request);
debugUnit("sending message of length %d with id %s", payload.length, request.messageID);
this.client?.send(payload);
return await new Promise((resolve) => {
this.once(`response-${request.messageID}`, (resp) => {
debugUnit(" returning response to message %s", request.messageID);
resolve(resp);
});
});

2
dist/unit.js.map vendored
View File

@ -1 +1 @@
{"version":3,"file":"unit.js","sourceRoot":"","sources":["../unit.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,YAAY,EAAE,MAAM,QAAQ,CAAC;AACtC,OAAO,EAAE,SAAS,EAAE,MAAM,IAAI,CAAC;AAI/B,MAAM,OAAO,IAAK,SAAQ,YAAY;IAK3B;IACA;IALD,MAAM,CAAa;IACnB,WAAW,CAAiC;IAEpD,YACS,QAAgB,EAChB,OAAO,IAAI;QAElB,KAAK,EAAE,CAAC;QAHD,aAAQ,GAAR,QAAQ,CAAQ;QAChB,SAAI,GAAJ,IAAI,CAAO;QAIlB,IAAI,CAAC,QAAQ,GAAG,QAAQ,CAAC;QACzB,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC;IACnB,CAAC;IAEM,KAAK,CAAC,OAAO;QAClB,IAAI,IAAI,CAAC,MAAM,EAAE,CAAC;YAChB,MAAM,IAAI,KAAK,CAAC,0CAA0C,CAAC,CAAC;QAC9D,CAAC;QAED,IAAI,CAAC,MAAM,GAAG,IAAI,SAAS,CACzB,QAAQ,IAAI,CAAC,QAAQ,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,EAAE,EAAE,CAChD,CAAC;QAEF,MAAM,EAAE,SAAS,EAAE,eAAe,EAAE,GAAG,IAAI,CAAC;QAC5C,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,OAAO,CAAC,KAAK,CAAC,CAAC;QACvC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,SAAS,CAAC,CAAC;QAClC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,SAAS,CAAC,CAAC;QAClC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,GAAG,EAAE;YAC3B,YAAY,CAAC,IAAI,CAAC,WAAW,CAAC,CAAC;YAC/B,IAAI,CAAC,MAAM,EAAE,kBAAkB,EAAE,CAAC;YAClC,IAAI,CAAC,MAAM,GAAG,SAAS,CAAC;QAC1B,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,SAAS,EAAE,eAAe,CAAC,CAAC;QAE3C,MAAM,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YACpC,IAAI,CAAC,MAAM,EAAE,IAAI,CAAC,OAAO,EAAE,MAAM,CAAC,CAAC;YACnC,IAAI,CAAC,MAAM,EAAE,IAAI,CAAC,MAAM,EAAE,OAAO,CAAC,CAAC;QACrC,CAAC,CAAC,CAAC;IACL,CAAC;IAEM,KAAK;QACV,IAAI,CAAC,MAAM,EAAE,KAAK,EAAE,CAAC;IACvB,CAAC;IAEO,SAAS,GAAG,GAAG,EAAE;QACvB,YAAY,CAAC,IAAI,CAAC,WAAW,CAAC,CAAC;QAE/B,IAAI,CAAC,WAAW,GAAG,UAAU,CAAC,GAAG,EAAE;YACjC,IAAI,CAAC,KAAK,EAAE,CAAC;QACf,CAAC,EAAE,KAAK,GAAG,IAAI,CAAC,CAAC;IACnB,CAAC,CAAC;IAEM,eAAe,GAAG,CAAC,GAAW,EAAE,EAAE;QACxC,MAAM,OAAO,GAAG,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,QAAQ,EAAE,CAAe,CAAC;QACzD,IAAI,OAAO,CAAC,OAAO,CAAC,WAAW,EAAE,KAAK,YAAY,EAAE,CAAC;YACnD,IAAI,CAAC,IAAI,CAAC,QAAQ,EAAE,OAAO,CAAC,CAAC;QAC/B,CAAC;QAED,IAAI,CAAC,IAAI,CAAC,YAAY,OAAO,CAAC,SAAS,EAAE,EAAE,OAAO,CAAC,CAAC;IACtD,CAAC,CAAC;IAEK,KAAK,CAAC,IAAI,CAAC,OAAkB;QAClC,IAAI,CAAC,MAAM,EAAE,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,OAAO,CAAC,CAAC,CAAC;QAC3C,OAAO,MAAM,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,EAAE;YACnC,IAAI,CAAC,IAAI,CAAC,YAAY,OAAO,CAAC,SAAS,EAAE,EAAE,CAAC,IAAgB,EAAE,EAAE;gBAC9D,OAAO,CAAC,IAAI,CAAC,CAAC;YAChB,CAAC,CAAC,CAAC;QACL,CAAC,CAAC,CAAC;IACL,CAAC;CACF"}
{"version":3,"file":"unit.js","sourceRoot":"","sources":["../unit.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,YAAY,EAAE,MAAM,QAAQ,CAAC;AACtC,OAAO,EAAE,SAAS,EAAE,MAAM,IAAI,CAAC;AAC/B,OAAO,KAAK,MAAM,OAAO,CAAC;AAK1B,MAAM,SAAS,GAAG,KAAK,CAAC,SAAS,CAAC,CAAC;AAEnC,MAAM,OAAO,IAAK,SAAQ,YAAY;IAO3B;IACA;IAPD,MAAM,CAAa;IACnB,WAAW,CAAiC;IAC5C,SAAS,CAAkC;IAC3C,YAAY,GAAG,KAAK,CAAC;IAE7B,YACS,QAAgB,EAChB,OAAO,IAAI;QAElB,KAAK,EAAE,CAAC;QAHD,aAAQ,GAAR,QAAQ,CAAQ;QAChB,SAAI,GAAJ,IAAI,CAAO;QAIlB,IAAI,CAAC,QAAQ,GAAG,QAAQ,CAAC;QACzB,IAAI,CAAC,IAAI,GAAG,IAAI,CAAC;IACnB,CAAC;IAEM,KAAK,CAAC,OAAO;QAClB,IAAI,IAAI,CAAC,MAAM,EAAE,CAAC;YAChB,MAAM,IAAI,KAAK,CAAC,0CAA0C,CAAC,CAAC;QAC9D,CAAC;QAED,SAAS,CAAC,sBAAsB,IAAI,CAAC,QAAQ,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,EAAE,EAAE,CAAC,CAAC;QAEzE,IAAI,CAAC,MAAM,GAAG,IAAI,SAAS,CACzB,QAAQ,IAAI,CAAC,QAAQ,IAAI,IAAI,CAAC,IAAI,CAAC,QAAQ,EAAE,EAAE,CAChD,CAAC;QAEF,MAAM,EAAE,SAAS,EAAE,eAAe,EAAE,aAAa,EAAE,GAAG,IAAI,CAAC;QAC3D,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,CAAC,GAAG,EAAE,EAAE;YAC9B,wDAAwD;YACxD,SAAS,CAAC,wBAAwB,EAAE,GAAG,CAAC,CAAC;YACzC,aAAa,EAAE,CAAC;QAClB,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,SAAS,CAAC,CAAC;QAClC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,SAAS,CAAC,CAAC;QAClC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,MAAM,EAAE,SAAS,CAAC,CAAC;QAClC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,OAAO,EAAE,aAAa,CAAC,CAAC;QACvC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,SAAS,EAAE,eAAe,CAAC,CAAC;QAE3C,IAAI,CAAC,SAAS,GAAG,WAAW,CAAC,GAAG,EAAE;YAChC,SAAS,CAAC,cAAc,CAAC,CAAC;YAC1B,IAAI,CAAC,MAAM,EAAE,IAAI,EAAE,CAAC;QACtB,CAAC,EAAE,IAAI,CAAC,YAAY,CAAC,CAAC;QAEtB,MAAM,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,MAAM,EAAE,EAAE;YACpC,IAAI,CAAC,MAAM,EAAE,IAAI,CAAC,OAAO,EAAE,MAAM,CAAC,CAAC;YACnC,IAAI,CAAC,MAAM,EAAE,IAAI,CAAC,MAAM,EAAE,OAAO,CAAC,CAAC;QACrC,CAAC,CAAC,CAAC;QAEH,SAAS,CAAC,WAAW,CAAC,CAAC;IACzB,CAAC;IAEM,KAAK;QACV,SAAS,CAAC,+BAA+B,CAAC,CAAC;QAC3C,IAAI,CAAC,MAAM,EAAE,KAAK,EAAE,CAAC;IACvB,CAAC;IAEO,aAAa,GAAG,GAAG,EAAE;QAC3B,SAAS,CAAC,gBAAgB,CAAC,CAAC;QAE5B,IAAI,CAAC,MAAM,EAAE,kBAAkB,EAAE,CAAC;QAClC,IAAI,CAAC,MAAM,GAAG,SAAS,CAAC;QAExB,IAAI,IAAI,CAAC,WAAW,EAAE,CAAC;YACrB,YAAY,CAAC,IAAI,CAAC,WAAW,CAAC,CAAC;YAC/B,IAAI,CAAC,WAAW,GAAG,SAAS,CAAC;QAC/B,CAAC;QAED,IAAI,IAAI,CAAC,SAAS,EAAE,CAAC;YACnB,aAAa,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YAC9B,IAAI,CAAC,SAAS,GAAG,SAAS,CAAC;QAC7B,CAAC;IACH,CAAC,CAAC;IAEM,SAAS,GAAG,GAAG,EAAE;QACvB,SAAS,CAAC,oBAAoB,CAAC,CAAC;QAChC,YAAY,CAAC,IAAI,CAAC,WAAW,CAAC,CAAC;QAE/B,IAAI,CAAC,WAAW,GAAG,UAAU,CAAC,GAAG,EAAE;YACjC,SAAS,CAAC,iDAAiD,CAAC,CAAC;YAC7D,IAAI,CAAC,MAAM,EAAE,SAAS,EAAE,CAAC;YACzB,IAAI,CAAC,aAAa,EAAE,CAAC;QACvB,CAAC,EAAE,IAAI,CAAC,YAAY,GAAG,IAAI,CAAC,CAAC;IAC/B,CAAC,CAAC;IAEM,eAAe,GAAG,CAAC,GAAW,EAAE,EAAE;QACxC,SAAS,CAAC,6BAA6B,EAAE,GAAG,CAAC,MAAM,CAAC,CAAC;QAErD,MAAM,OAAO,GAAG,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,QAAQ,EAAE,CAAe,CAAC;QACzD,IAAI,OAAO,CAAC,OAAO,CAAC,WAAW,EAAE,KAAK,YAAY,EAAE,CAAC;YACnD,SAAS,CAAC,8CAA8C,CAAC,CAAC;YAC1D,IAAI,CAAC,IAAI,CAAC,QAAQ,EAAE,OAAO,CAAC,CAAC;QAC/B,CAAC;QAED,IAAI,CAAC,IAAI,CAAC,YAAY,OAAO,CAAC,SAAS,EAAE,EAAE,OAAO,CAAC,CAAC;IACtD,CAAC,CAAC;IAEK,KAAK,CAAC,IAAI,CAAC,OAAkB;QAClC,MAAM,OAAO,GAAG,IAAI,CAAC,SAAS,CAAC,OAAO,CAAC,CAAC;QACxC,SAAS,CACP,yCAAyC,EACzC,OAAO,CAAC,MAAM,EACd,OAAO,CAAC,SAAS,CAClB,CAAC;QAEF,IAAI,CAAC,MAAM,EAAE,IAAI,CAAC,OAAO,CAAC,CAAC;QAC3B,OAAO,MAAM,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,EAAE;YACnC,IAAI,CAAC,IAAI,CAAC,YAAY,OAAO,CAAC,SAAS,EAAE,EAAE,CAAC,IAAgB,EAAE,EAAE;gBAC9D,SAAS,CAAC,oCAAoC,EAAE,OAAO,CAAC,SAAS,CAAC,CAAC;gBACnE,OAAO,CAAC,IAAI,CAAC,CAAC;YAChB,CAAC,CAAC,CAAC;QACL,CAAC,CAAC,CAAC;IACL,CAAC;CACF"}

68
unit.ts
View File

@ -1,11 +1,17 @@
import { EventEmitter } from "events";
import { WebSocket } from "ws";
import debug from "debug";
import { ICRequest } from "./messages/request.js";
import { ICResponse } from "./messages/response.js";
const debugUnit = debug("ic:unit");
export class Unit extends EventEmitter {
private client?: WebSocket;
private pingTimeout?: ReturnType<typeof setTimeout>;
private pingTimer?: ReturnType<typeof setInterval>;
private pingInterval = 60000;
constructor(
public endpoint: string,
@ -22,42 +28,76 @@ export class Unit extends EventEmitter {
throw new Error("can't open a client that is already open");
}
debugUnit(`connecting to ws://${this.endpoint}:${this.port.toString()}`);
this.client = new WebSocket(
`ws://${this.endpoint}:${this.port.toString()}`,
);
const { heartbeat, onClientMessage } = this;
this.client.on("error", console.error);
const { heartbeat, onClientMessage, socketCleanup } = this;
this.client.on("error", (evt) => {
// todo: emit event so we can reconnect? auto reconnect?
debugUnit("error in websocket: $o", evt);
socketCleanup();
});
this.client.on("open", heartbeat);
this.client.on("ping", heartbeat);
this.client.on("close", () => {
clearTimeout(this.pingTimeout);
this.client?.removeAllListeners();
this.client = undefined;
});
this.client.on("pong", heartbeat);
this.client.on("close", socketCleanup);
this.client.on("message", onClientMessage);
this.pingTimer = setInterval(() => {
debugUnit("sending ping");
this.client?.ping();
}, this.pingInterval);
await new Promise((resolve, reject) => {
this.client?.once("error", reject);
this.client?.once("open", resolve);
});
debugUnit("connected");
}
public close() {
debugUnit("closing connection by request");
this.client?.close();
}
private socketCleanup = () => {
debugUnit("socket cleanup");
this.client?.removeAllListeners();
this.client = undefined;
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
this.pingTimeout = undefined;
}
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = undefined;
}
};
private heartbeat = () => {
debugUnit("received heartbeat");
clearTimeout(this.pingTimeout);
this.pingTimeout = setTimeout(() => {
this.close();
}, 30000 + 1000);
debugUnit("terminating connection due to heartbeat timeout");
this.client?.terminate();
this.socketCleanup();
}, this.pingInterval + 5000);
};
private onClientMessage = (msg: Buffer) => {
debugUnit("message received, length %d", msg.length);
const respObj = JSON.parse(msg.toString()) as ICResponse;
if (respObj.command.toLowerCase() === "notifylist") {
debugUnit(" it's a subscription confirmation or update");
this.emit(`notify`, respObj);
}
@ -65,9 +105,17 @@ export class Unit extends EventEmitter {
};
public async send(request: ICRequest): Promise<ICResponse> {
this.client?.send(JSON.stringify(request));
const payload = JSON.stringify(request);
debugUnit(
"sending message of length %d with id %s",
payload.length,
request.messageID,
);
this.client?.send(payload);
return await new Promise((resolve) => {
this.once(`response-${request.messageID}`, (resp: ICResponse) => {
debugUnit(" returning response to message %s", request.messageID);
resolve(resp);
});
});