Statistics
As a last step, we will pass some statistics between the client and the server:
- The client will be able to send its memory usage to the server.
- The server will report the number of users and channels. The client will be able to specify which of these statistics it wants.
See resulting code on GitHub
Shared code
We will define some data-classes to represent the payloads being sent between the client and server:
from dataclasses import dataclass, field
from typing import Optional, List

@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None

@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)

@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None
The ServerStatistics class defines the data sent to the client upon request. It contains two optional fields: the user count and the channel count.
The ServerStatisticsRequest class defines a request from the client that specifies which statistics it wants and how often they should be reported. Each entry in the ids list selects one of the two values in the ServerStatistics class ('users' for user_count, 'channels' for channel_count).
The ClientStatistics class defines the statistics sent from the client to the server.
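To make the wire format concrete, here is a minimal sketch that serializes one of these data-classes to JSON bytes and rebuilds it on the other side. The encode_dataclass and decode_dataclass functions below are illustrative assumptions written for this sketch; the tutorial's own helpers of similar name may be implemented differently.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


def encode_dataclass(obj) -> bytes:
    # Assumed helper: dump every dataclass field as UTF-8 encoded JSON.
    return json.dumps(asdict(obj)).encode('utf-8')


def decode_dataclass(data: bytes, cls):
    # Assumed helper: rebuild the dataclass from the JSON object's keys.
    return cls(**json.loads(data.decode('utf-8')))


request = ServerStatisticsRequest(ids=['users'])
encoded = encode_dataclass(request)
decoded = decode_dataclass(encoded, ServerStatisticsRequest)

print(encoded)
print(decoded)
```

Note that the default period_seconds is part of the encoded bytes even though only ids was passed to the constructor.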
Server side
Data-classes
First we will add a field on the UserSessionData class to store the last statistics sent by the client:
from dataclasses import dataclass
from typing import Optional

from shared import ClientStatistics

@dataclass()
class UserSessionData:
    ...
    statistics: Optional[ClientStatistics] = None
Endpoints
We will add two endpoints: one for receiving statistics from the client, and one for requesting specific statistics from the server.
Client send statistics
import json

from shared import ClientStatistics
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter()

        ...

        @router.fire_and_forget('statistics')
        async def receive_statistics(payload: Payload):
            statistics = ClientStatistics(**json.loads(utf8_decode(payload.data)))
            self._session.statistics = statistics
The receive_statistics handler defines an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this data is not critical to the application. No return value is required from this method; if one is provided, it is ignored.
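Because the sender of a fire-and-forget request never sees an error, a handler may prefer to tolerate malformed payloads instead of raising. The following is a minimal sketch of that idea using only the standard library; parse_client_statistics is a hypothetical helper and is not part of the tutorial code.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None


def parse_client_statistics(data: bytes) -> Optional[ClientStatistics]:
    # Hypothetical helper: return None instead of raising on bad input.
    try:
        return ClientStatistics(**json.loads(data.decode('utf-8')))
    except (ValueError, TypeError):
        # ValueError covers invalid JSON and invalid UTF-8;
        # TypeError covers unknown fields or a payload that is not a JSON object.
        return None


good = parse_client_statistics(b'{"memory_usage": 1024}')
bad_json = parse_client_statistics(b'not json')
bad_field = parse_client_statistics(b'{"cpu": 1}')

print(good, bad_json, bad_field)
```

In the handler, a None result could simply be skipped so that the previously stored statistics stay in place.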
Receive requested statistics
We will add a helper method for creating a new statistics response:
def new_statistics_data(statistics_request: ServerStatisticsRequest):
    statistics_data = {}

    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    return ServerStatistics(**statistics_data)
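To see what this helper returns for different requests, here is a self-contained sketch. The ChatData class and its contents are stand-ins assumed for illustration; only the two attributes the helper reads are modelled.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


@dataclass()
class ChatData:
    # Stand-in for the server's shared state (assumed shape).
    user_session_by_id: Dict[str, object] = field(default_factory=dict)
    channel_messages: Dict[str, list] = field(default_factory=dict)


chat_data = ChatData(
    user_session_by_id={'id1': object(), 'id2': object()},
    channel_messages={'general': []},
)


def new_statistics_data(statistics_request: ServerStatisticsRequest) -> ServerStatistics:
    statistics_data = {}
    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)
    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)
    return ServerStatistics(**statistics_data)


everything = new_statistics_data(ServerStatisticsRequest())
users_only = new_statistics_data(ServerStatisticsRequest(ids=['users']))

print(everything)  # ServerStatistics(user_count=2, channel_count=1)
print(users_only)  # ServerStatistics(user_count=2, channel_count=None)
```

A statistic that was not requested is left as None rather than omitted, which is why both fields of ServerStatistics are optional.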
Next we define the endpoint for sending statistics to the client:
import asyncio
import json
import logging

from shared import ClientStatistics, ServerStatisticsRequest, ServerStatistics, encode_dataclass, dataclass_to_payload
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import Subscriber, DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter()

        @router.channel('statistics')
        async def send_statistics():

            class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

                def __init__(self, session: UserSessionData):
                    super().__init__()
                    self._session = session
                    self._requested_statistics = ServerStatisticsRequest()

                def cancel(self):
                    # Stop the background task that sends statistics.
                    self._sender.cancel()

                def subscribe(self, subscriber: Subscriber):
                    super().subscribe(subscriber)
                    subscriber.on_subscribe(self)
                    self._sender = asyncio.create_task(self._statistics_sender())

                async def _statistics_sender(self):
                    while True:
                        try:
                            await asyncio.sleep(self._requested_statistics.period_seconds)
                            next_message = new_statistics_data(self._requested_statistics)
                            self._subscriber.on_next(dataclass_to_payload(next_message))
                        except Exception:
                            logging.error('Statistics', exc_info=True)

                def on_next(self, value: Payload, is_complete=False):
                    # Called when the client sends an updated statistics request.
                    request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data)))

                    logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

                    if request.ids is not None:
                        self._requested_statistics.ids = request.ids

                    if request.period_seconds is not None:
                        self._requested_statistics.period_seconds = request.period_seconds

            response = StatisticsChannel(self._session)

            return response, response
The send_statistics handler defines an endpoint for sending statistics to the client. It uses the request-channel request type, which allows the client both to receive the server statistics and to tell the server which statistics it wants to receive.
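The core of this endpoint is a background task that sleeps for a configurable period, emits a value, and can be cancelled. The sketch below isolates that pattern with plain asyncio and no rsocket classes; PeriodicSender and its emit callback are hypothetical names used only for illustration.

```python
import asyncio
from typing import Callable, List, Optional


class PeriodicSender:
    """Simplified stand-in for the sender loop; not part of the rsocket API."""

    def __init__(self, emit: Callable[[int], None], period_seconds: float):
        self._emit = emit
        self.period_seconds = period_seconds  # may be changed while running
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Must be called while an event loop is running.
        self._task = asyncio.create_task(self._run())

    def cancel(self) -> None:
        if self._task is not None:
            self._task.cancel()

    async def _run(self) -> None:
        counter = 0
        while True:
            # The period is re-read on every iteration, so an update
            # takes effect once the current sleep has finished.
            await asyncio.sleep(self.period_seconds)
            counter += 1
            self._emit(counter)


async def main() -> List[int]:
    received: List[int] = []
    sender = PeriodicSender(received.append, period_seconds=0.05)
    sender.start()
    await asyncio.sleep(0.12)
    sender.period_seconds = 0.01  # comparable to the client sending a new period
    await asyncio.sleep(0.1)
    sender.cancel()
    count_at_cancel = len(received)
    await asyncio.sleep(0.05)
    assert len(received) == count_at_cancel  # nothing is emitted after cancel
    return received


received = asyncio.run(main())
print(received)
```

Cancelling the task raises asyncio.CancelledError inside the loop. Since Python 3.8 that exception does not derive from Exception, so an except Exception clause around the loop body, as in the server code, does not swallow the cancellation.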
Client side
On the client side we will add the methods to access the new server side functionality:
- send_statistics
- listen_for_statistics
import resource

from shared import ServerStatistics, ClientStatistics, encode_dataclass
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    async def send_statistics(self):
        memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss  # peak RSS: kilobytes on Linux, bytes on macOS

        payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)),
                          metadata=composite(route('statistics')))

        await self._rsocket.fire_and_forget(payload)
The send_statistics method uses a fire-and-forget request to send statistics to the server. This request does not receive a response, so the client does not wait for confirmation that the payload was delivered. That is acceptable here because the data is not critical information (at least for this tutorial).
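The value read from ru_maxrss is the peak resident set size of the process, and its unit depends on the platform: Linux reports kilobytes, while macOS reports bytes. The resource module is also only available on Unix-like systems. If a consistent unit matters, the value could be normalized as in this sketch; peak_memory_bytes is a hypothetical helper, and platforms other than Linux and macOS are assumed to follow the kilobyte convention.

```python
import resource
import sys


def peak_memory_bytes() -> int:
    # ru_maxrss is the peak resident set size of the current process.
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == 'darwin':
        return peak  # macOS already reports bytes
    return peak * 1024  # Linux reports kilobytes (assumed for other platforms)


print(peak_memory_bytes())
```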
Next we will request statistics from the server. First we will define a handler to listen on the channel request and control it:
import json
from asyncio import Event
from datetime import timedelta
from typing import List

from examples.tutorial.step6.models import ServerStatistics, ServerStatisticsRequest, dataclass_to_payload
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload

class StatisticsHandler(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

    def __init__(self):
        super().__init__()
        self.done = Event()

    def on_next(self, value: Payload, is_complete=False):
        # Called for every statistics message received from the server.
        statistics = ServerStatistics(**json.loads(utf8_decode(value.data)))
        print(statistics)

        if is_complete:
            self.done.set()

    def cancel(self):
        self.subscription.cancel()

    def set_requested_statistics(self, ids: List[str]):
        self._subscriber.on_next(dataclass_to_payload(ServerStatisticsRequest(ids=ids)))

    def set_period(self, period: timedelta):
        self._subscriber.on_next(
            dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))
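One detail is worth checking when sending partial updates. ServerStatisticsRequest has non-None defaults, so a request built with only ids still carries period_seconds=2. If the serialization helper writes every field (an assumption; dataclasses.asdict stands in for it here), the server's "is not None" checks will apply both values. The sketch below reproduces the server's merge rule with the standard library only; merge is a hypothetical function name.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List, Optional


@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


def to_bytes(request: ServerStatisticsRequest) -> bytes:
    # Assumed serialization: every field is written, including defaults.
    return json.dumps(asdict(request)).encode('utf-8')


def merge(current: ServerStatisticsRequest, raw: bytes) -> None:
    # Same rule as the server's on_next: only non-None fields are applied.
    request = ServerStatisticsRequest(**json.loads(raw))
    if request.ids is not None:
        current.ids = request.ids
    if request.period_seconds is not None:
        current.period_seconds = request.period_seconds


current = ServerStatisticsRequest(period_seconds=10)

# Built with only ids; the default period_seconds=2 is serialized as well.
merge(current, to_bytes(ServerStatisticsRequest(ids=['users'])))
print(current)  # ServerStatisticsRequest(ids=['users'], period_seconds=2)

# An explicit None leaves the server-side value untouched.
current.period_seconds = 10
merge(current, to_bytes(ServerStatisticsRequest(ids=['channels'], period_seconds=None)))
print(current)  # ServerStatisticsRequest(ids=['channels'], period_seconds=10)
```

Under that assumption, passing None explicitly for the field that should stay unchanged is one way to send a truly partial update.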
Next we will use this new handler in the ChatClient class:
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    def listen_for_statistics(self) -> StatisticsHandler:
        self._statistics_subscriber = StatisticsHandler()
        self._rsocket.request_channel(Payload(metadata=composite(
            route('statistics')
        )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber)
        return self._statistics_subscriber

    def stop_listening_for_statistics(self):
        self._statistics_subscriber.cancel()
Finally, let's try out this new functionality in the client:
import asyncio

async def statistics_example(user1):
    await user1.send_statistics()

    statistics_control = user1.listen_for_statistics()

    await asyncio.sleep(5)

    # Ask the server to report only the user count from now on.
    statistics_control.set_requested_statistics(['users'])

    await asyncio.sleep(5)

    user1.stop_listening_for_statistics()
Call this new function from the client's main method.