Skip to main content

Private messages

Let's add private messaging between users. We will use a request-stream to listen for new messages from other users.

See resulting code on GitHub

Shared

Let's add an object representation of a message to the shared module:

from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class Message:
user: Optional[str] = None
content: Optional[str] = None

Lines 4-7 defines a frozen dataclass with two fields:

  • user : name of the recipient user when sending a message, and the name of the sender when receiving it.
  • content : the message body.

We will use json to serialize the messages for transport. Add the following helper method to the shared module:

import json
from rsocket.frame_helpers import ensure_bytes
from rsocket.payload import Payload

def encode_dataclass(obj):
return ensure_bytes(json.dumps(obj.__dict__))

def dataclass_to_payload(obj) -> Payload:
return Payload(encode_dataclass(obj))

Lines 5-6 Defines a minimal dataclass json encoder which assumes all the fields in the dataclass are python primitives, or builtin collections of those.

Lines 8-9 Defines a helper method for creating Payloads containing only data fields.

Server side

First we add a queue for incoming user messages:

from dataclasses import dataclass, field
from asyncio import Queue

@dataclass()
class UserSessionData:
...
messages: Queue = field(default_factory=Queue)

Line 7 defines a messages queue. These are private (and later on channel) messages to the user from other clients.

from typing import Optional

from more_itertools import first

def find_session_by_username(username: str) -> Optional[UserSessionData]:
return first((session for session in chat_data.user_session_by_id.values() if
session.username == username), None)

Lines 5-7 define a helper for looking up a user's session by username. This will be used to deliver private messages.

Next we will register a request-response endpoint for sending private messages:

import json
from typing import Awaitable

from rsocket.helpers import create_response
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter
from shared import Message

class ChatUserSession:

def router_factory(self):
router = RequestRouter()

@router.response('message')
async def send_message(payload: Payload) -> Awaitable[Payload]:
message = Message(**json.loads(payload.data))

logging.info('Received message for user: %s', message.user)

target_message = Message(self._session.username, message.content)

session = find_session_by_username(message.user)
await session.messages.put(target_message)

return create_response()

Lines 15-26 define the endpoint for sending messages. The Payload must contain a json serialized Message object. The recipient's session is found (Line 23), and the message is placed in the user's message queue (Line 24).

Line 25 returns an empty Payload future using the create_response helper method.

As a last step on the server side, we register a request-stream endpoint which listens for incoming messages and sends them to the client:

import asyncio

from shared import encode_dataclass
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:

def router_factory(self):
router = RequestRouter()

@router.stream('messages.incoming')
async def messages_incoming():
class MessagePublisher(DefaultPublisher, DefaultSubscription):
def __init__(self, session: UserSessionData):
self._session = session
self._sender = None

def cancel(self):
self._sender.cancel()

def subscribe(self, subscriber: Subscriber):
super(MessagePublisher, self).subscribe(subscriber)
subscriber.on_subscribe(self)
self._sender = asyncio.create_task(self._message_sender())

async def _message_sender(self):
while True:
next_message = await self._session.messages.get()
next_payload = Payload(encode_dataclass(next_message))
self._subscriber.on_next(next_payload)

return MessagePublisher(self._session)

Lines 15-36 define the endpoint for listening to new messages.

Lines 17-34 define the publisher which will be returned. The same class will be used as the subscription for canceling the stream. The messages are taken from the user's session message queue and delivering them to the client.

The loop (Lines 31-34) is placed in an asyncio Task which can be canceled by the client (Lines 22-23)

Client side

First let's add a client method for sending private messages:

from shared import Message, encode_dataclass
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

async def private_message(self, username: str, content: str):
print(f'Sending {content} to user {username}')

await self._rsocket.request_response(Payload(encode_dataclass(Message(username, content)),
composite(route('message'))))

Line 10-11 creates a Payload with the messages and sends it to the 'message' route.

Next we add a method which will listen for incoming messages:

import json
from typing import Optional

from shared import Message
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.rsocket_client import RSocketClient
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:
def __init__(self, rsocket: RSocketClient):
...
self._message_subscriber: Optional = None

def listen_for_messages(self):
def print_message(data: bytes):
message = Message(**json.loads(data))
print(f'{self._username}: from {message.user}: {message.content}')

class MessageListener(DefaultSubscriber, DefaultSubscription):
def __init__(self):

def on_next(self, value, is_complete=False):
print_message(value.data)

def on_error(self, exception: Exception):
print(exception)

def cancel(self):
self.subscription.cancel()

self._message_subscriber = MessageListener()
self._rsocket.request_stream(
Payload(metadata=composite(route('messages.incoming')))
).subscribe(self._message_subscriber)

def stop_listening_for_messages(self):
self._message_subscriber.cancel()

Lines 21-31 define the Subscriber which will listen for incoming messages and print them on the client side.

An instance of the MessageListener is stored on the client (Line 33) to later allow stopping the incoming message stream.

Lines 34-36 send the request and subscribe to the resulting Publisher.

The method in Lines 38-39 can be used to stop the above message listener.

Finally, let's test the new functionality. Modify the main method in the client:

import asyncio

from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.helpers import single_transport_provider
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP

async def main():
connection1 = await asyncio.open_connection('localhost', 6565)

async with RSocketClient(single_transport_provider(TransportTCP(*connection1)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client1:
connection2 = await asyncio.open_connection('localhost', 6565)

async with RSocketClient(single_transport_provider(TransportTCP(*connection2)),
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client2:
user1 = ChatClient(client1)
user2 = ChatClient(client2)

await user1.login('user1')
await user2.login('user2')

user2.listen_for_messages()

await user1.private_message('user2', 'private message from user1')

await asyncio.sleep(3)

user2.stop_listening_for_messages()

In this example, we open two rsocket connections to the server (lines 9-12 and lines 13-16).

Lines 17-21 wrap the rsocket clients with the chat client adapter and login the two users.

Line 23 makes user2 listen for incoming messages, while line 25 has user1 send a message to user2.

Finally, lines 27-29 make the application wait for 3 seconds, then stops user2 listening for messages.