Distributed Applications
See IoTPy/IoTPy/multiprocessing/distributed.py.
You can build distributed applications by connecting IoTPy code to messaging frameworks. Here we describe connectors from IoTPy to Pika — Python code that implements a message broker called RabbitMQ. You can build connectors to other message brokers — such as ActiveMQ, and Kafka — by modifying the connector to Pika.
Sending messages from iotpy to a message broker
You can send messages from IoTPy to a message broker in two ways:
Publish stream: Use a sink agent that publishes a stream to the message broker
Publish queue: Use a thread that publishes messages that it gets from a queue.
PUBLISH stream: Copying a stream to a message broker
# Agent that copies a stream to a message broker def f(in_streams, out_streams): publisher = PikaPublisher( routing_key='temperature', exchange='publications', host='localhost') sink_list(publisher.publish_list, in_streams[0])
IoTPy/IoTPy/concurrency/ has a class called PikaPublisher. The parameters, routing_key, exchange and host, are described in the Pika documents. publish_list is a method of the PikaPublisher class; this method publishes a list on a message broker.
sink_list(publisher.publish_list, in_streams[0])
creates a sink agent which continuously gets new segments in_streams[0] and then publishes the segments by calling publish_list.
publish queue in a Thread
def publish_data_from_queue(q): publisher = PikaPublisher( routing_key='temperature', exchange='publications', host='localhost') while True: v = q.get() if v == '_finished': break else: publisher.publish_list([v])
Receiving Messages from a message broker
You receive messages by creating a source thread as illustrated in the following example which extends a stream called ‘x’ with published data.
# Source thread target for source stream named 'x'. def h(proc): def callback(ch, method, properties, body): proc.copy_stream(data=json.loads(body), stream_name='x') pika_subscriber = PikaSubscriber( callback, routing_key='temperature', exchange='publications', host='localhost') pika_subscriber.start()
PikaSubscriber is a class in IoTPy/IoTPy/concurrency/. The parameters, callback, routing_key, exchange and host are described in the Pika documents. The callback function is called when RabbitMQ detects that a message has been published with this routing key and exchange. This particular callback function merely extends the stream called ‘x’ with the message.