obci.core package¶
Subpackages¶
Submodules¶
obci.core.asyncio_task_manager module¶
-
class
obci.core.asyncio_task_manager.
AsyncioTaskManager
(asyncio_loop: typing.Union[asyncio.base_events.BaseEventLoop, NoneType] = None) → None[source]¶ Bases:
object
AsyncioTaskManager class is used to manage a set of tasks.
Tasks are created using create_task function.
Examples
Can be used as context manager:
with AsyncioTaskManager() as mgr: pass
Or as async context manager:
async with AsyncioTaskManager() as mgr: pass
Parameters: asyncio_loop – existing message loop or None if new message loop should be created
-
create_task
(coro: coroutine) → typing.Union[asyncio.futures.Future, concurrent.futures._base.Future][source]¶ Create a new task and return Future object.
The new task is added to an internal task list. When the task finishes, raises an exception, or is cancelled, it is removed from that list automatically. When AsyncioTaskManager is asked to close, it cancels all tasks on that list.
Note
Can be called from any thread and/or from any coroutine.
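The task bookkeeping described for create_task can be sketched with plain asyncio. This is a minimal illustration, not the obci implementation: the class name TaskSetSketch and its close coroutine are hypothetical, and the sketch must be called from inside a running event loop, so it does not cover the cross-thread use mentioned in the note.

```python
import asyncio


class TaskSetSketch:
    """Hypothetical stand-in that tracks tasks the way the text describes."""

    def __init__(self):
        self._tasks = set()

    def create_task(self, coro):
        # Schedule the coroutine and remember the resulting task.
        task = asyncio.ensure_future(coro)
        self._tasks.add(task)
        # Forget the task once it finishes, raises, or is cancelled.
        task.add_done_callback(self._tasks.discard)
        return task

    async def close(self):
        # Cancel everything still tracked and wait for the cancellations.
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


async def demo():
    mgr = TaskSetSketch()
    quick = mgr.create_task(asyncio.sleep(0, result="done"))
    slow = mgr.create_task(asyncio.sleep(3600))
    result = await quick
    await asyncio.sleep(0)  # give the done callback a chance to run
    tracked_after_quick = len(mgr._tasks)
    await mgr.close()
    return result, tracked_after_quick, slow.cancelled(), len(mgr._tasks)


outcome = asyncio.run(demo())
```

In the demo, the finished task leaves the set on its own, while the long-running task is removed only after close cancels it.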
-
static
new_event_loop
() → asyncio.base_events.BaseEventLoop[source]¶ When AsyncioTaskManager runs with self.owns_asyncio_loop == True this function is called to create a new asyncio event loop.
Default implementation returns asyncio.new_event_loop().
Returns: event loop object
-
owns_asyncio_loop
¶ bool – True if owns asyncio message loop.
If this instance owns the message loop, the loop will be closed and destroyed on AsyncioTaskManager's shutdown.
-
obci.core.broker module¶
-
class
obci.core.broker.
Broker
(rep_urls: typing.List[str], xpub_urls: typing.List[str], xsub_urls: typing.List[str], asyncio_loop: typing.Union[zmq.asyncio.ZMQEventLoop, NoneType] = None, zmq_context: typing.Union[zmq.asyncio.Context, NoneType] = None, zmq_io_threads: int = 1, hwm: int = 1000, msg_proxy_io_threads: int = 1, msg_proxy_hwm: int = 1000) → None[source]¶ Bases:
obci.core.zmq_asyncio_task_manager.ZmqAsyncioTaskManager
,obci.core.message_handler_mixin.MessageHandlerMixin
Broker is an essential component of an OpenBCI experiment. It consists of a REP socket, an XPUB/XSUB message proxy and an internal peer (with ID '0'). Every peer connects to the broker on initialization and registers. Broker also acts as a message proxy and/or router between peers.
Parameters: - rep_urls – list of URLs to bind REP socket to
- xpub_urls – list of URLs to bind message proxy XPUB socket to
- xsub_urls – list of URLs to bind message proxy XSUB socket to
- asyncio_loop – existing message loop to use or None if new message loop is requested
- zmq_context – existing ZMQ asyncio context or None if new context is requested
- zmq_io_threads – number of ZMQ I/O threads to use in broker
- hwm – ZMQ High Water Mark for broker
- msg_proxy_io_threads – number of ZMQ I/O threads to use in message proxy
- msg_proxy_hwm – ZMQ High Water Mark for message proxy
-
class
obci.core.broker.
MsgProxy
(xpub_urls: typing.List[str], xsub_urls: typing.List[str], io_threads: int = 1, hwm: int = 1000) → None[source]¶ Bases:
object
Message proxy is an integral part of Broker. It routes messages from multiple publishers to multiple subscribers.
MsgProxy opens an XSUB socket and an XPUB socket, and binds them to the specified IP addresses and ports. All peers then connect to the proxy instead of to each other. With this pattern, adding more subscribers or publishers becomes trivial.
Parameters: - xpub_urls – list of URLs to bind XPUB socket to
- xsub_urls – list of URLs to bind XSUB socket to
- io_threads – size of the ZMQ threads pool to handle I/O operations
- hwm – High Water Mark set on all ZMQ sockets
obci.core.message_handler_mixin module¶
-
class
obci.core.message_handler_mixin.
MessageHandlerMixin
→ None[source]¶ Bases:
object
Implements common message handling interface used by Peer and Broker classes.
-
handle_message
(msg: obci.core.messages.Message) → typing.Union[obci.core.messages.Message, NoneType][source]¶ Called by message dispatching loop when new message arrives.
Parameters: msg – message to handle
Returns: value returned by handler
Raises: KeyError – when handler for msg.type is not registered
-
register_message_handler
(msg_type: str, handler: typing.Union[typing.Callable[[obci.core.messages.Message], typing.Union[obci.core.messages.Message, NoneType]], typing.Callable[[obci.core.messages.Message], coroutine]]) → None[source]¶ Register handler function to be called when message with msg_type arrives.
Parameters: - msg_type – Message type string.
- handler – Function called when new message arrives.
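The register/dispatch interface above can be sketched as a dictionary keyed by message type. This is only an illustration under stated assumptions: HandlerRegistrySketch and SketchMessage are hypothetical names, the message is reduced to a type and a data field, and coroutine handlers (which the signature also allows) are not covered.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class SketchMessage:
    """Hypothetical minimal message: only the fields the sketch needs."""
    type: str
    data: Any = None


class HandlerRegistrySketch:
    """Hypothetical stand-in for the register/dispatch interface."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[SketchMessage], Optional[SketchMessage]]] = {}

    def register_message_handler(self, msg_type, handler) -> None:
        # One handler per message type; a later registration replaces it.
        self._handlers[msg_type] = handler

    def handle_message(self, msg: SketchMessage) -> Optional[SketchMessage]:
        # A missing entry raises KeyError, matching the documented behaviour.
        handler = self._handlers[msg.type]
        return handler(msg)


registry = HandlerRegistrySketch()
registry.register_message_handler("PING", lambda msg: SketchMessage("PONG", msg.data))

reply = registry.handle_message(SketchMessage("PING", data=7))

try:
    registry.handle_message(SketchMessage("UNKNOWN"))
    unknown_raised = False
except KeyError:
    unknown_raised = True
```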
-
obci.core.message_statistics module¶
-
class
obci.core.message_statistics.
MsgPerfStats
(interval: float, name: str = '') → None[source]¶ Bases:
object
MsgPerfStats receives messages with msg function and periodically prints some statistical information.
Parameters: - interval – how often statistics will be printed
- name – name of this counter
-
interval
¶ How often message statistics will be printed to stdout. Specified in seconds.
Note
Interval is checked only in self.msg function, this class doesn’t use its own timer.
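The note above means that statistics appear only when a message arrives after the interval has elapsed. A minimal sketch of that behaviour follows; IntervalCounterSketch, its injectable clock argument, the reports list and the report format are all assumptions made for illustration and are not taken from obci.

```python
import time


class IntervalCounterSketch:
    """Hypothetical counter that reports only from inside msg()."""

    def __init__(self, interval, name="", clock=time.monotonic):
        self.interval = interval
        self.name = name
        self._clock = clock
        self._count = 0
        self._last_report = clock()
        self.reports = []  # kept so the behaviour can be inspected

    def msg(self, message):
        self._count += 1
        now = self._clock()
        elapsed = now - self._last_report
        # No timer: the interval is checked only when a message arrives.
        if elapsed >= self.interval:
            line = f"{self.name}: {self._count} msgs in {elapsed:.1f}s"
            print(line)
            self.reports.append(line)
            self._count = 0
            self._last_report = now


# Drive the counter with a fake clock so the example is deterministic.
ticks = iter([0.0, 0.4, 0.8, 1.2])
counter = IntervalCounterSketch(interval=1.0, name="demo", clock=lambda: next(ticks))
for payload in ("a", "b", "c"):
    counter.msg(payload)
```

If no messages arrive, nothing is printed, however long the silence lasts.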
obci.core.peer module¶
-
exception
obci.core.peer.
MultiplePeersAvailable
(peers: typing.List[str], *args, **kwargs)[source]¶ Bases:
Exception
Raised in Peer.query method when more than one peer can answer the specified query. The caller must decide which peer to ask and reissue the query by calling Peer.query with the initial_peer parameter set to one of the supplied peers.
Parameters: peers – list of peers
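The retry flow described above might look like the following sketch. Everything here is a stand-in: MultiplePeersAvailableSketch, fake_query and the peer names are hypothetical, and whether the real exception exposes the list as a peers attribute is an assumption.

```python
class MultiplePeersAvailableSketch(Exception):
    """Hypothetical stand-in carrying the list of candidate peers."""

    def __init__(self, peers, *args):
        super().__init__(*args)
        self.peers = peers  # assumed attribute name


def fake_query(query_type, query_params=None, initial_peer=None):
    """Hypothetical query: ambiguous unless initial_peer is given."""
    if initial_peer is None:
        raise MultiplePeersAvailableSketch(["peer_a", "peer_b"])
    return {"answered_by": initial_peer}


def query_first_available(query_type):
    try:
        return fake_query(query_type)
    except MultiplePeersAvailableSketch as exc:
        # The caller picks one candidate and reissues the query.
        chosen = exc.peers[0]
        return fake_query(query_type, initial_peer=chosen)


answer = query_first_available("sampling_rate")
```

Picking the first candidate is an arbitrary policy chosen for brevity; a real caller would select the peer that fits its needs.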
-
class
obci.core.peer.
Peer
(urls: typing.Union[str, obci.core.peer.PeerInitUrls], peer_id: typing.Union[str, NoneType] = None, asyncio_loop: typing.Union[zmq.asyncio.ZMQEventLoop, NoneType] = None, zmq_context: typing.Union[zmq.asyncio.Context, NoneType] = None, zmq_io_threads: int = 1, hwm: int = 1000) → None[source]¶ Bases:
obci.core.zmq_asyncio_task_manager.ZmqAsyncioTaskManager
,obci.core.message_handler_mixin.MessageHandlerMixin
Base peer class. All peers derive from this class.
Parameters: - urls – string or PeerInitUrls with initial bootstrap addresses
- peer_id – globally unique identifier
- asyncio_loop – existing ZMQ asyncio message loop or None if new loop is requested
- zmq_context – existing ZMQ asyncio context or None if new context is requested
- zmq_io_threads – number of ZMQ I/O threads
- hwm – ZMQ high water mark
-
classmethod
add_arguments
(parser: argparse.ArgumentParser) → argparse.ArgumentParser[source]¶ Add command line args recognized by this class when using create_peer method.
-
classmethod
create_peer
(argv: typing.List[str]) → obci.core.peer.Peer[source]¶ Parse supplied argv and create new Peer instance.
-
id
¶ str – Unique identification string.
Read only property. Can be set only when creating a new peer instance.
-
initialization_finished
() → None[source]¶ Run when initialization finished and all message sending mechanisms are available to use.
-
query
(query_type: str, query_params: typing.Union[dict, NoneType] = None, initial_peer: typing.Union[str, NoneType] = None) → typing.Union[dict, str, int, float, NoneType, typing.List[typing.Any]][source]¶ Send query message to Broker (or any other peer if initial_peer is specified).
Returned value can be any JSON-serializable object.
Returns: query response
-
query_async
(query_type: str, query_params: typing.Union[dict, NoneType] = None, initial_peer: typing.Union[str, NoneType] = None) → typing.Union[dict, str, int, float, NoneType, typing.List[typing.Any]][source]¶ Async version of Peer.query().
-
register_query_handler
(msg_type: str, handler: typing.Union[typing.Callable[[obci.core.messages.Message], obci.core.messages.Message], typing.Callable[[obci.core.messages.Message], coroutine]]) → None[source]¶ Register callback handler for specified query type.
handler function must return a valid Message object.
Parameters: - msg_type (str) – query type
- handler (Union) – function to execute when specified query is received
-
register_query_handler_async
(msg_type: str, handler: typing.Union[typing.Callable[[obci.core.messages.Message], obci.core.messages.Message], typing.Callable[[obci.core.messages.Message], coroutine]]) → None[source]¶ Async version of Peer.register_query_handler().
-
remove_filter
(msg_type: str, msg_subtype: typing.Union[str, NoneType] = None) → None[source]¶ Unsubscribe for messages with msg_type message type.
Peer must be initialized to use this function.
Parameters: - msg_type –
- msg_subtype –
-
send_broker_message
(msg: obci.core.messages.Message, timeout: float = 5.0) → obci.core.messages.Message[source]¶ Send message to Broker and return answer.
Parameters: - msg (Message) – message object to send
- timeout (float) – timeout in seconds
Returns: response message
-
send_message
(msg: obci.core.messages.Message) → None[source]¶ Send broadcast message.
Parameters: msg (Message) – message object to send
-
send_message_to_peer
(url: str, msg: obci.core.messages.Message, timeout: float = 5.0) → obci.core.messages.Message[source]¶ Send message to specified peer and return answer.
Parameters: - url (str) – peer's REP socket URL
- msg (Message) – message object to send
- timeout (float) – timeout in seconds
Returns: response message
-
set_filter
(msg_type: str, msg_subtype: typing.Union[str, NoneType] = None) → None[source]¶ Subscribe for messages with msg_type message type.
Peer must be initialized to use this function.
Parameters: - msg_type –
- msg_subtype –
-
unregister_query_handler
(msg_type: typing.Union[str, NoneType] = None) → None[source]¶ Unregister callback handler for specified query type or for all query types if msg_type is None.
Parameters: msg_type (Union) – query type
-
unregister_query_handler_async
(msg_type: typing.Union[str, NoneType] = None) → None[source]¶ Async version of Peer.unregister_query_handler().
-
class
obci.core.peer.
PeerInitUrls
[source]¶ Bases:
obci.core.peer.PeerInitUrls
List of initial URL addresses.
Create new instance of PeerInitUrls(pub_urls, rep_urls, broker_rep_url)
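The constructor line above has the shape of a named tuple with three fields. The sketch below mirrors that shape with a stand-in type. PeerInitUrlsSketch is hypothetical, and the field types (lists of URL strings for pub_urls and rep_urls, a single string for broker_rep_url) and the example addresses are assumptions, not values taken from obci.

```python
from collections import namedtuple

# Hypothetical stand-in with the three field names listed in the docs.
PeerInitUrlsSketch = namedtuple(
    "PeerInitUrlsSketch", ["pub_urls", "rep_urls", "broker_rep_url"]
)

urls = PeerInitUrlsSketch(
    pub_urls=["tcp://127.0.0.1:*"],          # assumed: list of bind URLs
    rep_urls=["tcp://127.0.0.1:*"],          # assumed: list of bind URLs
    broker_rep_url="tcp://127.0.0.1:20001",  # assumed: example address only
)

# Fields are available by name and by position, like any named tuple.
broker_url = urls.broker_rep_url
first_field = urls[0]
```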
obci.core.zmq_asyncio_task_manager module¶
-
class
obci.core.zmq_asyncio_task_manager.
ZmqAsyncioTaskManager
(asyncio_loop: typing.Union[zmq.asyncio.ZMQEventLoop, NoneType] = None, zmq_context: typing.Union[zmq.asyncio.Context, NoneType] = None, zmq_io_threads: int = 1) → None[source]¶ Bases:
obci.core.asyncio_task_manager.AsyncioTaskManager
Adds ZMQ context management to AsyncioTaskManager.
Parameters: - asyncio_loop – existing message loop or None if new message loop should be created
- zmq_context – existing asyncio ZMQ context or None if new context should be created
- zmq_io_threads – number of ZMQ I/O threads
-
static
new_event_loop
() → zmq.asyncio.ZMQEventLoop[source]¶ Overloaded function to create ZMQEventLoop instances.
Returns: new instance of ZMQEventLoop
-
owns_zmq_context
¶ True if ZMQ context is owned by this object.
Module contents¶
This module contains core modules for OpenBCI framework such as Broker, Peer and Message.
-
obci.core.
OBCI_DEBUG
= False¶ bool – Set to True if OBCI_DEBUG environment variable is defined.
If enabled, sets root logger level to DEBUG and enables all warnings generated using warnings module.
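The effect of the flag can be approximated with the standard library alone. This is a sketch, not the obci source: the helper names debug_enabled and apply_debug_settings are hypothetical, and using the "always" warnings filter is an assumption about what "enables all warnings" means.

```python
import logging
import os
import warnings


def debug_enabled(environ=os.environ):
    """True when the OBCI_DEBUG variable is defined, whatever its value."""
    return "OBCI_DEBUG" in environ


def apply_debug_settings(enabled):
    """Hypothetical helper mirroring the documented side effects."""
    if enabled:
        # Root logger reports everything from DEBUG upwards.
        logging.getLogger().setLevel(logging.DEBUG)
        # Assumed interpretation: show every warning, every time.
        warnings.simplefilter("always")


flag_when_set = debug_enabled({"OBCI_DEBUG": "1"})
flag_when_unset = debug_enabled({})
apply_debug_settings(flag_when_set)
```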