obci.core package

Submodules

obci.core.asyncio_task_manager module

class obci.core.asyncio_task_manager.AsyncioTaskManager(asyncio_loop: typing.Union[asyncio.base_events.BaseEventLoop, NoneType] = None) → None[source]

Bases: object

AsyncioTaskManager class is used to manage a set of tasks.

Tasks are created using create_task function.

Examples

Can be used as context manager:

with AsyncioTaskManager() as mgr:
    pass

Or as async context manager:

async with AsyncioTaskManager() as mgr:
    pass
Parameters:asyncio_loop – existing message loop or None if new message loop should be created
async_shutdown() → None[source]

Can be called from a coroutine running inside the message loop.

create_task(coro: coroutine) → typing.Union[asyncio.futures.Future, concurrent.futures._base.Future][source]

Create a new task and return Future object.

The new task is added to an internal task list. When a task finishes, raises an exception, or is cancelled, it is automatically removed from that list. When AsyncioTaskManager is asked to close, it cancels all tasks on that list.

Note

Can be called from any thread and/or from any coroutine.
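
The bookkeeping described for create_task can be illustrated with plain asyncio. The following is a minimal sketch, not the actual AsyncioTaskManager implementation: the TrackedTasks class and its cancel_all and pending_count methods are hypothetical names, and the sketch is single-threaded, so it does not cover the cross-thread calls mentioned in the note above.

```python
import asyncio


class TrackedTasks:
    """Hypothetical stand-in for the task list kept by a task manager."""

    def __init__(self) -> None:
        self._tasks: set = set()

    def create_task(self, coro) -> asyncio.Future:
        task = asyncio.ensure_future(coro)
        self._tasks.add(task)
        # The callback fires when the task finishes, raises, or is cancelled.
        task.add_done_callback(self._tasks.discard)
        return task

    def cancel_all(self) -> None:
        # Iterate over a copy because done callbacks mutate the set.
        for task in list(self._tasks):
            task.cancel()

    def pending_count(self) -> int:
        return len(self._tasks)


async def demo() -> tuple:
    tracked = TrackedTasks()
    quick = tracked.create_task(asyncio.sleep(0, result="done"))
    slow = tracked.create_task(asyncio.sleep(3600))
    await quick
    await asyncio.sleep(0)  # give done callbacks a chance to run
    after_quick = tracked.pending_count()
    tracked.cancel_all()
    await asyncio.gather(slow, return_exceptions=True)
    await asyncio.sleep(0)
    return quick.result(), after_quick, slow.cancelled(), tracked.pending_count()
```

Running demo() with asyncio.run shows the finished task leaving the set on its own, while the long-running task is removed only after it has been cancelled.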

static new_event_loop() → asyncio.base_events.BaseEventLoop[source]

When AsyncioTaskManager runs with self.owns_asyncio_loop == True this function is called to create a new asyncio event loop.

Default implementation returns asyncio.new_event_loop().

Returns:event loop object
owns_asyncio_loop

bool – True if owns asyncio message loop.

If this instance owns the message loop, the loop is closed and destroyed when AsyncioTaskManager shuts down.
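
The relationship between new_event_loop and owns_asyncio_loop can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: BaseManager, CustomManager, CustomLoop and the close method are hypothetical names chosen for the example.

```python
import asyncio


class BaseManager:
    """Hypothetical manager that creates a loop only when none is supplied."""

    @staticmethod
    def new_event_loop() -> asyncio.AbstractEventLoop:
        return asyncio.new_event_loop()

    def __init__(self, loop=None) -> None:
        # Ownership follows from who created the loop.
        self.owns_asyncio_loop = loop is None
        self.loop = self.new_event_loop() if loop is None else loop

    def close(self) -> None:
        # Only a loop created by this instance is closed here.
        if self.owns_asyncio_loop:
            self.loop.close()


class CustomLoop(asyncio.SelectorEventLoop):
    """Hypothetical loop type a subclass might want to use."""


class CustomManager(BaseManager):
    @staticmethod
    def new_event_loop() -> asyncio.AbstractEventLoop:
        # Overriding the factory changes the loop type without touching __init__.
        return CustomLoop()
```

A loop passed in by the caller stays open after close(), which mirrors the ownership rule described above.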

shutdown() → None[source]

Can be called from ANY thread, but NOT from event loop. It will block until all pending tasks are finished. Can be called multiple times.

exception obci.core.asyncio_task_manager.MessageLoopRunningException[source]

Bases: Exception

Raised when a function that must be called from outside the message loop is called from inside it.

obci.core.asyncio_task_manager.ensure_not_inside_msg_loop(func: typing.Callable[..., typing.Any]) → typing.Callable[..., typing.Any][source]

Decorator used by AsyncioTaskManager and its subclasses to mark methods that must be called from outside the message loop.
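
A guard of this kind might look roughly like the sketch below. It is an assumption about the general technique, not the actual decorator: must_run_outside_loop and MessageLoopRunningError are hypothetical names, and the check used here (whether any asyncio loop is running in the current thread) may differ from what the library checks.

```python
import asyncio
import functools


class MessageLoopRunningError(Exception):
    """Hypothetical local stand-in for MessageLoopRunningException."""


def must_run_outside_loop(func):
    """Reject calls made from a thread that is currently running a loop."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            inside_loop = False  # no loop is running in this thread
        else:
            inside_loop = True
        if inside_loop:
            raise MessageLoopRunningError(func.__name__)
        return func(*args, **kwargs)

    return wrapper


@must_run_outside_loop
def blocking_shutdown() -> str:
    return "shut down"


async def call_from_inside_loop() -> str:
    try:
        blocking_shutdown()
    except MessageLoopRunningError:
        return "rejected"
    return "allowed"
```

Calling blocking_shutdown() from ordinary synchronous code succeeds, while the same call made from a coroutine is rejected before the wrapped function runs.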

obci.core.broker module

class obci.core.broker.Broker(rep_urls: typing.List[str], xpub_urls: typing.List[str], xsub_urls: typing.List[str], asyncio_loop: typing.Union[zmq.asyncio.ZMQEventLoop, NoneType] = None, zmq_context: typing.Union[zmq.asyncio.Context, NoneType] = None, zmq_io_threads: int = 1, hwm: int = 1000, msg_proxy_io_threads: int = 1, msg_proxy_hwm: int = 1000) → None[source]

Bases: obci.core.zmq_asyncio_task_manager.ZmqAsyncioTaskManager, obci.core.message_handler_mixin.MessageHandlerMixin

Broker is an essential component of an OpenBCI experiment. It consists of a REP socket, an XPUB/XSUB message proxy and an internal peer (with ID ‘0’). Every peer connects to the broker on initialization and registers. Broker also acts as a message proxy and/or router between peers.

Parameters:
  • rep_urls – list of URLs to bind REP socket to
  • xpub_urls – list of URLs to bind message proxy XPUB socket to
  • xsub_urls – list of URLs to bind message proxy XSUB socket to
  • asyncio_loop – existing message loop to use or None if new message loop is requested
  • zmq_context – existing ZMQ asyncio context or None if new context is requested
  • zmq_io_threads – number of ZMQ I/O threads to use in broker
  • hwm – ZMQ High Water Mark for broker
  • msg_proxy_io_threads – number of ZMQ I/O threads to use in message proxy
  • msg_proxy_hwm – ZMQ High Water Mark for message proxy
class obci.core.broker.MsgProxy(xpub_urls: typing.List[str], xsub_urls: typing.List[str], io_threads: int = 1, hwm: int = 1000) → None[source]

Bases: object

Message proxy is an integral part of Broker. It routes messages from multiple publishers to multiple subscribers.

MsgProxy opens an XSUB socket, an XPUB socket, and binds them to specified IP addresses and ports. Then, all peers connect to the proxy, instead of to each other. By using such pattern it becomes trivial to add more subscribers or publishers.

Parameters:
  • xpub_urls – list of URLs to bind XPUB socket to
  • xsub_urls – list of URLs to bind XSUB socket to
  • io_threads – size of the ZMQ threads pool to handle I/O operations
  • hwm – High Water Mark set on all ZMQ sockets
shutdown() → None[source]

Shutdown message proxy. Release all associated resources.

class obci.core.broker.PeerInfo(peer_id: str, url: str)[source]

Bases: object

Used by Broker to hold information about connected peers.

Parameters:
  • peer_id – unique peer ID
  • url – URL of peer REP socket

obci.core.message_handler_mixin module

class obci.core.message_handler_mixin.MessageHandlerMixin → None[source]

Bases: object

Implements common message handling interface used by Peer and Broker classes.

handle_message(msg: obci.core.messages.Message) → typing.Union[obci.core.messages.Message, NoneType][source]

Called by message dispatching loop when new message arrives.

Parameters:msg – message to handle
Returns:value returned by handler
Raises:KeyError – when handler for msg.type is not registered
register_message_handler(msg_type: str, handler: typing.Union[typing.Callable[[obci.core.messages.Message], typing.Union[obci.core.messages.Message, NoneType]], typing.Callable[[obci.core.messages.Message], coroutine]]) → None[source]

Register handler function to be called when message with msg_type arrives.

Parameters:
  • msg_type – Message type string.
  • handler – Function called when new message arrives.
unregister_message_handler(msg_type: str) → None[source]

Unregister previously registered message handler.

Parameters:msg_type – Message type string.
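
The register, unregister and dispatch cycle can be pictured as a dictionary keyed by message type. The sketch below is a simplified illustration, not the mixin's code: Msg is a hypothetical stand-in for obci.core.messages.Message, HandlerRegistry is a made-up class name, and making handle_message a coroutine so that it can await coroutine handlers is an assumption.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class Msg:
    """Hypothetical minimal message with only a type and a payload."""
    type: str
    data: Any = None


class HandlerRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Msg], Any]] = {}

    def register_message_handler(self, msg_type: str, handler: Callable[[Msg], Any]) -> None:
        self._handlers[msg_type] = handler

    def unregister_message_handler(self, msg_type: str) -> None:
        del self._handlers[msg_type]

    async def handle_message(self, msg: Msg) -> Any:
        handler = self._handlers[msg.type]  # KeyError if nothing is registered
        result = handler(msg)
        # Accept both plain functions and coroutine functions as handlers.
        if inspect.isawaitable(result):
            result = await result
        return result


async def demo() -> tuple:
    registry = HandlerRegistry()
    registry.register_message_handler("PING", lambda m: Msg("PONG", m.data))

    async def echo(m: Msg) -> Msg:
        return Msg("ECHO_REPLY", m.data)

    registry.register_message_handler("ECHO", echo)
    pong = await registry.handle_message(Msg("PING", 1))
    reply = await registry.handle_message(Msg("ECHO", "hi"))
    registry.unregister_message_handler("PING")
    try:
        await registry.handle_message(Msg("PING"))
    except KeyError:
        missing = True
    else:
        missing = False
    return pong.type, reply.data, missing
```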

obci.core.message_statistics module

class obci.core.message_statistics.MsgPerfStats(interval: float, name: str = '') → None[source]

Bases: object

MsgPerfStats receives messages with msg function and periodically prints some statistical information.

Parameters:
  • interval – how often statistics will be printed
  • name – name of this counter
interval

How often message statistics will be printed to stdout. Specified in seconds.

Note

Interval is checked only in self.msg function, this class doesn’t use its own timer.
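
To show what checking the interval only inside msg implies, here is a minimal sketch with an injectable clock. It is not the MsgPerfStats implementation: IntervalCounter, its clock parameter and its reports list are hypothetical, and the sketch collects report strings in a list rather than printing them.

```python
import time
from typing import Callable, List


class IntervalCounter:
    """Hypothetical counter that reports only when msg() is called."""

    def __init__(self, interval: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self._clock = clock
        self._count = 0
        self._last_report = clock()
        self.reports: List[str] = []

    def msg(self, _message: object) -> None:
        self._count += 1
        now = self._clock()
        elapsed = now - self._last_report
        # No timer runs in the background: if no message arrives,
        # no report is produced, however much time has passed.
        if elapsed >= self.interval:
            self.reports.append(f"{self._count} msgs in {elapsed:.1f} s")
            self._count = 0
            self._last_report = now


# Fake clock: one reading at construction, then one per msg() call.
ticks = iter([0.0, 0.4, 0.8, 1.2])
counter = IntervalCounter(1.0, clock=lambda: next(ticks))
for payload in ("a", "b", "c"):
    counter.msg(payload)
```

With the fake clock above, the first two calls produce nothing and the third one, arriving 1.2 seconds after the start, triggers the single report.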

msg(msg: obci.core.messages.Message) → None[source]

Called to count new message into statistics.

Parameters:msg (Message) – message to include into statistics
reset() → None[source]

Reset statistics.

obci.core.peer module

exception obci.core.peer.MultiplePeersAvailable(peers: typing.List[str], *args, **kwargs)[source]

Bases: Exception

Raised in Peer.query method when more than one peer can answer the specified query. The caller must decide which peer to ask and reissue the query by calling Peer.query with the initial_peer parameter set to one of the supplied peers.

Parameters:peers – list of peers
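
The retry pattern described for this exception can be sketched without a running broker. Everything below is illustrative: MultiplePeersError stands in for MultiplePeersAvailable, its peers attribute is an assumption based on the constructor parameter, fake_query replaces Peer.query, and the peer IDs are made up.

```python
from typing import Any, Callable, List, Optional


class MultiplePeersError(Exception):
    """Hypothetical stand-in for MultiplePeersAvailable."""

    def __init__(self, peers: List[str]) -> None:
        super().__init__(f"multiple peers can answer: {peers}")
        self.peers = peers


def query_with_fallback(query: Callable[..., Any], query_type: str) -> Any:
    try:
        return query(query_type)
    except MultiplePeersError as exc:
        # The selection policy belongs to the caller; this one takes the first peer.
        return query(query_type, initial_peer=exc.peers[0])


def fake_query(query_type: str, initial_peer: Optional[str] = None) -> str:
    """Stub that is ambiguous unless a peer is named explicitly."""
    if initial_peer is None:
        raise MultiplePeersError(["amp-1", "amp-2"])
    return f"{query_type} answered by {initial_peer}"


answer = query_with_fallback(fake_query, "sampling_rate")
```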
class obci.core.peer.Peer(urls: typing.Union[str, obci.core.peer.PeerInitUrls], peer_id: typing.Union[str, NoneType] = None, asyncio_loop: typing.Union[zmq.asyncio.ZMQEventLoop, NoneType] = None, zmq_context: typing.Union[zmq.asyncio.Context, NoneType] = None, zmq_io_threads: int = 1, hwm: int = 1000) → None[source]

Bases: obci.core.zmq_asyncio_task_manager.ZmqAsyncioTaskManager, obci.core.message_handler_mixin.MessageHandlerMixin

Base peer class. All peers derive from this class.

Parameters:
  • urls – string or PeerInitUrls with initial bootstrap addresses
  • peer_id – globally unique identifier
  • asyncio_loop – existing ZMQ asyncio message loop or None if a new loop is requested
  • zmq_context – existing ZMQ asyncio context or None if new context is requested
  • zmq_io_threads – number of ZMQ I/O threads
  • hwm – ZMQ high water mark
classmethod add_arguments(parser: argparse.ArgumentParser) → argparse.ArgumentParser[source]

Add command line args recognized by this class when using create_peer method.

classmethod create_peer(argv: typing.List[str]) → obci.core.peer.Peer[source]

Parse supplied argv and create new Peer instance.

heartbeat() → None[source]

Periodically send HEARTBEAT messages.

id

str – Unique identification string.

Read only property. Can be set only when creating a new peer instance.

initialization_finished() → None[source]

Run when initialization finished and all message sending mechanisms are available to use.

query(query_type: str, query_params: typing.Union[dict, NoneType] = None, initial_peer: typing.Union[str, NoneType] = None) → typing.Union[dict, str, int, float, NoneType, typing.List[typing.Any]][source]

Send query message to Broker (or to any other peer if initial_peer is specified).

Returned value can be any JSON-serializable object.

Return type:

Union

Parameters:
  • query_type (str) – query type
  • query_params (Union) – optional query parameters
  • initial_peer (Union) – if specified this peer will be asked instead of Broker
Returns:

query response

query_async(query_type: str, query_params: typing.Union[dict, NoneType] = None, initial_peer: typing.Union[str, NoneType] = None) → typing.Union[dict, str, int, float, NoneType, typing.List[typing.Any]][source]

Async version of Peer.query().

register_query_handler(msg_type: str, handler: typing.Union[typing.Callable[[obci.core.messages.Message], obci.core.messages.Message], typing.Callable[[obci.core.messages.Message], coroutine]]) → None[source]

Register callback handler for specified query type.

handler function must return a valid Message object.

Parameters:
  • msg_type (str) – query type
  • handler (Union) – function to execute when specified query is received
register_query_handler_async(msg_type: str, handler: typing.Union[typing.Callable[[obci.core.messages.Message], obci.core.messages.Message], typing.Callable[[obci.core.messages.Message], coroutine]]) → None[source]

Async version of Peer.register_query_handler().

remove_filter(msg_type: str, msg_subtype: typing.Union[str, NoneType] = None) → None[source]

Unsubscribe from messages with msg_type message type.

Peer must be initialized to use this function.

Parameters:
  • msg_type
  • msg_subtype
send_broker_message(msg: obci.core.messages.Message, timeout: float = 5.0) → obci.core.messages.Message[source]

Send message to Broker and return answer.

Return type:

Message

Parameters:
  • msg (Message) – message object to send
  • timeout (float) – timeout in seconds
Returns:

response message

send_message(msg: obci.core.messages.Message) → None[source]

Send broadcast message.

Parameters:msg (Message) – message object to send
send_message_to_peer(url: str, msg: obci.core.messages.Message, timeout: float = 5.0) → obci.core.messages.Message[source]

Send message to specified peer and return answer.

Return type:

Message

Parameters:
  • url (str) – peer’s REP socket URL
  • msg (Message) – message object to send
  • timeout (float) – timeout in seconds
Returns:

response message

set_filter(msg_type: str, msg_subtype: typing.Union[str, NoneType] = None) → None[source]

Subscribe to messages with msg_type message type.

Peer must be initialized to use this function.

Parameters:
  • msg_type
  • msg_subtype
unregister_query_handler(msg_type: typing.Union[str, NoneType] = None) → None[source]

Unregister callback handler for specified query type or for all query types if msg_type is None.

Parameters:msg_type (Union) – query type
unregister_query_handler_async(msg_type: typing.Union[str, NoneType] = None) → None[source]

Async version of Peer.unregister_query_handler().

class obci.core.peer.PeerInitUrls[source]

Bases: obci.core.peer.PeerInitUrls

List of initial URL addresses.

Parameters:
  • pub_urls (list) – list of PUB URLs to bind to
  • rep_urls (list) – list of REP URLs to bind to
  • broker_rep_url (str) – broker’s REP URL

Create new instance of PeerInitUrls(pub_urls, rep_urls, broker_rep_url)
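
The constructor line above suggests a named-tuple-like record with three fields. The sketch below assumes that shape and uses a local InitUrls type instead of importing the real class; the addresses are made up for illustration.

```python
from typing import List, NamedTuple


class InitUrls(NamedTuple):
    """Hypothetical local equivalent of PeerInitUrls."""
    pub_urls: List[str]
    rep_urls: List[str]
    broker_rep_url: str


urls = InitUrls(
    pub_urls=["tcp://127.0.0.1:*"],          # example address, not a default
    rep_urls=["tcp://127.0.0.1:*"],          # example address, not a default
    broker_rep_url="tcp://127.0.0.1:20001",  # example address, not a default
)

# Fields are available by name and by position.
first_pub = urls.pub_urls[0]
broker = urls[2]
```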

exception obci.core.peer.QueryAnswerUnknown[source]

Bases: Exception

Raised when answer to query is unknown.

exception obci.core.peer.TooManyRedirectsException[source]

Bases: Exception

Raised when too many redirects occurred in Peer.query method.

obci.core.zmq_asyncio_task_manager module

class obci.core.zmq_asyncio_task_manager.ZmqAsyncioTaskManager(asyncio_loop: typing.Union[zmq.asyncio.ZMQEventLoop, NoneType] = None, zmq_context: typing.Union[zmq.asyncio.Context, NoneType] = None, zmq_io_threads: int = 1) → None[source]

Bases: obci.core.asyncio_task_manager.AsyncioTaskManager

Adds ZMQ context management to AsyncioTaskManager.

Parameters:
  • asyncio_loop – existing message loop or None if new message loop should be created
  • zmq_context – existing asyncio ZMQ context or None if new context should be created
  • zmq_io_threads – number of ZMQ I/O threads
static new_event_loop() → zmq.asyncio.ZMQEventLoop[source]

Overridden function to create ZMQEventLoop instances.

Returns:new instance of ZMQEventLoop
owns_zmq_context

True if ZMQ context is owned by this object.

Module contents

This module contains core modules for OpenBCI framework such as Broker, Peer and Message.

obci.core.OBCI_DEBUG = False

bool – Set to True if OBCI_DEBUG environment variable is defined.

If enabled, sets the root logger level to DEBUG and enables all warnings generated using the warnings module.
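
The effect of the flag can be approximated with the standard logging and warnings modules. This is a sketch of the described behaviour, not the package's code: apply_debug_flag is a hypothetical helper, and the "always" warnings filter is an assumption about what enabling all warnings means.

```python
import logging
import os
import warnings
from typing import Mapping


def apply_debug_flag(environ: Mapping[str, str] = os.environ) -> bool:
    """Return True and switch on verbose diagnostics if OBCI_DEBUG is defined."""
    debug = "OBCI_DEBUG" in environ  # presence matters, not the value
    if debug:
        logging.getLogger().setLevel(logging.DEBUG)
        warnings.simplefilter("always")
    return debug
```

Passing a plain dictionary instead of os.environ makes the helper easy to try without changing the real environment.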