Skip to main content

Reactivex

Up until now we only used the core RSocket python library. We will now simplify some code using the Reactivex integration. Well show an implementation using version 4. The implementation with version 3 mostly requires some module import changes.

We will assume basic knowledge of ReactiveX and will not go into detail regarding the specifics of that library.

This section will not update all the methods, but will instead give examples for each type. For a complete code of the ReactiveX version of the code, see Here

Server side

On the server side, the response, stream and channel request types can be simplified using the python ReactiveX library.

Handler interface

The handler class needs to implement ReactivexHandler instead of RequestHandler. Use the reactivex_handler_factory to pass the handler to the RSocketServer init argument. When using routing, use the same RequestRouter instance, but the return values from the registered methods must match the return values specified on ReactivexHandler

Below is the new session initialization code:

from rsocket.reactivex.reactivex_handler_adapter import reactivex_handler_factory

def session(*connection):
RSocketServer(TransportTCP(*connection),
handler_factory=reactivex_handler_factory(handler_factory),
fragment_size_bytes=1_000_000)

Request response

For response endpoints, return any Observable with a single value, or a reactivex.empty()

Below is the modified upload endpoint, which has no response:

import reactivex
from reactivex import Observable

@router.response('file.upload')
async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Observable:
chat_data.files[get_file_name(composite_metadata)] = payload.data
return reactivex.empty()

You can return a single value using reactivex.just. See the download endpoint example:

import reactivex

@router.response('file.download')
async def download_file(composite_metadata: CompositeMetadata) -> reactivex.Observable:
file_name = get_file_name(composite_metadata)
return reactivex.just(
Payload(chat_data.files[file_name],
composite(metadata_item(ensure_bytes(file_name), chat_filename_mimetype)))
)

Streaming

Python ReactiveX does not natively support any backpressure mechanism. You can either use native observables, which will have backpressure at the RSocket library level, or wrap them with helpers which will provide backpressure at the application level.

Direct observable

An example with a simple observable for the channels endpoint:

import reactivex

@router.stream('channels')
async def get_channels() -> Observable:
return reactivex.from_iterable(
(Payload(ensure_bytes(channel)) for channel in chat_data.channel_messages.keys()))

Observable from asyncio.Queue

An example of sending the values from a Queue using some helper methods:

from typing import Callable
from reactivex import Observable, Subject, operators
from rsocket.reactivex.back_pressure_publisher import from_observable_with_backpressure, observable_from_queue
from shared import dataclass_to_payload

@router.stream('messages.incoming')
async def messages_incoming() -> Callable[[Subject], Observable]:
return from_observable_with_backpressure(
lambda backpressure: observable_from_queue(
self._session.messages, backpressure
).pipe(
operators.map(dataclass_to_payload)
)
)

Observable from async generator

An example of an Observable from an async generator:

from typing import Callable

from reactivex import Observable, operators, Subject
from rsocket.frame_helpers import ensure_bytes
from rsocket.payload import Payload
from rsocket.reactivex.back_pressure_publisher import (from_observable_with_backpressure,
observable_from_async_generator)

@router.stream('files')
async def get_file_names() -> Callable[[Subject], Observable]:
async def generator():
for file_name in chat_data.files.keys():
yield file_name

return from_observable_with_backpressure(
lambda backpressure: observable_from_async_generator(generator(), backpressure).pipe(
operators.map(lambda file_name: Payload(ensure_bytes(file_name)))
))

Channel

Below is an example of a reactivex backed channel response:

import asyncio
import logging
from reactivex import operators, Observer

from rsocket.reactivex.back_pressure_publisher import from_observable_with_backpressure, observable_from_async_generator

@router.channel('statistics')
async def send_statistics() -> ReactivexChannel:

def on_next(value: Payload):
request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data)))

logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

if request.ids is not None:
self._session.requested_statistics.ids = request.ids

if request.period_seconds is not None:
self._session.requested_statistics.period_seconds = request.period_seconds

async def statistics_generator():
while True:
try:
await asyncio.sleep(self._session.requested_statistics.period_seconds)
yield new_statistics_data(self._session.requested_statistics)
except Exception:
logging.error('Statistics', exc_info=True)

return ReactivexChannel(
from_observable_with_backpressure(
lambda backpressure: observable_from_async_generator(
statistics_generator(), backpressure
).pipe(
operators.map(dataclass_to_payload)
)),
Observer(on_next=on_next),
limit_rate=2)

Client side

Client adapter

On the client side, the ReactiveXClient adapter is used to get Observables from the requests.

Request response

Below is an example of a request-response using the ReactiveX client adapter:

from examples.tutorial.reactivex.models import chat_filename_mimetype

from reactivex import operators
from rsocket.extensions.helpers import composite, route, metadata_item
from rsocket.frame_helpers import ensure_bytes
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient

async def download(self, file_name):
request = Payload(metadata=composite(
route('file.download'),
metadata_item(ensure_bytes(file_name), chat_filename_mimetype))
)

return await ReactiveXClient(self._rsocket).request_response(request).pipe(
operators.map(lambda _:_.data),
operators.last()
)

Line 15 wraps the existing rsocket client in self._rsocket with the ReactiveXClient adapter. As a result, the request_response method now returns an Observable. Although the Observable potentially represents a stream of more than one item, this response will contain at most one Payload. The Observable can be piped to operators which extract the data (Line 16) and get the single item from the stream (Line 17).

Streaming

Below is an example of the list_files method:

from typing import List

from reactivex import operators
from rsocket.extensions.helpers import composite, route
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_client import ReactiveXClient

async def list_files(self) -> List[str]:
request = Payload(metadata=composite(route('files')))

return await ReactiveXClient(self._rsocket).request_stream(request).pipe(
operators.map(lambda _: utf8_decode(_.data)),
operators.to_list()
)

Line 12 wraps the existing rsocket client in self._rsocket with the ReactiveXClient adapter. As a result, the request_stream method now returns an Observable. This provides a stream of Payloads, which can be piped to operators which extract the file names (Line 13) and collects them to a list (Line 14).

Channel

from asyncio import Queue
from datetime import timedelta

class StatisticsControl:
def __init__(self):
self.queue = Queue()

def set_requested_statistics(self, ids: List[str]):
self.queue.put_nowait(dataclass_to_payload(ServerStatisticsRequest(ids=ids)))

def set_period(self, period: timedelta):
self.queue.put_nowait(
dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))
from reactivex import operators

def listen_for_statistics(self) -> StatisticsControl:
def print_statistics(value: bytes):
statistics = ServerStatistics(**json.loads(utf8_decode(value)))
print(f'users: {statistics.user_count}, channels: {statistics.channel_count}')

control = StatisticsControl()

async def listen_for_statistics():
await ReactiveXClient(self._rsocket).request_channel(
Payload(encode_dataclass(ServerStatisticsRequest(period_seconds=2)),
metadata=composite(
route('statistics')
)),
observable=from_observable_with_backpressure(
lambda backpressure: observable_from_queue(control.queue, backpressure))
).pipe(
operators.do_action(on_next=lambda value: print_statistics(value.data),
on_error=lambda exception: print(exception)))

self._statistics_task = asyncio.create_task(listen_for_statistics())

return control