A long time ago, I found myself in a situation where I had to create a scalable system that could be capable of handling hundreds of simultaneous connections at not very high cost, and with reasonable, but not instant time of response.
My first thoughts? Let’s move all create/edit/delete actions to the queue and notify users if their actions succeeded or not via WebSocket.
But back then, I hadn’t much experience with WebSockets in production, so my first step was to investigate how it works with the help of tutorials, stack overflow, and other sources.
So, after some time, I got a gist of how it should work and started to prepare a code and mess around for a while with a load tests tool to simulate high traffic.
The first problem
Some questions and answers suggested calling the subscribe method on the Redis instance on the connected WebSocket client.
io.sockets.on('connection', function (sockets) {
sockets.emit('message',{Hello: 'World!'});
sub.subscribe('attack-map-production');
sockets.on('disconnect', function() {
sub.unsubscribe('attack-map-production');
});
});
But in this way, we are creating a new connection to Redis, so the memory usage in our application and Redis connection pool are rising. (Redis allows only 10k connections to one instance)
That was a big no for me because I had to lower memory usage to a minimum.
Right now, many articles, fortunately, mention that you should not create a new Redis connection on each WebSocket client.
The second problem
After creating a big chunk of business code, when I started the part with web sockets, a question popped into my mind - how to create them in a proper and safe way?
I already had some events in the system, and some of them were ready to be additionally published via WebSockets, the rest of them were meant to stay inside the system.
My gold promise was that I wouldn’t have to change code drastically and still be able to send only selected events to websocket clients.
That’s why, at first, I created a Redis pub-sub module, as I thought that my events, in order to be visible in other instances, have to be transmitted via the Redis pub-sub pattern.
Don’t feel overwhelmed by looking at the module below, as I will explain details later in a use case
export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT';
export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT';
export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS =
'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS';
@Module({
providers: [
{
provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
useFactory: (options: RedisEventPubSubModuleOptions) => options,
inject: [MODULE_OPTIONS_TOKEN],
},
{
provide: REDIS_PUB_CLIENT,
useFactory: async (options: RedisEventPubSubModuleOptions) => {
const client = createClient({
url: `redis://${options.host}:${options.port}`,
});
client.on('error', (err) => console.error('Redis Client Error', err));
await client.connect();
return client;
},
inject: [MODULE_OPTIONS_TOKEN],
},
{
provide: EVENT_EMITTER_TOKEN,
useFactory: (
redisPubClient: RedisClientType,
eventEmitter: EventEmitter2,
) => {
return new RedisEventEmitter(redisPubClient, eventEmitter);
},
inject: [REDIS_PUB_CLIENT, EventEmitter2],
},
{
provide: EVENT_SUBSCRIBER_TOKEN,
useFactory: (eventEmitterSub: EventEmitter2) => {
return new EventEmitter2EventSubscriber(eventEmitterSub);
},
inject: [EventEmitter2],
},
],
exports: [
REDIS_PUB_CLIENT,
EVENT_EMITTER_TOKEN,
EVENT_SUBSCRIBER_TOKEN,
REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
],
})
export class RedisEventPubSubModule extends ConfigurableModuleClass {
static registerEvents(eventsPublishableNames: string[]): DynamicModule {
return {
module: class {},
providers: [
{
provide: REDIS_SUB_CLIENT,
useFactory: async (
options: RedisEventPubSubModuleOptions,
eventEmitter: EventEmitter2,
) => {
const client = createClient({
url: `redis://${options.host}:${options.port}`,
});
client.on('error', (err) =>
console.error('Redis Client Error', err),
);
await client.connect();
for (const eventPublishableName of eventsPublishableNames) {
await client.subscribe(eventPublishableName, (message) => {
const normalizedMessage = JSON.parse(
message,
) as PublishableEventInterface;
delete (
normalizedMessage as Writeable<PublishableEventInterface>
).publishableEventName;
eventEmitter.emit(eventPublishableName, normalizedMessage);
});
}
return client;
},
inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2],
},
],
};
}
}
This module takes care of creating/exposing a Pub Redis client and exposing an additional method - registerEvents, which is responsible for listening for given events on Redis pub-sub and re-emitting them via event emitter.
It may be a little foggy for now. Why re-emitting events? Why do we need to register for those events? What are EVENT_EMITTER_TOKEN and EVENT_SUBSCRIBER_TOKEN and why do we have to export them?
It will be more clear with real-life usage, so let’s create a use case - chat messages. We want to be able to send messages via HTTP POST and receive them via WebSocket on the front end.
Let’s begin
Publishing events
Here’s a module for that
@Module({
imports: [],
controllers: [],
providers: [],
})
export class UserChatModule {}
And an event that this module will be emitting after receiving a POST request
export class NewMessageEvent {
constructor(public readonly message: string) {}
}
In the controller, we have to make it possible to emit events both for our system and the Redis pub queue. We will use wrapped EventEmitter2 for that
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN';
export class RedisEventEmitter implements EventEmitterInterface {
constructor(
private redisPubClient: RedisClientType,
private eventEmitter: EventEmitter2,
) {}
async emit(eventName: string, payload: Record<any, any>): Promise<void> {
this.eventEmitter.emit(eventName, payload);
if (this.isPublishableEvent(payload)) {
await this.redisPubClient.publish(
payload.publishableEventName,
JSON.stringify(payload),
);
}
}
private isPublishableEvent(event: any): event is PublishableEventInterface {
return event.publishableEventName !== undefined;
}
}
And then, we are able to use it in our controller
@Controller('messages')
export class SendMessageAction {
constructor(
// Previously eventEmitter2
@Inject(EVENT_EMITTER_TOKEN)
private readonly eventEmitter: EventEmitterInterface,
) {}
@Post()
async handle(@Body() request: SendMessageHttpRequest) {
await this.eventEmitter.emit(
NewMessageEvent.name,
new NewMessageEvent(request.content),
);
}
}
But before that, we have to enhance our event with PublishableEventInterface in order to allow RedisEventEmitter to catch our event and emit it in the Redis pub queue.
export class NewMessageEvent implements PublishableEventInterface {
static publishableEventName = 'events:new-message';
publishableEventName = NewMessageEvent.publishableEventName;
constructor(public readonly message: string) {}
}
Great, we are now sending our events like we used to, but now, if they are marked as publishable, they will land in the Redis pub queue.
But now, we need to make it possible to receive those events on WebSocket, right?
Receiving events
So, let’s take a look at our user chat module
@Module({
imports: [
RedisEventPubSubModule.registerEvents([
NewMessageEvent.publishableEventName,
]),
],
controllers: [SendMessageAction],
providers: [],
})
export class UserChatModule {}
As you can see, we used the method mentioned earlier - registerEvents.
Thanks to that method, we told RedisEventPubSubModule that it should listen for our NewMessageEvent event in the Redis pub-sub queue on the publishableEventName attribute.
So, if any NewMessageEvent event occurs, then it will be re-emitted as a normal NewMessageEvent event, but under the publishableEventName attribute.
It’s worth mentioning, that it will work on 1 instance or 1,000 instances. So even if we scale to a high number of instances, each of them will receive this and re-emit this event inside the system.
So, now we have abilities to emit events and listen for them. Now we need to deliver them to our websocket clients.
Websocket Gateway
Let’s take a look at Websocket Gateway
export enum WebsocketEventSubscribeList {
FETCH_EVENTS_MESSAGES = 'fetch-events-messages',
EVENTS_MESSAGES_STREAM = 'events-messages-stream',
}
@WebSocketGateway({
pingInterval: 30000,
pingTimeout: 5000,
cors: {
origin: '*',
},
})
export class MessagesWebsocketGateway {
constructor(
@Inject(EVENT_SUBSCRIBER_TOKEN)
private eventSubscriber: EventSubscriberInterface,
) {}
@SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES)
async streamMessagesData(@ConnectedSocket() client: any) {
const stream$ = this.createWebsocketStreamFromEventFactory(
client,
this.eventSubscriber,
NewMessageEvent.publishableEventName,
);
const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM;
return from(stream$).pipe(map((data) => ({ event, data })));
}
private createWebsocketStreamFromEventFactory(
client: any,
eventSubscriber: EventSubscriberInterface,
eventName: string,
): Observable<any> {
return new Observable((observer) => {
const dynamicListener = (message: PublishableEventInterface) => {
observer.next(message);
};
eventSubscriber.on(eventName, dynamicListener);
client.on('disconnect', () => {
eventSubscriber.off(eventName, dynamicListener);
});
});
}
}
So there is a thing, in the constructor, we have EVENT_SUBSCRIBER_TOKEN, which type is EventSubscriberInterface. But what it really does? This is how it looks under the hood
export class EventEmitter2EventSubscriber implements EventSubscriberInterface {
constructor(private eventEmitter: EventEmitter2) {}
on(name: string, listener: any): void {
this.eventEmitter.on(name, listener);
}
off(name: string, listener: any): void {
this.eventEmitter.removeListener(name, listener);
}
}
It’s just a wrapper for EventEmitter2, that we are using in the method createWebsocketStreamFromEventFactory
private createWebsocketStreamFromEventFactory(
client: any,
eventSubscriber: EventSubscriberInterface,
eventName: string,
): Observable<any> {
return new Observable((observer) => {
const dynamicListener = (message: PublishableEventInterface) => {
observer.next(message);
};
eventSubscriber.on(eventName, dynamicListener);
client.on('disconnect', () => {
eventSubscriber.off(eventName, dynamicListener);
});
});
}
}
We are using this wrapped EventEmitter2 to create dynamic listeners on publishableName when websocket clients connect and remove on disconnect.
Then, we are doing nothing more than creating rxjs stream to keep the websocket connection and send messages from the listener via observer.next(message); when a new message occurs.
How this event will reach our listeners?
If you return to the first snippet of code, our Redis pub sub module, then you can spot this in the registerEvents method
for (const eventPublishableName of eventsPublishableNames) {
await client.subscribe(eventPublishableName, (message) => {
const normalizedMessage = JSON.parse(
message,
) as PublishableEventInterface;
delete (
normalizedMessage as Writeable<PublishableEventInterface>
).publishableEventName;
eventEmitter.emit(eventPublishableName, normalizedMessage);
});
Which basically listens for events on the pub queue, and then re-emit them via the event emitter.
So, let’s sum up what we have done here
- We are still using our events in the system like we used to via EventEmitter2, but if we want to publish to our connected websocket clients, then all we have to do is implement PublishableInterface
- We are not creating new Redis connections on each connected websocket client
- We can scale up our system to X instances and it will still behave in the same way - each connected client will get a copy of the event via websocket, no matter to which instance they will be connected
Working code and example are available here: https://github.com/axotion/nestjs-events-websocket