La oss forestille oss at du har bestemt deg for å benytte deg av WebSockets i din nye fete web applikasjon. Du begynner kanskje først med å bare bruke det innebygde WebSocket APIet som følger med i nettleseren. Du finner etterhvert ut at det hadde vært kjekt med automatisk gjenoppkobling og en eller annen form for keep-alive mellom klient og server. La oss også anta at du har valg TypeScript til frontenden din av en eller annen grunn. SockJS som mange bruker virker litt vel omfattende og passer kanskje ikke med din valgte teknologistakk på serveren. Ditt første instinkt er jo da kanskje å sjekke på npm om det finnes et bibliotek som passer. Du finner selvfølgelig mye greier. Men ingen av de du finner virker overbevisende nok. Det er kanskje ikke rasjonelt, men du bestemmer deg for å lage det selv. Da har du full kontroll, kan få det akkurat som du selv vil og du risikerer å lære endel underveis.
Funksjonell programmering er jo noe mange snakker om. Passer det til noe som er så fullt av sideeffekter som håndtering av WebSockets? La oss prøve, så kan du jo vurdere etterpå om du syntes det var verdt innsatsen.
¶Oppsett
npx tsdx create ts-ws-diy
basic
Voila så har du et TypeScript prosjekt oppe, med masse snurrepipperier, ferdigkonfigurert for testing, bygg, linting osv. Personlig er jeg ikke så veldig glad i sånne boilerplate generatorer med masse skjult config, men det lar oss komme kjapt i gang.
Siden vi skal ha det artig med funksjonelle typer så installerer vi også det kjekke biblioteket unionize
npm i -S unionize
¶En myk start
Vi begynner med å implementere tilstandsmaskinen i diagrammet over. Det er ikke det endelige målet, men det er greit å begynne litt mykt når vi skal prøve oss frem.
Lag en ny typescript fil src/statemachine.ts
import unionize, { ofType } from "unionize";
// Definerer de forskjellige tilstandene vi har
export const States = unionize({
INITIAL: ofType<{}>(),
CONNECTING: ofType<{}>(),
OPENED: ofType<{}>(),
CLOSED: ofType<{}>(),
});
// Lager en typescript type av tilstandsdefinisjonen vår over
export type State = typeof States._Union;
// Dette er meldingene/hendelsene som kan forårsake en tilstandstransisjon
export const Events = unionize({
ON_CONNECT: {},
ON_OPEN: {},
ON_CLOSE: {}
});
export type Event = typeof Events._Union;
Dette var jo sikkert flott, men det blir jo ikke rare tilstandsmaskinen dersom vi ikke har en funksjon for å foreta en transisjon fra en tilstand til en neste. Signaturen på denne funksjonen definerer vi slik:
const TransitionFn = (evt: Event, state: State) => State;
En artig liten kuriositet er at denne funksjonen minner ganske mye på update funksjonen i Elm:
update : Msg -> Model -> Model
update msg model =
En sammenligning som kanskje er mer nærliggende dersom man kommer fra JavaScript/TypeScript miljøet er reducers i redux
(state: State, action: Action) => State
Fellestrekket for alle 3 er at de er rene funksjoner (pure functions). De er (/skal være) funksjoner som ikke har sideeffekter, inputparametere skal/kan ikke endres av funksjonen og returverdien er utelukkende en funksjon av inputparameterene. Slike funksjoner er enkle å teste og lettere å forstå enn funksjoner som muterer eller har andre skumle sideeffekter. Ok, det får være nok salgspitch om funksjonelle prinsipper. La oss lage transisjonsfunksjonen.
export const transition = (evt: Event, state: State): State => States.match(state, {
INITIAL: () => Events.match(evt, {
ON_CONNECT: () => States.CONNECTING(),
default: () => state
}),
CONNECTING: () => Events.match(evt, {
ON_OPEN: () => States.OPENED(),
ON_CLOSE: () => States.CLOSED(),
default: () => state
}),
OPENED: () => Events.match(evt, {
ON_CLOSE: () => States.CLOSED(),
default: () => state
}),
CLOSED: () => state
});
Unionize
legger på en hendig hjelpefunksjon match
som lar oss skrive switch
ekvivalent kode på en mer elegant måte.
Dersom man lar vær å bruke default
vil typescript kompilatoren også sørge for å si fra dersom du ikke har dekket alle tilfellene.
I koden over så er det bare de transisjonen som er gyldige som fører til tilstandsendring, alle andre permutasjoner vil returnere tilstanden uendret.
Det er ikke gitt at du ønsker å ignorere ugyldige kombinasjoner på denne måten i din implementasjon.
Sideeffekter
Til nå har ikke transisjonsfunksjonen vår tatt innover seg at vi ønsker jo at det skal skje noe som resultat av at tilstanden endrer seg.
Når man skifter fra INITIAL
til CONNECTING
så ønsker vi jo at “noen” faktisk initierer en websocket-forbindelse.
Det å starte en websocket-forbindelse er en sideeffekt, så vi kan jo ikke gjøre det i transisjonsfunksjonen så hvordan får vi gitt beskjed?
En (ganske lur) måte å gjøre det på er jo å returnere bestillinger på sideeffekter vi ønsker utført fra transisjonsfunksjonen.
For at transisjonsfunksjonen fortsatt skal være ren, så kan vi returnere disse bestillingene som data.
export const Effects = unionize( {
CONNECT_WS: {},
});
export type Effect = typeof Effects._Union;
Vi definerer en ny datatype for å beskrive de forskjellige effektene vi ser for oss å bestille. Enn så lenge trenger vi bare å bestille opprettelse av en websocket-tilkobling, men det blir flere etterhvert!
Det neste vi må gjøre er å endre litt på transisjonsfunksjonen vår.
export const transition = (evt: Event, state: State): [State, Effect[]] => States.match(state, {
INITIAL: () => Events.match(evt, {
ON_CONNECT: () => [States.CONNECTING(), [Effects.CONNECT_WS()]],
default: () => [state, []]
}),
CONNECTING: () => Events.match(evt, {
ON_OPEN: () => [States.OPENED(), []],
ON_CLOSE: () => [States.CLOSED(), []],
default: () => [state, []]
}),
OPENED: () => Events.match(evt, {
ON_CLOSE: () => [States.CLOSED(), []],
default: () => [state, []]
}),
CLOSED: () => [state, []]
});
Vi har endret signaturen på funksjonen til å returnere en tuple av State
og en liste med Effect
.
Siden vi endret på signaturen måtte vi også endre på alle permutasjonene. Den eneste som returnerer en faktisk
effekt-bestilling så langt er ON_CONNECT
. Nå vet kallende funksjon at det ønskes utført en sideeffekt som følge av
denne transisjonen. Det som er najs, er at funksjonen fortsatt er sideeffektfri og lett å teste.
Man kan jo spørre seg; Har vi egentlig oppnådd noe annet enn å modellere noe som ligner på readyState som vi allerede har på klassen
WebSocket
i nettleseren?
Nja nei egentlig ikke, men vi har laget fundamentet for å modellere gjenoppkobling og pulskontroll.
¶Gjenoppkobling og pulskontroll
Oisann, det ble jo litt mer komplisert. Den nye tilstanden RECONNECTING
var ikke så overraskende, men noen har tatt seg kreative friheter
i modelleringen av OPENED
tilstanden. Det blir forhåpentligvis klarere når vi kommer til kapitlet om pulskontroll.
Gjenoppkobling
Det kan være mange grunner til at en websocket kan miste forbindelsen sin til server. Kanskje mistet klienten nettforbindelse, eller kanskje rullet man ut en ny versjon av applikasjonen på serveren(e). Det er digg om vi kunne automatisert det å forsøke gjenoppkobling på en generisk måte. Vi gjør et forsøk på å legge til rette for det.
Tilstander
export type Context = {
reconnectAttempt: number,
}
export const States = unionize({
INITIAL: ofType<Context>(),
CONNECTING: ofType<Context>(),
OPENED: ofType<Context>(),
CLOSED: ofType<Context>(),
RECONNECTING: ofType<Context>(),
});
Vi ønsker å holde orden på hvor mange ganger man har forsøkt å gjenoppkoble slik at vi kan implementere f.eks en eksponensiell “backoff”.
For å ta vare på denne på tvers av alle tilstandene, lager vi en Context
type som holder på antall forsøk og så beriker vi alle tilstandene
med denne typen. Vi har også lagt til en ny tilstand RECONNECTING
for å representere tilstanden hvor vi venter før vi prøver å koble opp på nytt igjen.
Hendelser
export const Events = unionize({
ON_CONNECT: {},
ON_OPEN: {},
ON_CLOSE: {},
ON_RECONNECT: {}, // ny
});
Vi legger til en ny hendelse for å representere overgangen fra tilstand CLOSED
til tilstand RECONNECTING
.
Effekter
// Tar her høyde for at vi kan komme til å ønske flere forskjellige bestilte timeouts
// Akkurat nå støtter vi bare en timeout for å vente før vi gjør en ny oppkobling
export type TimeoutKey = "connect";
export const Effects = unionize({
CONNECT_WS: {},
SCHEDULE_TIMEOUT: ofType<{
key: TimeoutKey; // En identifikator for timeout som effekthåndteringen kan bruke for å holde orden på timeout effekter
timeoutMillis: number; // Hvor lenge ønsker vi å vente før en hendelse skal effektueres
onTimeout: Event; // Hvilken hendelse vi ønsker effektuert
}>()
});
Vi ser for oss at før man forsøker å koble opp på nytt igjen, så ønsker vi å kunne vente et gitt antall millisekunder før vi foretar et nytt oppkoblingsforsøk. Siden vi ser for oss at timeouts er noe vi kan ha bruk for i andre sammenhenger også (f.eks puls), definerer vi timeout effekten litt mer generisk her.
Transisjoner
export const transition = (evt: Event, state: State): [State, Effect[]] => States.match(state, {
INITIAL: () => Events.match(evt, {
ON_CONNECT: () => [States.CONNECTING(state), [Effects.CONNECT_WS()]],
default: () => [state, []]
}),
CONNECTING: () => Events.match(evt, {
// Endring: Dersom websocket er åpnet nullstiller vi antall gjenoppkoblingsforsøk
ON_OPEN: () => [States.OPENED({...state, reconnectAttempt: 0}),[]],
ON_CLOSE: () => [States.CLOSED(state), []],
default: () => [state, []]
}),
OPENED: () => Events.match(evt, {
ON_CLOSE: () => [States.CLOSED(state), []],
default: () => [state, []]
}),
CLOSED: () => Events.match(evt, {
// Ny: Vi bestiller en timeout effekt her, når den utløper så utløses hendelsen ON_CONNECT
ON_RECONNECT: () => [
States.RECONNECTING({...state, reconnectAttempt: state.reconnectAttempt + 1}),
[Effects.SCHEDULE_TIMEOUT({
key: "connect",
timeoutMillis: 1000 * state.reconnectAttempt, // lineær backoff enn så lenge
onTimeout: Events.ON_CONNECT()
})]
],
default: () => [state, []]
}),
// Ny: Vi har lagt til en ny tilstand og må håndtere at det skal være lov å gå fra denne
// tilstanden til CONNECTING
RECONNECTING: () => Events.match(evt, {
ON_CONNECT: () => [States.CONNECTING(state), [Effects.CONNECT_WS()]],
default: () => [state, []]
})
});
Med disse endringene så har vi fått på plass byggestenene for en tilstandsmaskin som støtter automatisk gjenoppkobling. Vi skulle gjerne forbedret algoritmen for hvor lenge man skal vente før vært gjenoppkoblingsforsøk, men det får vi komme tilbake til litt senere.
Pulskontroll
Det er veldig kjekt for klienten å vite at serveren er i live og tilsvarende er det kjekt for serveren å vite at klienten er i live. I mange skymiljøer er det ofte automatisk nedkobling dersom det ikke er aktivitet mellom klient og server. Det kan være en smule irriterende å håndtere.
Støtte for ping/pong frames er jo en del av websocket standarden, men det må da initieres fra server og du er avhengig av at serveren din støtter å lage “ping frames”. Dersom du kan det og du er trygg på at alle nettlesere du trenger å støtte har innebygd automatisk pong svar på ping frames, så kan du skippe denne delen.
Hva med tcp keepalive: Gammel stackoverflow
Ok vi er skeptiske og kjører på med vår egen ping/pong implementasjon som initieres fra klienten.
Tilstander
export type Context = {
reconnectAttempt: number,
pingTimeoutMillis: number,
pongTimeoutMillis: number
}
Vi er litt late og lar være å modellere ping og pong som egne tilstander. Man kunne kanskje argumentert for at
at ping/pong håndtering burde vært modellert som en sub-tilstandsmaskin, men det dropper vi i denne omgang.
Vi legger til timeoutverdier for ping og pong i Context
slik at det skal bli konsistent med håndtering av timeouts for gjenoppkobling.
(Vi kunne selvfølgelig latt effekthåndteringen få inn disse verdiene, eller vi kunne hardkodet dem i transisjonsfunksjonen).
Hendelser
export const Events = unionize({
ON_CONNECT: {},
ON_OPEN: {},
ON_CLOSE: {},
ON_RECONNECT: {},
ON_HEARTBEAT: {}, // Utløses når vi har fått et pong svar eller en hvilken som helst melding fra serveren, etter vi har sendt en ping melding
ON_PING_TIMEOUT: {}, // Utløses når timeout for å vente på å sende en ping melding har utløpt
ON_PONG_TIMEOUT: {}, // Utløses når maks tid vi ønsker vente på svar på en utestående ping har utløpt
});
Effekter
// Legger til ping og pong som nøkler
export type TimeoutKey = "connect" | "ping" | "pong";
export const Effects = unionize({
CONNECT_WS: {},
SCHEDULE_TIMEOUT: ofType<{
key: TimeoutKey;
timeoutMillis: number;
onTimeout: Event;
}>(),
// Effekt for å avbryte en tidligere bestilt timeout-effekt
CLEAR_TIMEOUT: ofType<{ key: TimeoutKey }>(),
// Effekt for å sende en ping melding til serveren
SEND_PING: ofType<{}>(),
// Effekt for å lukke en åpen websocket forbindelse
CLOSE_WS: {},
});
Transisjoner
export const transition = (evt: Event, state: State): [State, Effect[]] => States.match(state, {
INITIAL: () => Events.match(evt, {
ON_CONNECT: () => [States.CONNECTING(state), [Effects.CONNECT_WS()]],
default: () => [state, []]
}),
CONNECTING: () => Events.match(evt, {
ON_OPEN: () => [
States.OPENED({...state, reconnectAttempt: 0}),
// Vi legger til en timeout-effekt for å initiere pulskontroll
[Effects.SCHEDULE_TIMEOUT({
key: "ping",
timeoutMillis: state.pingTimeoutMillis,
onTimeout: Events.ON_PING_TIMEOUT(),
})]
],
ON_CLOSE: () => [States.CLOSED(state), []],
default: () => [state, []]
}),
OPENED: () => Events.match(evt, {
// Vi sender en ping melding til server etter utløpt ping timeout
// I tillegg ber vi om en timeout effekt for å vente på en pong i retur
ON_PING_TIMEOUT: () => [state, [
Effects.SEND_PING(),
Effects.SCHEDULE_TIMEOUT({
key: "pong",
timeoutMillis: state.pongTimeoutMillis,
onTimeout: Events.ON_PONG_TIMEOUT(),
})
]],
// Dersom vi ikke har fått svar fra server i tide, ber vi om å lukke underliggende websocket
ON_PONG_TIMEOUT: () => [state, [Effects.CLOSE_WS()]],
// Ved vellykket pulssjekk, nullstiller vi pong og ber om en ny ping timeout
ON_HEARTBEAT: () => [
state,
[
Effects.CLEAR_TIMEOUT({ key: "pong" }),
Effects.SCHEDULE_TIMEOUT({
key: "ping",
timeoutMillis: state.pingTimeoutMillis,
onTimeout: Events.ON_PING_TIMEOUT(),
})
]
],
ON_CLOSE: () => [
States.CLOSED(state),
// Ved lukking av en åpen forbindelse sørger vi for å rydde opp timeouts
[
Effects.CLEAR_TIMEOUT({ key: "ping" }),
Effects.CLEAR_TIMEOUT({ key: "pong" })
]
],
default: () => [state, []]
}),
CLOSED: () => Events.match(evt, {
ON_RECONNECT: () => [
States.RECONNECTING({...state, reconnectAttempt: state.reconnectAttempt + 1}),
[Effects.SCHEDULE_TIMEOUT({
key: "connect",
timeoutMillis: 1000 * (state.reconnectAttempt + 1),
onTimeout: Events.ON_CONNECT()
})]
],
default: () => [state, []]
}),
RECONNECTING: () => Events.match(evt, {
ON_CONNECT: () => [
States.CONNECTING(state), [Effects.CONNECT_WS()]
],
default: () => [state, []]
})
});
¶Testing
Vi burde kanskje ha begynt med testing en god del tidligere, men vi kan jo late som om vi har gjort dette på en TDD-lignende måte.
For å teste transisjonfunksjonen vår, lager vi en statemachine.test.ts
:
import { Context, Effects, Events, States, transition } from "../src/statemachine";
describe("verify state machine transitions", () => {
// Gi litt kortere navn på hendelser
const [
connect,
// ... droppet resten for å være kortfattet
] = [
Events.ON_CONNECT(),
// ... droppet resten for å være kortfattet
];
// States
const initialContext: Context = {
reconnectAttempt: 0,
pingTimeoutMillis: 1,
pongTimeoutMillis: 1
};
const initial = States.INITIAL(initialContext);
const connecting = States.CONNECTING(initialContext);
// ... droppet resten for å være kortfattet
// Effects
const fCon = Effects.CONNECT_WS();
// ... droppet resten for å være kortfattet
// Brukbar dekning av permutasjoner
test.each`
sourceState | action | targetState | expectedEffects
${initial} | ${connect} | ${connecting} | ${[fCon]}
${initial} | ${open} | ${initial} | ${[]}
${initial} | ${heartbeat} | ${initial} | ${[]}
${initial} | ${pingtimeout} | ${initial} | ${[]}
${initial} | ${pongtimeout} | ${initial} | ${[]}
${initial} | ${close} | ${initial} | ${[]}
${initial} | ${reconnect} | ${initial} | ${[]}
${connecting} | ${open} | ${opened} | ${[fPingTO]}
${connecting} | ${close} | ${closed} | ${[]}
${connecting} | ${connect} | ${connecting} | ${[]}
${connecting} | ${heartbeat} | ${connecting} | ${[]}
${connecting} | ${pingtimeout} | ${connecting} | ${[]}
${connecting} | ${pongtimeout} | ${connecting} | ${[]}
${connecting} | ${reconnect} | ${connecting} | ${[]}
${opened} | ${pingtimeout} | ${opened} | ${[fSendPing, fPongTO]}
${opened} | ${pongtimeout} | ${opened} | ${[fClose]}
${opened} | ${heartbeat} | ${opened} | ${[fClearPong, fPingTO]}
${opened} | ${close} | ${closed} | ${[fClearPing, fClearPong]}
${opened} | ${connect} | ${opened} | ${[]}
${opened} | ${open} | ${opened} | ${[]}
${opened} | ${reconnect} | ${opened} | ${[]}
${closed} | ${reconnect} | ${reconnecting} | ${[fConTO]}
${closed} | ${connect} | ${closed} | ${[]}
${closed} | ${open} | ${closed} | ${[]}
${closed} | ${heartbeat} | ${closed} | ${[]}
${closed} | ${pingtimeout} | ${closed} | ${[]}
${closed} | ${pongtimeout} | ${closed} | ${[]}
${closed} | ${close} | ${closed} | ${[]}
${reconnecting} | ${connect} | ${connecting1} | ${[fCon]}
${reconnecting} | ${reconnect} | ${reconnecting} | ${[]}
${reconnecting} | ${open} | ${reconnecting} | ${[]}
${reconnecting} | ${heartbeat} | ${reconnecting} | ${[]}
${reconnecting} | ${pingtimeout} | ${reconnecting} | ${[]}
${reconnecting} | ${pongtimeout} | ${reconnecting} | ${[]}
${reconnecting} | ${close} | ${reconnecting} | ${[]}
`(
"update with $sourceState gives $targetState and $expectedEffects",
({ sourceState, action, targetState, expectedEffects }) => {
const [nextState, effects] = transition(action, sourceState);
expect(nextState).toStrictEqual(targetState);
expect(effects).toStrictEqual(expectedEffects);
},
);
});
Det er litt jobb å sette opp input-parametere og forventede output-verdier. For at ikke permutasjonstabellen skal bli for bred og vanskelig å lese, valgte vi å lage korte navn. Vi måtte også gjøre litt ekstra arbeide med å definere effekter på forhånd. I sum ble det kanskje ikke så ille og det er relativt lett å skanne over tabellen for å se at vi har dekket alle transisjonene.
¶Klient-API
Før vi går løs på effekthåndteringen la oss først prøve å lage et grensesnitt for websocket-wrapperen vår.
Vi lager en fil index.ts
som blir innfallsporten til biblioteket vårt.
import {
Context,
Effect,
Effects,
States,
Event,
TimeoutKey,
transition,
Events,
State
} from "./statemachine";
// Vi ønsker oss en konfigurasjonsparameter slik at klienten kan styre endel ting selv
// (Vi legger til to felter fra Context typen vår vha Pick typen i typescript og skjøter sammen med &)
export type Config = Pick<Context, "pingTimeoutMillis" | "pongTimeoutMillis"> & {
url: string,
// Callback for faktiske meldinger fra websocket-server
// (pong-meldinger vil ikke trigge denne, da dette er et internt anliggende for wrapperen)
onMessage: (msg: MessageEvent) => void,
// Kan være hendig for klienten å få en callback ved tilstandsendringer
onStateChange?: (previous: State, current: State) => void,
}
// Når man oppretter wrapperen vår får man et objekt med følgende signature i retur
export type WSMachine = {
// Kobler til faktisk websocket og starter tilstandsmaskinen
connect: () => void;
// Kjekt å kunne sende meldinger til server!
send: (data: string | ArrayBufferLike | Blob | ArrayBufferView) => void;
// Hjelpefunksjon dersom klienten skulle ønske å spørre hva tilstanden er nå
currentState: () => State;
// Kobler ned websocket og stopper tilstandsmaskinen
disconnect: () => void;
};
// Skall-implementasjon for å opprette en WSMachine
export const wsMachine = (config: Config): WSMachine => {
// TODO: Effekthåndtering
// TODO: Implementere API for WSMachine
return {
connect: () => { return },
disconnect: () => { return },
currentState: () => throw Error("Not implemented yet"),
send: (data: string | ArrayBufferLike | Blob | ArrayBufferView) => {return}
};
};
Effekthåndtering
// Type alias for returtypen til setTimeout funksjonen
type TTimeout = ReturnType<typeof setTimeout>;
export const wsMachine = (config: Config): WSMachine => {
// Faktisk websocket
let ws: WebSocket | undefined;
// Initiell tilstand for tilstandsmaskin
let wsState = States.INITIAL({...config, reconnectAttempt: 0});
// Et map som hjelper oss å holde orden på timeouts
const timeouts: Map<TimeoutKey, TTimeout> = new Map();
// Hjelpefunksjon for å stoppe en timeout effekt
const nukeTimeout = (k: TimeoutKey) => {
const t = timeouts.get(k);
if (t) {
clearTimeout(t);
}
};
// Når en websocket rapporterer at den er åpen
const onOpen = () => handleTransition(Events.ON_OPEN());
// Når vi får en melding fra websocket server
const onMessage = (ev: MessageEvent) => {
const msg = ev.data;
// Vi ønsker ikke å plage klienten med pong meldinger
if (msg !== "pong") {
config.onMessage(ev);
}
// Uansett om det er pong eller en annen melding
// Vi har fått puls fra server og er fornøyde
handleTransition(Events.ON_HEARTBEAT());
};
// Når vi får beskjed om at WebSocket har blitt stengt
// Uavhengig av grunn, vi gir oss ikke... og prøver å koble til på nytt!
const onClose = () => {
handleTransition(Events.ON_CLOSE());
handleTransition(Events.ON_RECONNECT());
};
// Hjelpefunksjon for å håndtere hendelser som utløses av faktisk WebSocket
const addEventListeners = () => {
ws?.addEventListener("open", onOpen);
ws?.addEventListener("close", onClose);
ws?.addEventListener("message", onMessage);
};
const handleEffect = (effect: Effect) => {
// TODO: Realisere en effekt
}
const handleTransition = (event: Event) => {
// TODO: Håndtere en tilstandstransisjon og dens bestilte effekter
}
Ok da har vi gjort unna mye av oppsettet vi trenger. La oss se på hvordan vi skal håndtere effektene.
const handleEffect = (effect: Effect) => {
Effects.match(effect, {
// Vi oppretter en WebSocket og legger til håndtering av hendelser på denne
CONNECT_WS: () => {
ws = new WebSocket(config.url);
addEventListeners();
},
// Lukke websocket er jo bare å delegere til underliggende WebSocket
CLOSE_WS: () => {
ws?.close();
},
// Vi oppretter en timout, og legger den til i mappet vårt
// Vi må nesten ta vare på de ett sted for å kunne stoppe de, når vi trenger det
SCHEDULE_TIMEOUT: (t) => {
timeouts.set(
t.key,
setTimeout(() => handleTransition(t.onTimeout), t.timeoutMillis),
);
},
// Ping håndtering er jo bare å sende en melding til websocket server
// Hva om du ønsker noe annet enn "ping"?
// Det kan man f.eks løse ved å legge som en parameter på Config
SEND_PING: () => {
if (ws) {
ws.send("ping");
}
},
// Vi kaller hjelpefunksjonen vår få å stoppe en timeout fra å utløpe
CLEAR_TIMEOUT: (t) => nukeTimeout(t.key)
})
}
Deretter må vi håndtere transisjonene og effektene som er bestilt.
const handleTransition = (event: Event) => {
const [newState, effects] = transition(event, wsState);
// Dersom klient har angitt at man ønsker å få beskjed om tilstandsendringer
if (config.onStateChange) {
// Denne kunne med fordel vært litt smartere.
// Man burde sjekket om tilstandsnavnet faktisk har endret seg
// Slik det er nå får man OPEN -> OPEN for pulskontroll hendelser...
config.onStateChange(wsState, newState);
}
wsState = newState;
// Håndterer evt. bestilte effekter
effects.forEach((e) => {
handleEffect(e);
});
}
Den siste TODO’en vi må ta tak i da er returobjektet som lar klienter stoppe og starte tilstandsmaskinen.
return {
connect: () => handleTransition(Events.ON_CONNECT()),
disconnect: () => {
// nullstill tilstandsmaskinen
wsState = States.INITIAL(wsState);
// Rydd opp evt gjenlevende timeouts
nukeTimeout("connect");
nukeTimeout("ping");
nukeTimeout("pong");
// Lukk WebSocket og fjern funksjoner som lytter på hendelser på denne
if (ws) {
ws.removeEventListener("open", onOpen);
ws.removeEventListener("close", onClose);
ws.removeEventListener("message", onMessage);
ws.close();
}
},
currentState: () => wsState,
send: (data: string | ArrayBufferLike | Blob | ArrayBufferView) => {
if (States.is.OPENED(wsState) && ws instanceof WebSocket) {
ws.send(data);
// Dette er kanskje ikke så fryktelig elegant
} else {
const errorMsg = `Can't send message when websocket connection isn't in an open state. State is: ${
wsState.tag
}`;
throw Error(errorMsg);
}
}
};
Vi klatta på en send
funksjon. Men dette kunne man kanskje ønsket å håndtere mer elegant. Kanskje vi kunne ha modellert
dette smartere? Det kunne kanskje også være ønskelig i endel tilfeller å bare bufre opp meldinger i påvente av at man
får forbindelse igjen. Hva tenker du?
Testing av effekter
For å lage tester av effekthåndteringen vår benytter vi oss av et bibliotek for å mocke websockets.
npm install --save-dev jest-websocket-mock mock-socket
La oss lage en index.test.ts
import WS from "jest-websocket-mock";
import { WSMachine, wsMachine } from "../src";
describe("verify wsMachine interactions", () => {
let server: WS | undefined;
let machine: WSMachine;
let messages: string[] = [];
let stateChanges: string[] = [];
beforeEach(async () => {
messages = [];
stateChanges = [];
// Lager en ny mock WebSocket server
server = new WS("ws://localhost:1234");
// Faker "pong" meldinger hver gang server mottar en melding
server.on("message", (ws) => {
ws.send("pong");
});
// Sett opp vår WSMachine
machine = wsMachine({
url: "ws://localhost:1234",
pingTimeoutMillis: 10,
pongTimeoutMillis: 10,
onMessage(msg) {
messages.push(msg.data);
},
onStateChange(prev, curr) {
stateChanges.push(`${prev.tag}->${curr.tag}`);
}
});
machine.connect();
});
afterEach(() => {
// Rydd opp etter oss for hver test
if (machine) { machine.disconnect(); }
WS.clean();
});
test("connect to machine and verify ping", async () => {
// Mock server har hendige promises vi kan vente på
// Her venter vi på at klienten vår har koblet til
await server?.connected;
// vi venter til ping melding har blitt mottatt av server
await expect(server).toReceiveMessage("ping");
expect(server).toHaveReceivedMessages(["ping"]);
expect(messages).toEqual([]);
// Her sjekker vi at vi har fått de transisjonene vi forventer frem til nå
expect(stateChanges).toEqual([
"INITIAL->CONNECTING",
"CONNECTING->OPENED",
"OPENED->OPENED",
]);
});
test("connect to machine and verify message received", async () => {
await server?.connected;
server?.send("test");
expect(messages).toEqual(["test"]);
});
test("server close triggers reconnect attempt(s)", async () => {
// Vi venter til vi har koblet til
// Så lukker vi forbindelsen fra serveren
await server?.connected;
server?.close();
await server?.closed;
// Her sjekker vi at vi automatisk prøver å kobler til igjen
expect(stateChanges).toEqual([
"INITIAL->CONNECTING",
"CONNECTING->OPENED",
"OPENED->CLOSED",
"CLOSED->RECONNECTING"
]);
});
});
test("no pong triggers reconnect", async () => {
// I denne testen setter vi ikke opp noe automatisk retur av pong meldinger
// I følge pulskontroll logikken vår skal dette føre til gjenoppkobling
const server = new WS("ws://localhost:12345");
const stateChanges: string[] = [];
const machine = wsMachine({
url: "ws://localhost:12345",
pingTimeoutMillis: 5,
pongTimeoutMillis: 5,
onMessage: () => {
return;
},
onStateChange(prev, curr) {
stateChanges.push(`${prev.tag}->${curr.tag}`);
}
});
machine.connect();
await server.connected;
// Vi forventer her at tilstandsmaskinen skal lukke forbindelsen
// Siden vi ikke mottar en pong etter vi har sendt ping!
await server.closed;
expect(stateChanges).toStrictEqual([
"INITIAL->CONNECTING",
"CONNECTING->OPENED",
"OPENED->OPENED", // ping timeout event
"OPENED->OPENED", // pong timeout event
// Disse to trigges av onClose
"OPENED->CLOSED",
"CLOSED->RECONNECTING"
]);
machine.disconnect();
WS.clean();
});
Dette er ikke en utfyllende test av alle mulige tilfeller, men det er nok til å illustrere hvordan man kan integrasjonsteste. Det viser også hvor mye kjipere det er å teste funksjoner med sideeffekter.
¶Backoff ved gjenoppkobling
Se for deg at du har brukt vårt flotte WebSocket bibliotek. Det er en kjempesuksess og du har sykt mange samtidige klienter. Så tar du ned server(ene) dine for en oppgradering. Med vår naive logikk for gjenoppkobling vil du få en hærskare av klienter som prøver å koble seg opp samtidig. De prøver med større og større mellomrom mellom hver gang de prøver, men fortsatt omtrent samtidig. I det serveren(e) din endelig kommer opp igjen, vil det fort bli veldig mange klienter som kobler til omtrent samtidig. Det kan være litt kjipt. Du kan lese mer om problemstilling og foreslåtte løsninger på internet. F.eks denne artikkelen. Det vi burde gjøre er å lage en backoff algoritme som sørger for bedre spredning av gjenoppkoblingsforsøkene.
La oss gi det et forsøk:
export const calcBackoff = (
attempt: number,
randSeed: number,
maxVal = 30000,
): number => {
if (attempt === 0) {
return 0;
}
return Math.min(maxVal, (attempt ** 2) * 1000) + (2000 * randSeed);
};
Vi lager en backoff funksjon som øker eksponensielt. I tillegg legger vi til støy ved å bruke en random verdi vi tar inn som parameter. Denne funksjonen er fortsatt veldig naiv, og gir fortsatt ikke veldig god/jevn spredning. Det får bli opp til vordende biblioteksforfattere å lage en bedre implementasjon. Det er fortsatt langt bedre enn vår lineære backoff og det viktig er å illustrere konseptet. Ok, så nå har vi en litt bedre backoff funksjon, men den forventer en random seed (ett desimaltall mellom 0 og 1). Hvordan får vi tak i en slik random verdi? Vi ønsker jo ikke å tulle til transisjonsfunksjonen vår med sideeffekter, gjør vi vel?
Effekter
export const Effects = unionize({
// ...eksisterende effekter utelatt
// Vi definerer en ny effekt for å bestille en tilfeldig tall
REQUEST_RANDOM: {}
});
Hendelser
export const Events = unionize({
// ...eksisterende hendelser utelatt
// Ny hendelse til tilstandsmaskinen vår som utløses når bestilling er effektuert!
ON_RANDOM: ofType<{seed: number}>()
});
Transisjoner
// ... resten som før
CLOSED: () => Events.match(evt, {
// Endring: I stedet for å bestille en timeout for å gjenoppkoble bestiller
// vi her en tilfeldig verdi
ON_RECONNECT: () => [
States.RECONNECTING({...state, reconnectAttempt: state.reconnectAttempt + 1}),
[Effects.REQUEST_RANDOM()]
],
default: () => [state, []]
}),
RECONNECTING: () => Events.match(evt, {
// Ny: Først når vi har en tilfeldig verdi kan vi beregne timeout
// og bestille en timeout for gjenoppkobling
ON_RANDOM: ({seed}) => [
state,
[Effects.SCHEDULE_TIMEOUT({
key: "connect",
// Vi bytter ut med vår nye fancy funksjon for å beregne timout
timeoutMillis: calcBackoff(state.reconnectAttempt, seed),
onTimeout: Events.ON_CONNECT()
})]
],
ON_CONNECT: () => [
States.CONNECTING(state), [Effects.CONNECT_WS()]
],
default: () => [state, []]
})
Det var ikke så ille, selvom det var litt ekstra jobb selvfølgelig. Funksjonen vår er fortsatt “ren” og fri for sideeffekter.
Effekthåndtering
const handleEffect = (effect: Effect) => {
Effects.match(effect, {
// eksisterende tilfeller utelatt
REQUEST_RANDOM: () => {
// Vi lener oss på Math.random() for å få en en tilfeldig verdi
handleTransition(Events.ON_RANDOM({ seed: Math.random() }))
}
})
};
Å implementere effekthåndtering var jo ikke så vanskelig. Nå skulle vi selvfølgelig ha oppdatert testene våre også, men det får bli hjemmelekse. Det er på tide å runde av.
¶Oppsummering
Da har vi kommet mer eller mindre i mål med å lage et lite bibliotek for å berike en standard WebSocket-klient. Vi har brukt tilstandsmaskiner og funksjonell programmering som inspirasjon for implementasjonen vår. Det krever litt mer innsats å skille rene funksjoner fra sideeffekter og koden ser kanskje ganske fremmed ut for mange. For meg er det verdt innsatsen, men jeg har forståelse for at ikke alle nødvendigvis er enige med meg om det.
Koden bak min julegave til npm finner du på https://github.com/rundis/ts-ws-machine. Det er ikke 1-1 med koden i denne bloggposten, men likner ganske mye.