COUNTING ITEMS IN STREAMS
This section shows how to use IoTPy to wrap terminating functions into agents that count items in streams. Examples of such applications are:
The Misra-Gries algorithm; see https://en.wikipedia.org/wiki/Misra%E2%80%93Gries_summary, http://www.cs.utexas.edu/users/misra/Notes.dir/HeavyHitters.pdf and https://www.sciencedirect.com/science/article/pii/0167642382900120.
Count Min Sketch, Bloom Filter and Heavy Hitters operations on streams, which are direct applications of the map_element and map_window agents to classes from PyProbables, an open-source library of probabilistic data structures. See https://pyprobables.readthedocs.io/en/latest/index.html.
Misra-Gries Algorithm: Identifying frequent items
Given an input stream of arbitrary items, this agent outputs a stream of candidates for the most frequent items in the input stream. The agent has a parameter k: if an item occurs more than N/k times in the first N items of the input stream, then that item appears in the N-th element of the output stream. Each element of the output stream is a pair (keys, counts), where keys is a list of k - 1 entries, each an item from the input stream or the special symbol None, and counts is a list of k - 1 nonnegative integers such that keys[i] appears at least counts[i] times, and at most counts[i] + N/k times, in the first N elements of the input stream. The output stream could contain only keys, since keys holds the candidates for the most frequent items; counts is included to show how the algorithm operates.
For example, if the input stream is ['A', 'A', 'B', 'C', 'C', 'C', ...] and k = 3, so that the N-th value of keys contains all items in the input stream that appear more than N/3 times, then the output stream is:
[(('A', None), (1, 0)), (('A', None), (2, 0)), (('A', 'B'), (2, 1)), (('A', None), (1, 0)),
(('A', 'C'), (1, 1)), (('A', 'C'), (1, 2)), ...]
Note that keys in the N-th element of the output stream may contain items that do not appear more than N/k times in the first N elements of the input stream. For example, 'B' is in keys in the 3rd element of the output stream, but 'B' appears only once, which is not more than 3/3 times, in the first 3 elements of the input stream. Conversely, since 'A' appears more than 1/3 times in the first element of the input stream, it appears in the first element of the output stream. Likewise, since 'A' appears more than 2/3 times in the first two elements of the input stream, it appears in the second element of the output stream.
The code is in IoTPy/examples/Counting/Misra_Gries.py. It is a straightforward use of map_element with state. The code was written by Atishay Jain with Mani Chandy.
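A minimal sketch of the same computation in plain Python, written as a step function that takes one item and a state and returns an output and a new state. We assume, without tying it to a particular IoTPy release, that this is the shape of function a stateful map_element works with; the names misra_gries_step and run are hypothetical, and run is a plain loop standing in for the agent. The sketch also assumes that None never occurs as an input item, because None marks an empty slot.

```python
def misra_gries_step(item, state):
    """One Misra-Gries update.

    state is (keys, counts), two lists of length k - 1.
    Returns (output, new_state); output is the (keys, counts) pair as tuples.
    """
    keys, counts = list(state[0]), list(state[1])  # copy: do not mutate caller's state
    if item in keys:
        # The item already has a slot: increment its counter.
        counts[keys.index(item)] += 1
    elif None in keys:
        # A free slot exists: start a counter for the item.
        i = keys.index(None)
        keys[i], counts[i] = item, 1
    else:
        # All slots are full: decrement every counter (the new item is
        # dropped) and free any slot whose counter reaches zero.
        for i in range(len(keys)):
            counts[i] -= 1
            if counts[i] == 0:
                keys[i] = None
    return (tuple(keys), tuple(counts)), (keys, counts)


def run(stream, k):
    """Drive the step function over a finite list (stand-in for the agent)."""
    state = ([None] * (k - 1), [0] * (k - 1))
    outputs = []
    for item in stream:
        output, state = misra_gries_step(item, state)
        outputs.append(output)
    return outputs


# Reproduces the example output stream for k = 3.
print(run(['A', 'A', 'B', 'C', 'C', 'C'], 3))
```

Each decrement step discards k units of count (one from each of the k - 1 counters plus the dropped item), so there are at most N/k such steps; that is why counts[i] undercounts keys[i] by at most N/k.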
Heavy Hitters in streams
Install PyProbables to use this code. The code is in IoTPy/examples/Counting/heavy_hitters.py.
The code creates an agent with a single input stream and a single output stream. Each time the length of the input stream reaches a multiple of the parameter window_size, the agent outputs estimates of the most frequent items seen so far. The code is a straightforward application of the map_window agent; heavy_hitters_object is an instance of the HeavyHitters class in PyProbables.
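import copy
from probables import HeavyHitters
# map_window (and Stream, used below) come from IoTPy; the import path depends on the IoTPy version.

def heavy_hitters_stream(in_stream, out_stream, window_size, heavy_hitters_object):
    def f(window):
        # Add each element of the current window to the sketch.
        for element in window:
            heavy_hitters_object.add(element)
        # Output a snapshot; returning the live dict would let later windows change earlier outputs.
        return copy.copy(heavy_hitters_object.heavy_hitters)
    # step_size == window_size gives non-overlapping windows.
    map_window(f, in_stream, out_stream, window_size, step_size=window_size)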
An example of using heavy_hitters_stream is shown below.
heavy_hitters_object = HeavyHitters(width=1000, depth=5)
x = Stream('input')
y = Stream('output')
window_size = 4
heavy_hitters_stream(x, y, window_size, heavy_hitters_object)
If, for example, x is ['a', 'a', 'a', 'b', 'a', 'b', 'c', 'a', 'b', 'c', 'b', 'b', ...] then y becomes:
[{'a': 3, 'b': 1}, {'a': 5, 'c': 1, 'b': 2}, {'a': 5, 'c': 2, 'b': 5}]
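The windowing and the cumulative counts can be checked without IoTPy or PyProbables. The sketch below uses hypothetical stand-ins: a plain loop replaces map_window with step_size=window_size, and an exact collections.Counter replaces HeavyHitters (which keeps approximate counts for a bounded number of items). With this small input the exact counts coincide with the output y shown above.

```python
from collections import Counter


def heavy_hitters_windows(items, window_size):
    """Hypothetical stand-in for heavy_hitters_stream on a finite list.

    Processes items in non-overlapping windows of window_size and emits the
    cumulative counts after each complete window.
    """
    counts = Counter()  # exact stand-in for a HeavyHitters object
    outputs = []
    # Only complete windows produce output; a partial final window is held back.
    for start in range(0, len(items) - window_size + 1, window_size):
        for element in items[start:start + window_size]:
            counts[element] += 1
        # dict(counts) is a snapshot, playing the role of copy.copy above.
        outputs.append(dict(counts))
    return outputs


print(heavy_hitters_windows(list("aaababcabcbb"), 4))
```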
Bloom Filters and Count Min Sketch in streams
The code is in IoTPy/examples/Counting/membership_in_streams.py.
It is a straightforward application of the map_element agent to the BloomFilter and CountMinSketch classes in PyProbables, creating an agent with a single input stream and a single output stream. Each element of the input stream is a pair (operation, value), where operation is 'add', 'remove' or 'check' applied to an instance of BloomFilter or CountMinSketch; the output stream contains the results of the 'check' operations. An example of an input stream is:
('add', 'Google'), ('add', 'Amazon'), ('add', 'Google'), ('check', 'Google')
If the agent is passed an instance of the BloomFilter class, the output is:
('Google', True)
because 'Google' has been added to the filter. If the agent is passed an instance of the CountMinSketch class, the output is:
('Google', 2)
because 'Google' was added twice. A Bloom filter can report false positives but never false negatives, and a Count-Min Sketch can overestimate a count but never underestimate it.
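To see why the Bloom filter answers True and the Count-Min Sketch answers 2, here is a toy pure-Python version of each structure. The classes ToyBloomFilter and ToyCountMinSketch and the helper _indexes are hypothetical illustrations, not PyProbables' implementation; PyProbables' BloomFilter and CountMinSketch take different constructor arguments and use different hash functions.

```python
import hashlib


def _indexes(item, count, modulus):
    """Yield `count` indexes in [0, modulus), one per salted SHA-256 hash of item."""
    for salt in range(count):
        digest = hashlib.sha256(f"{salt}:{item}".encode("utf-8")).digest()
        yield int.from_bytes(digest[:8], "big") % modulus


class ToyBloomFilter:
    """A bit array plus num_hashes hash functions. check() can return a
    false positive but never a false negative."""

    def __init__(self, num_bits=1024, num_hashes=4):
        self.bits = [False] * num_bits
        self.num_hashes = num_hashes

    def add(self, item):
        for i in _indexes(item, self.num_hashes, len(self.bits)):
            self.bits[i] = True

    def check(self, item):
        # True only if every bit chosen by the hashes has been set.
        return all(self.bits[i] for i in _indexes(item, self.num_hashes, len(self.bits)))


class ToyCountMinSketch:
    """depth rows of width counters, one hash per row. check() returns the
    smallest counter for the item, which is never below the true count
    (as long as each remove matches an earlier add)."""

    def __init__(self, width=1000, depth=5):
        self.width = width
        self.rows = [[0] * width for _ in range(depth)]

    def _cells(self, item):
        # Row r uses the r-th salted hash, so the rows hash independently.
        return enumerate(_indexes(item, len(self.rows), self.width))

    def add(self, item, n=1):
        for row, col in self._cells(item):
            self.rows[row][col] += n

    def remove(self, item, n=1):
        for row, col in self._cells(item):
            self.rows[row][col] -= n

    def check(self, item):
        return min(self.rows[row][col] for row, col in self._cells(item))


bloom, cms = ToyBloomFilter(), ToyCountMinSketch()
for name in ["Google", "Amazon", "Google"]:
    bloom.add(name)
    cms.add(name)
print(("Google", bloom.check("Google")))  # ('Google', True)
print(("Google", cms.check("Google")))    # ('Google', 2)
```

Taking the minimum over rows is what limits overestimation: a collision inflates one row's counter, but the estimate is wrong only if the item collides in every row.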
The code is a direct application of map_element:
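# membership_object is a PyProbables BloomFilter or CountMinSketch;
# _no_value and map_element come from IoTPy.
def func(element):
    # Each element of the input stream is assumed to be a
    # pair: function_name and a value.
    function_name, value = element
    if function_name == 'add':
        membership_object.add(value)
        # _no_value: this element produces no output.
        return _no_value
    elif function_name == 'remove':
        # Works with CountMinSketch; PyProbables' standard BloomFilter has no remove method.
        membership_object.remove(value)
        return _no_value
    elif function_name == 'check':
        return (value, membership_object.check(value))
    else:
        raise ValueError('unknown operation: {}'.format(function_name))

map_element(func, in_stream, out_stream)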
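To see which input elements produce output, the dispatch logic can be run on the example input without IoTPy. In this sketch, _NO_VALUE, ExactCounter, make_func and simulate_map_element are hypothetical stand-ins for IoTPy's _no_value, a CountMinSketch and the map_element agent. It assumes, as the code above implies, that returning _no_value makes map_element emit nothing for that element.

```python
from collections import Counter

# Hypothetical stand-in for IoTPy's _no_value sentinel.
_NO_VALUE = object()


class ExactCounter:
    """Hypothetical exact stand-in for a CountMinSketch (same method names)."""

    def __init__(self):
        self.counts = Counter()

    def add(self, value):
        self.counts[value] += 1

    def remove(self, value):
        self.counts[value] -= 1

    def check(self, value):
        return self.counts[value]


def make_func(membership_object):
    """Build the per-element function from the code above, bound to one object."""
    def func(element):
        function_name, value = element
        if function_name == 'add':
            membership_object.add(value)
            return _NO_VALUE
        if function_name == 'remove':
            membership_object.remove(value)
            return _NO_VALUE
        if function_name == 'check':
            return (value, membership_object.check(value))
        raise ValueError(f"unknown operation: {function_name!r}")
    return func


def simulate_map_element(func, elements):
    """Apply func to each element in order, keeping only real outputs."""
    outputs = []
    for element in elements:
        result = func(element)
        if result is not _NO_VALUE:
            outputs.append(result)
    return outputs


ops = [('add', 'Google'), ('add', 'Amazon'), ('add', 'Google'), ('check', 'Google')]
print(simulate_map_element(make_func(ExactCounter()), ops))  # [('Google', 2)]
```

Passing the object to make_func, rather than relying on a global membership_object, keeps the function self-contained and lets the same dispatch code serve any object with add, remove and check methods.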