[wip] so close; check the ws messages

This commit is contained in:
2025-08-30 18:24:08 -04:00
parent 782dd738cc
commit 01a12ec58a
11 changed files with 139 additions and 88 deletions

View File

@@ -79,9 +79,7 @@ export default (props: { tableKey: string }) => {
<Player
playerKey={player}
style={{
transform: `translate(0, ${
verticalOffset() * 150
}vh)`,
transform: `translate(0, ${verticalOffset() * 150}vh)`,
}}
/>
);
@@ -114,9 +112,7 @@ export default (props: { tableKey: string }) => {
<div class="absolute tc mt-8 flex gap-4">
<select>
<For each={Object.entries(games)}>
{([gameId, game]) => (
<option value={gameId}>{game.title}</option>
)}
{([gameId, game]) => <option value={gameId}>{gameId}</option>}
</For>
</select>
<button

View File

@@ -6,6 +6,7 @@ import { combine } from "kefir";
import Bus from "kefir-bus";
import db from "./db";
import { liveTable, WsIn, WsOut } from "./table";
import { err } from "./logging";
export const WS = Bus<
{
@@ -76,13 +77,17 @@ const api = new Elysia({ prefix: "/api" })
})
)
.ws("/ws/:tableKey", {
async open({
body: WsIn,
response: WsOut,
open: ({
data: {
params: { tableKey },
humanKey,
},
send,
}) {
}) => {
console.log("websocket opened");
const table = liveTable(tableKey);
table.inputs.connectionChanges.emit({
@@ -90,22 +95,14 @@ const api = new Elysia({ prefix: "/api" })
presence: "joined",
});
Object.entries(table.outputs.global).forEach(([type, stream]) =>
Object.entries({
...table.outputs.global,
...(table.outputs.player[humanKey] ?? {}),
}).forEach(([type, stream]) =>
stream.onValue((v) => send({ [type]: v }))
);
combine(
[table.outputs.gameState],
[table.outputs.gameImpl],
(state, game: Game) => state && game.getView({ state, humanKey })
)
.toProperty()
.onValue((view) => send({ view }));
},
body: WsIn,
response: WsOut,
message: (
{
data: {
@@ -114,19 +111,20 @@ const api = new Elysia({ prefix: "/api" })
},
},
body
) => WS.emit({ ...body, type: "message", humanKey, tableKey }),
) => liveTable(tableKey).inputs.messages.emit({ ...body, humanKey }),
close({
close: ({
data: {
params: { tableKey },
humanKey,
},
}) {
}) =>
liveTable(tableKey).inputs.connectionChanges.emit({
humanKey,
presence: "left",
});
},
}),
error: (error) => err(error),
});
export default api;

View File

@@ -16,8 +16,8 @@ new Elysia()
})
)
.onRequest(({ request }) => log.log(request))
.onError(({ error }) => log.err(error))
.onRequest(({ request }) => console.log(request.url))
.onError(({ error }) => console.error(error))
.get("/ping", () => "pong")
.use(api)

View File

@@ -10,4 +10,7 @@ export const log = (value: unknown) => LogBus.emit(value);
export const err = (value: unknown) =>
LogBus.emitEvent({ type: "error", value });
LogPool.log();
LogStream.log();
LogStream.onError((err) => {
console.error(err);
});

View File

