8000 feat: broadcast and expect multiple acks · dzad/socket.io@734b925 · GitHub
  • [go: up one dir, main page]

    Skip to content

    Commit 734b925

    Browse files
    feat: broadcast and expect multiple acks
    Syntax: ```js io.timeout(1000).emit("some-event", (err, responses) => { // ... }); ``` The adapter exposes two additional methods: - `broadcastWithAck(packets, opts, clientCountCallback, ack)` Similar to `broadcast(packets, opts)`, but: * `clientCountCallback()` is called with the number of clients that received the packet (can be called several times in a cluster) * `ack()` is called for each client response - `serverCount()` It returns the number of Socket.IO servers in the cluster (1 for the in-memory adapter). Those two methods will be implemented in the other adapters (Redis, Postgres, MongoDB, ...). Related: - socketio#1811 - socketio#4163 - socketio/socket.io-redis-adapter#445
    1 parent 8119cf7 commit 734b925

    File tree

    8 files changed

    +238
    -16
    lines changed

    8 files changed

    +238
    -16
    lines changed

    lib/broadcast-operator.ts

    Lines changed: 80 additions & 6 deletions
    Original file line numberDiff line numberDiff line change
    @@ -129,6 +129,29 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
    129129
    );
    130130
    }
    131131

    132+
    /**
    133+
    * Adds a timeout in milliseconds for the next operation
    134+
    *
    135+
    * <pre><code>
    136+
    *
    137+
    * io.timeout(1000).emit("some-event", (err, responses) => {
    138+
    * // ...
    139+
    * });
    140+
    *
    141+
    * </pre></code>
    142+
    *
    143+
    * @param timeout
    144+
    */
    145+
    public timeout(timeout: number) {
    146+
    const flags = Object.assign({}, this.flags, { timeout });
    147+
    return new BroadcastOperator(
    148+
    this.adapter,
    149+
    this.rooms,
    150+
    this.exceptRooms,
    151+
    flags
    152+
    );
    153+
    }
    154+
    132155
    /**
    133156
    * Emits to all clients.
    134157
    *
    @@ -149,14 +172,65 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
    149172
    data: data,
    150173
    };
    151174

    152-
    if ("function" == typeof data[data.length - 1]) {
    153-
    throw new Error("Callbacks are not supported when broadcasting");
    175+
    const withAck = typeof data[data.length - 1] === "function";
    176+
    177+
    if (!withAck) {
    178+
    this.adapter.broadcast(packet, {
    179+
    rooms: this.rooms,
    180+
    except: this.exceptRooms,
    181+
    flags: this.flags,
    182+
    });
    183+
    184+
    return true;
    154185
    }
    155186

    156-
    this.adapter.broadcast(packet, {
    157-
    rooms: this.rooms,
    158-
    except: this.exceptRooms,
    159-
    flags: this.flags,
    187+
    const ack = data.pop() as (...args: any[]) => void;
    188+
    let timedOut = false;
    189+
    let responses: any[] = [];
    190+
    191+
    const timer = setTimeout(() => {
    192+
    timedOut = true;
    193+
    ack.apply(this, [new Error("operation has timed out"), responses]);
    194+
    }, this.flags.timeout);
    195+
    196+
    let expectedServerCount = -1;
    197+
    let actualServerCount = 0;
    198+
    let expectedClientCount = 0;
    199+
    200+
    const checkCompleteness = () => {
    201+
    if (
    202+
    !timedOut &&
    203+
    expectedServerCount === actualServerCount &&
    204+
    responses.length === expectedClientCount
    205+
    ) {
    206+
    clearTimeout(timer);
    207+
    ack.apply(this, [null, responses]);
    208+
    }
    209+
    };
    210+
    211+
    this.adapter.broadcastWithAck(
    212+
    packet,
    213+
    {
    214+
    rooms: this.rooms,
    215+
    except: this.exceptRooms,
    216+
    flags: this.flags,
    217+
    },
    218+
    (clientCount) => {
    219+
    // each Socket.IO server in the cluster sends the number of clients that were notified
    220+
    expectedClientCount += clientCount;
    221+
    actualServerCount++;
    222+
    checkCompleteness();
    223+
    },
    224+
    (clientResponse) => {
    225+
    // each client sends an acknowledgement
    226+
    responses.push(clientResponse);
    227+
    checkCompleteness();
    228+
    }
    229+
    );
    230+
    231+
    this.adapter.serverCount().then((serverCount) => {
    232+
    expectedServerCount = serverCount;
    233+
    checkCompleteness();
    160234
    });
    161235

    162236
    return true;

    lib/index.ts

    Lines changed: 17 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -772,6 +772,23 @@ export class Server<
    772772
    return this.sockets.local;
    773773
    }
    774774

    775+
    /**
    776+
    * Adds a timeout in milliseconds for the next operation
    777+
    *
    778+
    * <pre><code>
    779+
    *
    780+
    * io.timeout(1000).emit("some-event", (err, responses) => {
    781+
    * // ...
    782+
    * });
    783+
    *
    784+
    * </pre></code>
    785+
    *
    786+
    * @param timeout
    787+
    */
    788+
    public timeout(timeout: number) {
    789+
    return this.sockets.timeout(timeout);
    790+
    }
    791+
    775792
    /**
    776793
    * Returns the matching socket instances
    777794
    *

    lib/namespace.ts

    Lines changed: 17 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -379,6 +379,23 @@ export class Namespace<
    379379
    return new BroadcastOperator(this.adapter).local;
    380380
    }
    381381

    382+
    /**
    383+
    * Adds a timeout in milliseconds for the next operation
    384+
    *
    385+
    * <pre><code>
    386+
    *
    387+
    * io.timeout(1000).emit("some-event", (err, responses) => {
    388+
    * // ...
    389+
    * });
    390+
    *
    391+
    * </pre></code>
    392+
    *
    393+
    * @param timeout
    394+
    */
    395+
    public timeout(timeout: number) {
    396+
    return new BroadcastOperator(this.adapter).timeout(timeout);
    397+
    }
    398+
    382399
    /**
    383400
    * Returns the matching socket instances
    384401
    *

    lib/socket.ts

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -140,7 +140,7 @@ export class Socket<
    140140
    private readonly adapter: Adapter;
    141141
    private acks: Map<number, () => void> = new Map();
    142142
    private fns: Array<(event: Event, next: (err?: Error) => void) => void> = [];
    143-
    private flags: BroadcastFlags & { timeout?: number } = {};
    143+
    private flags: BroadcastFlags = {};
    144144
    private _anyListeners?: Array<(...args: any[]) => void>;
    145145

    146146
    /**

    package-lock.json

    Lines changed: 8 additions & 7 deletions
    Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

    package.json

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -50,7 +50,7 @@
    5050
    "base64id": "~2.0.0",
    5151
    "debug": "~4.3.2",
    5252
    "engine.io": "~6.1.2",
    53-
    "socket.io-adapter": "~2.3.3",
    53+
    "socket.io-adapter": "~2.4.0",
    5454
    "socket.io-parser": "~4.0.4"
    5555
    },
    5656
    "devDependencies": {

    test/socket.io.ts

    Lines changed: 113 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -2519,6 +2519,119 @@ describe("socket.io", () => {
    25192519
    });
    25202520
    });
    25212521
    });
    2522+
    2523+
    it("should broadcast and expect multiple acknowledgements", (done) => {
    2524+
    const srv = createServer();
    2525+
    const sio = new Server(srv);
    2526+
    2527+
    srv.listen(async () => {
    2528+
    const socket1 = client(srv, { multiplex: false });
    2529+
    const socket2 = client(srv, { multiplex: false });
    2530+
    const socket3 = client(srv, { multiplex: false });
    2531+
    2532+
    await Promise.all([
    2533+
    waitFor(socket1, "connect"),
    2534+
    waitFor(socket2, "connect"),
    2535+
    waitFor(socket3, "connect"),
    2536+
    ]);
    2537+
    2538+
    socket1.on("some event", (cb) => {
    2539+
    cb(1);
    2540+
    });
    2541+
    2542+
    socket2.on("some event", (cb) => {
    2543+
    cb(2);
    2544+
    });
    2545+
    2546+
    socket3.on("some event", (cb) => {
    2547+
    cb(3);
    2548+
    });
    2549+
    2550+
    sio.timeout(2000).emit("some event", (err, responses) => {
    2551+
    expect(err).to.be(null);
    2552+
    expect(responses).to.have.length(3);
    2553+
    expect(responses).to.contain(1, 2, 3);
    2554+
    2555+
    done();
    2556+
    });
    2557+
    });
    2558+
    });
    2559+
    2560+
    it("should fail when a client does not acknowledge the event in the given delay", (done) => {
    2561+
    const srv = createServer();
    2562+
    const sio = new Server(srv);
    2563+
    2564+
    srv.listen(async () => {
    2565+
    const socket1 = client(srv, { multiplex: false });
    2566+
    const socket2 = client(srv, { multiplex: false });
    2567+
    const socket3 = client(srv, { multiplex: false });
    2568+
    2569+
    await Promise.all([
    2570+
    waitFor(socket1, "connect"),
    2571+
    waitFor(socket2, "connect"),
    2572+
    waitFor(socket3, "connect"),
    2573+
    ]);
    2574+
    2575+
    socket1.on("some event", (cb) => {
    2576+
    cb(1);
    2577+
    });
    2578+
    2579+
    socket2.on("some event", (cb) => {
    2580+
    cb(2);
    2581+
    });
    2582+
    2583+
    socket3.on("some event", (cb) => {
    2584+
    // timeout
    2585+
    });
    2586+
    2587+
    sio.timeout(200).emit("some event", (err, responses) => {
    2588+
    expect(err).to.be.an(Error);
    2589+
    expect(responses).to.have.length(2);
    2590+
    expect(responses).to.contain(1, 2);
    2591+
    2592+
    done();
    2593+
    });
    2594+
    });
    2595+
    });
    2596+
    2597+
    it("should broadcast and return if the packet is sent to 0 client", (done) => {
    2598+
    const srv = createServer();
    2599+
    const sio = new Server(srv);
    2600+
    2601+
    srv.listen(async () => {
    2602+
    const socket1 = client(srv, { multiplex: false });
    2603+
    const socket2 = client(srv, { multiplex: false });
    2604+
    const socket3 = client(srv, { multiplex: false });
    2605+
    2606+
    await Promise.all([
    2607+
    waitFor(socket1, "connect"),
    2608+
    waitFor(socket2, "connect"),
    2609+
    waitFor(socket3, "connect"),
    2610+
    ]);
    2611+
    2612+
    socket1.on("some event", () => {
    2613+
    done(new Error("should not happen"));
    2614+
    });
    2615+
    2616+
    socket2.on("some event", () => {
    2617+
    done(new Error("should not happen"));
    2618+
    });
    2619+
    2620+
    socket3.on("some event", () => {
    2621+
    done(new Error("should not happen"));
    2622+
    });
    2623+
    2624+
    sio
    2625+
    .to("room123")
    2626+
    .timeout(200)
    2627+
    .emit("some event", (err, responses) => {
    2628+
    expect(err).to.be(null);
    2629+
    expect(responses).to.have.length(0);
    2630+
    2631+
    done();
    2632+
    });
    2633+
    });
    2634+
    });
    25222635
    });
    25232636

    25242637
    describe("middleware", () => {

    test/support/util.ts

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -12,7 +12,7 @@ const i = expect.stringify;
    1212
    // add support for Set/Map
    1313
    const contain = expect.Assertion.prototype.contain;
    1414
    expect.Assertion.prototype.contain = function (...args) {
    15-
    if (typeof this.obj === "object") {
    15+
    if (this.obj instanceof Set || this.obj instanceof Map) {
    1616
    args.forEach((obj) => {
    1717
    this.assert(
    1818
    this.obj.has(obj),

    0 commit comments

    Comments
     (0)
    0