
Event Stream Strategy

According to the consensus of people who have used it in production, RethinkDB cannot handle many thousands of changefeed connections. As one of them put it: "changefeeds aren't scalable, guys. It was a marketing gimmick to attract new developers (a startup mistake, focusing on this instead of essential features like data compression)."

Instead, a small number of persistent changefeed connections is recommended. People in RethinkDB messaging channels and in some places online (this now-dead link, for example) claim to use RethinkDB without trouble for massively concurrent applications, using an approach similar to the one diagrammed here.
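The fan-out idea behind this approach can be sketched in a few lines: each pod keeps one changefeed and re-broadcasts every change to its own in-process listeners, so the database sees one connection per pod no matter how many clients are attached. This is a minimal sketch assuming Node's `node:events`; `createPodFanout`, `onFeedChange`, `subscribe` and the `presence.<userId>` event name are hypothetical, and the real changefeed cursor is replaced by direct calls to `onFeedChange`.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: one instance per pod. The pod's single changefeed
// cursor would call onFeedChange for every change document it receives.
function createPodFanout() {
  const emitter = new EventEmitter();
  // 0 disables the default 10-listener warning; a pod may serve many clients.
  emitter.setMaxListeners(0);

  function onFeedChange(change) {
    // Changefeed documents carry old_val/new_val; deletes have new_val null.
    const doc = change.new_val;
    if (doc && doc.user_id) {
      emitter.emit(`presence.${doc.user_id}`, doc);
    }
  }

  // Each connected client registers a listener; the returned function
  // removes it when the client disconnects.
  function subscribe(userId, listener) {
    const name = `presence.${userId}`;
    emitter.on(name, listener);
    return () => emitter.off(name, listener);
  }

  return { onFeedChange, subscribe };
}

// Usage: 1,000 local subscribers share one simulated feed.
const pod = createPodFanout();
let delivered = 0;
const unsubscribers = [];
for (let i = 0; i < 1000; i += 1) {
  unsubscribers.push(pod.subscribe('u1', () => { delivered += 1; }));
}
pod.onFeedChange({ old_val: null, new_val: { user_id: 'u1', state: 'ONLINE' } });
console.log(delivered); // 1000
unsubscribers.forEach(off => off());
pod.onFeedChange({ old_val: null, new_val: { user_id: 'u1', state: 'OFFLINE' } });
console.log(delivered); // still 1000: every listener was removed
```

Because `EventEmitter` only reaches listeners in the same process, a client attached to one pod never sees another pod's emits, which is why each pod runs its own changefeed.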

A few restrictions or things to note,

Diagram (user comes online)

  1. Each pod defines an event emitter,

    import events from 'node:events';

    const dbChangeEmitter = new events.EventEmitter();
    
  2. An emitter can emit changes to many listeners, but only within its own pod; changes are not forwarded to other pods. Each subscribing client connects to exactly one pod,

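    // publish to listeners in this pod only
    dbChangeEmitter.emit(`event.${db}.presence.friendsof.${userId}`, data)
    // subscribe in this pod, using the same event name the publisher emits
    events.on(dbChangeEmitter, `event.${db}.presence.friendsof.${userId}`)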
    
  3. Each pod listens to the Presences table,

    const feedStream = await r
      .db(db)
      .table(tablePresences).changes()
      .filter(row => row('old_val').eq(null).or(
        row('old_val')('state').ne(row('new_val')('state'))
      )).run()
    feedStream.on('data', d => fn(db, d))
    
  4. When a user's Presence changes, the change is emitted to each room id the user has joined,

    const roomMembershipsJoined =
      await dbRoomMembershipGetAllJoinedUserFromRoomId(r, db, presenceDoc.user_id)
    roomMembershipsJoined.forEach(roomMembership => {
      Object.assign(roomMembership, { user_presence: presenceDoc })
      if (roomMembership.group_type === enumGroupTypeDIRECT)
        dbChangeFriendStreamEmit(evemit, db, roomMembership.room_id, roomMembership)
      else if (roomMembership.group_type === enumGroupTypeGROUP)
        dbChangeRoomStreamEmit(evemit, db, roomMembership.room_id, roomMembership)
    })
    
  5. (Client) A user connects to a stream, which is always served by the same pod,

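    // fetchEventSource: assumed to come from @microsoft/fetch-event-source
    import { fetchEventSource } from '@microsoft/fetch-event-source'

    fetchEventSource(`${endpointUrl}?${query}`, {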
      headers: {
        Accept: 'text/event-stream',
        Authorization: `Bearer ${token}`
      },
      onerror: ev => onEvent(data, ev),
      onmessage: ev => onEvent(data, ev),
      onclose: ev => onEvent(data, ev),
      onopen: ev => onEvent(data, ev)
    })
    
  6. The subscription handler first sends the current state of all friends and room members at the head of the stream, then streams events from two merged AsyncIterators, one for the room id and one for the connected user's own user id (which carries friend events),

    yield (await Promise.all([
      dbRoomMembershipGetAllJoinedFromTargetUserId(r, db, userId),
      // for each friend, send two events,
      //   1. AVAILABILITY event *with* user_presence,
      //   2. MEMBERSHIP event *without* user_presence
      ...(await dbRoomMembershipGetAllFriendFromUserId(r, db, userId, userExcludeId))
        .map(m => ([m, ({ ...m, user_presence: null })])),
      roomId ? dbRoomMembershipGetAllJoinedPresenceFromRoomId(r, db, roomId) : [],
      dbRoomMembershipGetAllInvitedFromTargetUserId(r, db, userId),
      dbRoomMembershipGetAllInvitedFromSenderUserId(r, db, userId)
    ])).flat()
    
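    // yield* (not return) so the merged live events reach the client;
    // assumes combine returns an async iterable
    yield* combine(
      events.on(dbChangeEmitter, `event.${db}.presence.friendsof.${userId}`),
      events.on(dbChangeEmitter, `event.${db}.presence.room.${roomId}`))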
    
  7. When a user's connection starts, the Presence document is set to ONLINE,

    await dbPresenceSetStateONLINE(r, db, authDecoded.sub)
    
  8. (Client) The Presence change is published to every pod in the replica set and, through each pod's changefeed emitter, sent to already-connected clients,

    {
      id: 'userid-134',
      user: {
        id: 'mSddOsvfH0UPCvWAL6yiV',
        displayName: 'loongcat1234'
      },
      timePublished: 1668630754574,
      roomId: 'GROUP.kWlzROKBozovu-zejkz1N',
      eventType: 'AVAILABILITY',
      presenceType: 'ONLINE'
    }
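Steps 3 and 4 reduce to two decisions per change: whether it is worth forwarding at all, and which stream each of the user's memberships maps to. A minimal sketch in plain JavaScript, assuming changefeed documents shaped as `{ old_val, new_val }`; `isPresenceTransition`, `routePresence`, the `GROUP_TYPE` values standing in for `enumGroupTypeDIRECT`/`enumGroupTypeGROUP`, and the `'friend'`/`'room'` labels are hypothetical, not the original helpers.

```javascript
// Hypothetical stand-ins for enumGroupTypeDIRECT and enumGroupTypeGROUP.
const GROUP_TYPE = { DIRECT: 'DIRECT', GROUP: 'GROUP' };

// Plain-JS version of the step 3 ReQL filter: forward a change when the
// Presence document is new (old_val is null) or its `state` changed.
// Skipping deletes (new_val is null) is an assumption of this sketch.
function isPresenceTransition(change) {
  if (change.new_val == null) return false;
  if (change.old_val == null) return true;
  return change.old_val.state !== change.new_val.state;
}

// Step 4: for each joined membership, pick the target stream and attach
// the new Presence document. Copies each membership instead of mutating it.
function routePresence(presenceDoc, memberships) {
  const routes = [];
  for (const membership of memberships) {
    const payload = { ...membership, user_presence: presenceDoc };
    if (membership.group_type === GROUP_TYPE.DIRECT) {
      routes.push({ stream: 'friend', roomId: membership.room_id, payload });
    } else if (membership.group_type === GROUP_TYPE.GROUP) {
      routes.push({ stream: 'room', roomId: membership.room_id, payload });
    }
  }
  return routes;
}

// Usage
const change = {
  old_val: { user_id: 'u1', state: 'OFFLINE' },
  new_val: { user_id: 'u1', state: 'ONLINE' },
};
if (isPresenceTransition(change)) {
  const routes = routePresence(change.new_val, [
    { room_id: 'DIRECT.a', group_type: 'DIRECT' },
    { room_id: 'GROUP.b', group_type: 'GROUP' },
  ]);
  console.log(routes.map(route => `${route.stream}:${route.roomId}`));
  // [ 'friend:DIRECT.a', 'room:GROUP.b' ]
}
```

Memberships of any other group type are ignored, matching the `if`/`else if` in step 4.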
    
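Step 6 (a snapshot at the head of the stream, then live events from two emitter channels) can be sketched as an async generator. The original's `combine` helper is not shown, so `mergeAsyncIterables` below is a hypothetical stand-in that races the pending `next()` of each source; `subscriptionStream` and its `loadSnapshot` callback, which stands in for the `Promise.all` of membership queries, are likewise assumptions.

```javascript
const { on } = require('node:events');

// Hypothetical stand-in for `combine`: merges async iterables, yielding
// values in whatever order they arrive. The caller owns closing sources.
async function* mergeAsyncIterables(iterables) {
  const iterators = iterables.map(source => source[Symbol.asyncIterator]());
  const pending = new Map(); // source index -> promise of its next result
  const pull = index =>
    iterators[index].next().then(result => ({ index, result }));
  iterators.forEach((_, index) => pending.set(index, pull(index)));
  while (pending.size > 0) {
    const { index, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(index);
    } else {
      pending.set(index, pull(index)); // keep that source flowing
      yield result.value;
    }
  }
}

// Sketch of the step 6 handler: snapshot first, then live events.
async function* subscriptionStream(emitter, db, userId, roomId, loadSnapshot) {
  // Subscribe first: events.on buffers events emitted while the snapshot
  // query runs, so none are lost between snapshot and live stream.
  const live = [on(emitter, `event.${db}.presence.friendsof.${userId}`)];
  if (roomId) live.push(on(emitter, `event.${db}.presence.room.${roomId}`));
  try {
    yield (await loadSnapshot()).flat(); // head of the stream
    // events.on yields each emit's arguments as an array, e.g. [data].
    for await (const [data] of mergeAsyncIterables(live)) {
      yield [data];
    }
  } finally {
    // Detach listeners whenever the consumer stops, even right after the snapshot.
    await Promise.all(live.map(iterator => iterator.return()));
  }
}
```

One design choice worth noting: the sketch calls `events.on` before running the snapshot query. `events.on` buffers events from the moment it is called, so a presence change that lands while the snapshot is loading is delivered afterwards instead of being lost.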

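On the wire, each event such as the step 8 payload travels as a `text/event-stream` frame. A minimal sketch of serializing one frame under the Server-Sent Events format (optional `event:` and `id:` fields, one `data:` line per payload line, a blank line to end the frame); `toSseFrame` is hypothetical, and using `timePublished` as the SSE `id` is an assumption.

```javascript
// Serialize one Server-Sent Events frame: optional "event:" and "id:"
// fields, one "data:" line per line of payload, and a blank line to end it.
function toSseFrame({ data, event, id }) {
  const lines = [];
  if (event !== undefined) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  const body = typeof data === 'string' ? data : JSON.stringify(data);
  for (const line of body.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  return `${lines.join('\n')}\n\n`;
}

// Usage: frame a trimmed version of the step 8 payload.
const frame = toSseFrame({
  id: 1668630754574,
  data: {
    roomId: 'GROUP.kWlzROKBozovu-zejkz1N',
    eventType: 'AVAILABILITY',
    presenceType: 'ONLINE',
  },
});
process.stdout.write(frame);
```

Setting `id:` means a reconnecting `EventSource` client sends the last seen id back in the `Last-Event-ID` request header, which a server could use to resume the stream.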
attachments

rethinkdb changefeed example
const presenceUsersRoomInitial = await r
    .table( 'Presence' )
    .getAll( r.args([ /* usersIds */ ]) )
    .orderBy( r.desc( 'user_id' ) ) // an indexed orderBy only works on a table, not after getAll
    .run();

const presenceStream = await r
    .table( 'Presence' )
    .changes()
    .merge({ docType: 'presence' })
    .getCursor();

const membershipStream = await r
    .table( 'RoomMemberships' )
    .changes()
    .merge({ docType: 'membership' })
    .getCursor();

// roomStreams: room id -> array of open client streams (defined elsewhere)
const roomSendStream = ( roomId, data ) => (
    ( roomStreams[roomId] || [] ).map( stream => stream.send( data ) ) );

const roomSendStreams = ( roomIds, data ) => (
    roomIds.map( id => roomSendStream( id, data ) ) );

// update if user membership changes (leave or join room/friendship)
membershipStream.on( 'data', data => {
    if ( !data.new_val ) return; // deletes have a null new_val
    roomSendStream( data.new_val.room_id, data.new_val );
});

// update if user presence changes
presenceStream.on( 'data', async data => {
    if ( !data.new_val ) return; // deletes have a null new_val
    const userId = data.new_val.user_id;
    const roomIds = await getUserRoomIds( userId ); // helper defined elsewhere

    roomSendStreams( roomIds, data.new_val );
});

// publishing changes
// each write below produces a change on presenceStream
await r.table( 'Presence' ).insert({
    user_id: userId,
    presence: 'AVAILABLE'
}).run();

await r.table( 'Presence' ).insert({
    user_id: userId,
    presence: 'OFFLINE'
}).run();
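The attachment relies on a `roomStreams` registry mapping room ids to open client streams, which is not shown. A minimal sketch of one, assuming each stream exposes `send(data)` as in the attachment; `createRoomRegistry` and its `join`, `leave`, `send` and `sendMany` methods are hypothetical. A `Map` of `Set`s makes join and leave cheap, and a send to a room with no listeners becomes a no-op rather than a `TypeError`.

```javascript
// Hypothetical registry: roomId -> Set of open client streams.
function createRoomRegistry() {
  const roomStreams = new Map();

  function leave(roomId, stream) {
    const streams = roomStreams.get(roomId);
    if (!streams) return;
    streams.delete(stream);
    if (streams.size === 0) roomStreams.delete(roomId); // drop empty rooms
  }

  // Returns an unsubscribe function for when the client disconnects.
  function join(roomId, stream) {
    if (!roomStreams.has(roomId)) roomStreams.set(roomId, new Set());
    roomStreams.get(roomId).add(stream);
    return () => leave(roomId, stream);
  }

  // Counterpart of roomSendStream: returns how many streams were sent to.
  function send(roomId, data) {
    const streams = roomStreams.get(roomId);
    if (!streams) return 0;
    for (const stream of streams) stream.send(data);
    return streams.size;
  }

  // Counterpart of roomSendStreams.
  function sendMany(roomIds, data) {
    return roomIds.reduce((total, roomId) => total + send(roomId, data), 0);
  }

  return { join, leave, send, sendMany };
}

// Usage with fake streams that record what they receive.
const makeFakeStream = () => ({ received: [], send(data) { this.received.push(data); } });
const registry = createRoomRegistry();
const a = makeFakeStream();
const b = makeFakeStream();
registry.join('GROUP.r1', a);
const leaveB = registry.join('GROUP.r1', b);
registry.join('DIRECT.r2', b);
console.log(registry.sendMany(['GROUP.r1', 'DIRECT.r2', 'GROUP.none'], { presence: 'AVAILABLE' })); // 3
leaveB();
console.log(registry.send('GROUP.r1', { presence: 'OFFLINE' })); // 1
```

As in the attachment, a client that shares several rooms with the changed user receives the update once per room (stream `b` above gets it twice); de-duplicating per stream is left out of this sketch.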
bumblehead.com