@@ -1,7 +1,14 @@
import GAMES, { Game, GameKey } from "@games/shared/games";
import { isEmpty, multiScan, ValueWithin } from "@games/shared/kefir";
import {
isEmpty,
multiScan,
partition,
set,
setDiff,
ValueWithin,
} from "@games/shared/kefirs";
import { t } from "elysia";
import { combine, pool, Property } from "kefir";
import { combine, Observable, pool, Property } from "kefir";
import Bus, { type Bus as TBus } from "kefir-bus";
import { log } from "./logging";
@@ -31,15 +38,7 @@ type TablePayload<
},
never
>;
readys: TBus<
Attributed & {
ready: boolean;
},
any
>;
actions: TBus<Attributed & GameAction, any>;
quits: TBus<Attributed, any>;
messages: TBus<Attributed & TWsIn, any>;
};
outputs: {
global: {
@@ -77,14 +76,11 @@ export const liveTable = <
if (!(key in tables)) {
const inputs: TablePayload<GameConfig, GameState, GameAction>["inputs"] = {
connectionChanges: Bus(),
readys: Bus(),
actions: Bus(),
quits: Bus(),
messages: Bus(),
};
const { connectionChanges, readys, actions, quits } = inputs;
const { connectionChanges, messages } = inputs;
// =======
const playerStreams = {};
// players who have at least one connection to the room
const playersPresent = connectionChanges
@@ -102,7 +98,37 @@ export const liveTable = <
.map((counts) => Object.keys(counts))
.toProperty();
const gameEnds = quits.map((_) => null);
const playerStreams: TablePayload<
GameConfig,
GameState,
GameAction
>["outputs"]["player"] = {};
playersPresent
.map(set)
.slidingWindow(2, 2)
.map(([prev, cur]) => setDiff([prev, cur]))
.onValue(({ added, removed }) => {
added.forEach((p) => {
playerStreams[p] = {
view: Bus(),
};
});
removed.forEach((p) => {
delete playerStreams[p];
});
});
const { ready, action, quit } = partition(
["ready", "action", "quit"],
messages
) as unknown as {
// yuck
ready: Observable<Attributed & { ready: boolean }, any>;
action: Observable<Attributed & { action: GameAction }, any>;
quit: Observable<Attributed, any>;
};
const gameEnds = quit.map((_) => null);
const playersReady = multiScan(
null as {
@@ -114,8 +140,8 @@ export const liveTable = <
Object.fromEntries(players.map((p) => [p, prev?.[p] ?? false])),
],
[
readys,
(prev, evt: ValueWithin<typeof readys>) =>
ready,
(prev, evt: ValueWithin<typeof ready>) =>
prev?.[evt.humanKey] != null
? {
...prev,
@@ -165,7 +191,7 @@ export const liveTable = <
prev || (game.init() as GameState),
],
[
combine([actions], [gameImpl], (action, impl) => ({
combine([action], [gameImpl], (action, impl) => ({
action,
...impl,
})),
@@ -185,7 +211,7 @@ export const liveTable = <
action,
}) as GameState),
],
[quits, () => null]
[quit, () => null]
).toProperty();
const gameIsActive = gameState

View File

@@ -1,37 +0,0 @@
import { merge, Observable } from "kefir";
export type ValueWithin<O extends Observable<any, any>> = Parameters<
Parameters<O["map"]>[0]
>[0];
type Mutation<A, O extends Observable<any, any>> = [
O,
(prev: A, value: ValueWithin<O>) => A
];
export const multiScan = <A, M extends Mutation<A, any>[]>(
initValue: A,
...mutations: M
): Observable<A, any> =>
merge(
mutations.map(([source, mutation]) =>
source.map((event) => ({ event, mutation }))
)
).scan((prev, { event, mutation }) => mutation(prev, event), initValue);
export const partition =
<C extends readonly [...string[]], T, E>(
classes: C,
partitionFn: (v: T) => C[number]
) =>
(obs: Observable<T, E>) => {
const assigned = obs.map((obj) => ({ obj, cls: partitionFn(obj) }));
return Object.fromEntries(
classes.map((C) => [
C,
assigned.filter(({ cls }) => cls == C).map(({ obj }) => obj),
])
);
};
export const isEmpty = (container: { length: number }) => container.length == 0;

52
pkg/shared/kefirs.ts Normal file
View File

@@ -0,0 +1,52 @@
import { merge, Observable } from "kefir";
import Bus from "kefir-bus";
export type ValueWithin<O extends Observable<any, any>> = Parameters<
Parameters<O["map"]>[0]
>[0];
type Mutation<A, O extends Observable<any, any>> = [
O,
(prev: A, value: ValueWithin<O>) => A
];
export const multiScan = <A, M extends Mutation<A, any>[]>(
initValue: A,
...mutations: M
): Observable<A, any> =>
merge(
mutations.map(([source, mutation]) =>
source.map((event) => ({ event, mutation }))
)
).scan((prev, { event, mutation }) => mutation(prev, event), initValue);
export const partition = <
C extends readonly [...string[]],
T extends { [key: string]: any },
E
>(
classes: C,
obs: Observable<T, E>
) => {
const classBuses = Object.fromEntries(classes.map((c) => [c, Bus()]));
obs.onValue((v) => {
for (const _class of classes) {
if (_class in v) {
classBuses[_class].emit(v);
return;
}
}
});
return classBuses;
};
export const isEmpty = (container: { length: number }) => container.length == 0;
export const setDiff = <T>(
sets: [Set<T>, s2: Set<T>]
): { added: T[]; removed: T[] } => ({
added: [...sets[1].difference(sets[0])],
removed: [...sets[0].difference(sets[1])],
});
export const set = <T>(arr: T[]) => new Set<T>(arr);

View File

@@ -3,9 +3,11 @@
"version": "1.0.0",
"dependencies": {
"kefir": "^3.8.8",
"kefir-bus": "^2.3.1",
"object-hash": "^3.0.0"
},
"devDependencies": {
"@types/kefir": "^3.8.11"
"@types/kefir": "^3.8.11",
"ts-xor": "^1.3.0"
}
}

View File

@@ -1,6 +1,9 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"esModuleInterop": true
"esModuleInterop": true,
"target": "esnext",
"moduleResolution": "nodenext",
"module": "nodenext"
}
}