Reactivex
Up until now we only used the core RSocket python library. We will now simplify some code using the Reactivex integration. Well show an implementation using version 4. The implementation with version 3 mostly requires some module import changes.
We will assume basic knowledge of ReactiveX and will not go into detail regarding the specifics of that library.
This section will not update all the methods, but will instead give examples for each type. For a complete code of the ReactiveX version of the code, see Here
Server side
On the server side, the response, stream and channel request types can be simplified using the python ReactiveX library.
Handler interface
The handler class needs to implement ReactivexHandler
instead of RequestHandler
. Use the
reactivex_handler_factory
to pass the handler to the RSocketServer
init argument.
When using routing, use the same RequestRouter
instance, but the return values from the registered methods must
match the return values specified on ReactivexHandler
Below is the new session initialization code:
from rsocket.reactivex.reactivex_handler_adapter import reactivex_handler_factory
def session(*connection):
RSocketServer(TransportTCP(*connection),
handler_factory=reactivex_handler_factory(handler_factory),
fragment_size_bytes=1_000_000)
Request response
For response endpoints, return any Observable
with a single value, or a reactivex.empty()
Below is the modified upload endpoint, which has no response:
import reactivex
from reactivex import Observable
@router.response('file.upload')
async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Observable:
chat_data.files[get_file_name(composite_metadata)] = payload.data
return reactivex.empty()
You can return a single value using reactivex.just
. See the download endpoint example:
import reactivex
@router.response('file.download')
async def download_file(composite_metadata: CompositeMetadata) -> reactivex.Observable:
file_name = get_file_name(composite_metadata)
return reactivex.just(
Payload(chat_data.files[file_name],
composite(metadata_item(ensure_bytes(file_name), chat_filename_mimetype)))
)
Streaming
Python ReactiveX does not natively support any backpressure mechanism. You can either use native observables, which will have backpressure at the RSocket library level, or wrap them with helpers which will provide backpressure at the application level.
Direct observable
An example with a simple observable for the channels endpoint:
import reactivex
@router.stream('channels')
async def get_channels() -> Observable:
return reactivex.from_iterable(
(Payload(ensure_bytes(channel)) for channel in chat_data.channel_messages.keys()))
Observable from asyncio.Queue
An example of sending the values from a Queue using some helper methods:
from typing import Callable
from reactivex import Observable, Subject, operators
from rsocket.reactivex.back_pressure_publisher import from_observable_with_backpressure, observable_from_queue
from shared import dataclass_to_payload
@router.stream('messages.incoming')
async def messages_incoming() -> Callable[[Subject], Observable]:
return from_observable_with_backpressure(
lambda backpressure: observable_from_queue(
self._session.messages, backpressure
).pipe(
operators.map(dataclass_to_payload)
)
)
Observable from async generator
An example of an Observable
from an async generator:
from typing import Callable
from reactivex import Observable, operators, Subject
from rsocket.frame_helpers import ensure_bytes
from rsocket.payload import Payload
from rsocket.reactivex.back_pressure_publisher import (from_observable_with_backpressure,
observable_from_async_generator)
@router.stream('files')
async def get_file_names() -> Callable[[Subject], Observable]:
async def generator():
for file_name in chat_data.files.keys():
yield file_name
return from_observable_with_backpressure(
lambda backpressure: observable_from_async_generator(generator(), backpressure).pipe(
operators.map(lambda file_name: Payload(ensure_bytes(file_name)))
))
Channel
Below is an example of a reactivex backed channel response:
import asyncio
import logging
from reactivex import operators, Observer
from rsocket.reactivex.back_pressure_publisher import from_observable_with_backpressure, observable_from_async_generator
@router.channel('statistics')
async def send_statistics() -> ReactivexChannel:
def on_next(value: Payload):
request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data)))
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
if request.ids is not None:
self._session.requested_statistics.ids = request.ids
if request.period_seconds is not None:
self._session.requested_statistics.period_seconds = request.period_seconds
async def statistics_generator():
while True:
try:
await asyncio.sleep(self._session.requested_statistics.period_seconds)
yield new_statistics_data(self._session.requested_statistics)
except Exception:
logging.error('Statistics', exc_info=True)
return ReactivexChannel(
from_observable_with_backpressure(
lambda backpressure: observable_from_async_generator(
statistics_generator(), backpressure
).pipe(
operators.map(dataclass_to_payload)
)),
Observer(on_next=on_next),
limit_rate=2)
Client side
Client adapter
On the client side, the ReactiveXClient
adapter is used to get Observable
s from the requests.
Request response
Below is an example of a request-response using the ReactiveX client adapter:
from examples.tutorial.reactivex.models import chat_filename_mimetype
from reactivex import operators
from rsocket.extensions.helpers import composite, route, metadata_item
from rsocket.frame_helpers import ensure_bytes
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
async def download(self, file_name):
request = Payload(metadata=composite(
route('file.download'),
metadata_item(ensure_bytes(file_name), chat_filename_mimetype))
)
return await ReactiveXClient(self._rsocket).request_response(request).pipe(
operators.map(lambda _:_.data),
operators.last()
)
Line 15 wraps the existing rsocket client in self._rsocket with the ReactiveXClient adapter. As a result, the request_response method now returns an Observable. Although the Observable potentially represents a stream of more than one item, this response will contain at most one Payload. The Observable can be piped to operators which extract the data (Line 16) and get the single item from the stream (Line 17).
Streaming
Below is an example of the list_files method:
from typing import List
from reactivex import operators
from rsocket.extensions.helpers import composite, route
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient
async def list_files(self) -> List[str]:
request = Payload(metadata=composite(route('files')))
return await ReactiveXClient(self._rsocket).request_stream(request).pipe(
operators.map(lambda _: utf8_decode(_.data)),
operators.to_list()
)
Line 12 wraps the existing rsocket client in self._rsocket with the ReactiveXClient adapter. As a result, the request_stream method now returns an Observable. This provides a stream of Payloads, which can be piped to operators which extract the file names (Line 13) and collects them to a list (Line 14).
Channel
from asyncio import Queue
from datetime import timedelta
class StatisticsControl:
def __init__(self):
self.queue = Queue()
def set_requested_statistics(self, ids: List[str]):
self.queue.put_nowait(dataclass_to_payload(ServerStatisticsRequest(ids=ids)))
def set_period(self, period: timedelta):
self.queue.put_nowait(
dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))
from reactivex import operators
def listen_for_statistics(self) -> StatisticsControl:
def print_statistics(value: bytes):
statistics = ServerStatistics(**json.loads(utf8_decode(value)))
print(f'users: {statistics.user_count}, channels: {statistics.channel_count}')
control = StatisticsControl()
async def listen_for_statistics():
await ReactiveXClient(self._rsocket).request_channel(
Payload(encode_dataclass(ServerStatisticsRequest(period_seconds=2)),
metadata=composite(
route('statistics')
)),
observable=from_observable_with_backpressure(
lambda backpressure: observable_from_queue(control.queue, backpressure))
).pipe(
operators.do_action(on_next=lambda value: print_statistics(value.data),
on_error=lambda exception: print(exception)))
self._statistics_task = asyncio.create_task(listen_for_statistics())
return control