COUNTING ITEMS IN STREAMS

This section shows how to use IoTPy to wrap terminating functions (ordinary functions that run to completion on each element or window) as agents that count items in streams. Examples of such applications are:

Misra-Gries Algorithm: Identifying frequent items

Given an input stream of arbitrary items, this agent outputs a stream of candidates for the most frequent items in the input stream. The agent has a parameter k: if any item occurs more than N/k times in the first N items of the input stream, then that item appears in the N-th element of the output stream. Each element of the output stream is a pair (keys, counts). Here keys is a sequence of k - 1 entries, each of which is an item from the input stream or the special symbol None, and counts is a sequence of k - 1 nonnegative integers such that counts[i] is at most the number of times keys[i] appears in the first N elements of the input stream. The output stream could contain only keys, because keys holds the candidates for the most frequent items; counts is included to show how the algorithm operates.

For example, if the input stream is ['A', 'A', 'B', 'C', 'C', 'C', ...] and k = 3, so that the N-th value of keys contains every item that appears more than N/3 times in the first N items of the input stream, then the output stream is:

[(('A', None), (1, 0)), (('A', None), (2, 0)), (('A', 'B'), (2, 1)), (('A', None), (1, 0)),
 (('A', 'C'), (1, 1)), (('A', 'C'), (1, 2)), ...]

Note that keys in the N-th element of the output stream may contain items that do not appear more than N/k times in the first N elements of the input stream. For example, 'B' appears in keys in the 3rd element of the output stream, but 'B' does not appear more than 3/3 = 1 times in the first 3 elements of the input stream. Since 'A' appears more than 1/3 times in the first element of the input stream, it appears in the first element of the output stream. Likewise, since 'A' appears more than 2/3 times in the first two elements of the input stream, it appears in the second element of the output stream.

The code is in IoTPy/examples/Counting/Misra_Gries.py. It is a straightforward use of map_element with state. The code was written by Atishay Jain with Mani Chandy.
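The update rule that produces the example output above can be sketched in plain Python. This is a minimal sketch, not the code in Misra_Gries.py: misra_gries_step and run_misra_gries are hypothetical names, the (element, state) → (output, new_state) shape is an assumption about how a stateful map_element function is written, and run_misra_gries replays a finite list instead of using IoTPy streams.

```python
def misra_gries_step(item, state):
    """One Misra-Gries update (hypothetical helper).

    state is (keys, counts): two lists of length k - 1, with None
    marking an empty slot. Returns (output, new_state), where output
    is the (keys, counts) pair as tuples.
    """
    keys, counts = list(state[0]), list(state[1])
    if item in keys:
        # Item already tracked: increment its counter.
        counts[keys.index(item)] += 1
    elif None in keys:
        # Free slot available: start tracking the item with count 1.
        slot = keys.index(None)
        keys[slot], counts[slot] = item, 1
    else:
        # No free slot: decrement every counter and free any slot that
        # reaches 0. The new item itself is not stored.
        for i in range(len(keys)):
            counts[i] -= 1
            if counts[i] == 0:
                keys[i] = None
    return (tuple(keys), tuple(counts)), (keys, counts)


def run_misra_gries(items, k):
    """Replay a finite list through misra_gries_step (hypothetical driver)."""
    state = ([None] * (k - 1), [0] * (k - 1))
    outputs = []
    for item in items:
        output, state = misra_gries_step(item, state)
        outputs.append(output)
    return outputs


print(run_misra_gries(['A', 'A', 'B', 'C', 'C', 'C'], 3))
# [(('A', None), (1, 0)), (('A', None), (2, 0)), (('A', 'B'), (2, 1)),
#  (('A', None), (1, 0)), (('A', 'C'), (1, 1)), (('A', 'C'), (1, 2))]
```

The fourth step shows the decrement rule: 'C' finds no free slot, so both counters drop by one and 'B' is evicted. Because each counter starts at 1 and only grows when its own item arrives, counts[i] can never exceed the true count of keys[i].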

Heavy Hitters in streams

Install PyProbables to use this code. The code is in IoTPy/examples/Counting/heavy_hitters.py.

The code creates an agent with a single input stream and a single output stream. Each time the length of the input stream reaches a multiple of the parameter window_size, the agent outputs estimates of the most frequent items seen so far in the input stream, together with their estimated counts. The code is a straightforward application of the map_window agent with step_size equal to window_size, so consecutive windows do not overlap. In the code, heavy_hitters_object is an instance of the HeavyHitters class in PyProbables.

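import copy
from probables import HeavyHitters
# map_window (and Stream, used in the example below) come from IoTPy;
# the import path depends on the installed IoTPy version.

def heavy_hitters_stream(in_stream, out_stream, window_size,
                         heavy_hitters_object):
    def f(window):
        # Add every element of the window to the sketch.
        for element in window:
            heavy_hitters_object.add(element)
        # The sketch keeps counting across windows, so this is the
        # estimate for the whole stream so far. Copy the dict so that
        # later updates to the sketch cannot alter values already output.
        return copy.copy(heavy_hitters_object.heavy_hitters)
    # step_size == window_size: consecutive windows do not overlap.
    map_window(f, in_stream, out_stream, window_size, step_size=window_size)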

An example of using heavy_hitters_stream is shown below.

heavy_hitters_object = HeavyHitters(width=1000, depth=5)
x = Stream('input')
y = Stream('output')
window_size = 4
heavy_hitters_stream(x, y, window_size, heavy_hitters_object)

If, for example, x is ['a', 'a', 'a', 'b', 'a', 'b', 'c', 'a', 'b', 'c', 'b', 'b', ...] then y becomes:

[{'a': 3, 'b': 1}, {'a': 5, 'c': 1, 'b': 2}, {'a': 5, 'c': 2, 'b': 5}]
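To see where the numbers in y come from, the windowing can be emulated on a finite list. This sketch is an illustration under stated assumptions, not the heavy_hitters.py code: heavy_hitters_windows is a hypothetical name, and collections.Counter is swapped in as an exact stand-in for the approximate PyProbables HeavyHitters sketch, so it reports true counts (the output shown above matches the exact counts for this small input).

```python
from collections import Counter


def heavy_hitters_windows(items, window_size, num_hitters=100):
    """Hypothetical emulation of heavy_hitters_stream on a finite list.

    After every complete window of window_size items, emit the (at most
    num_hitters) most frequent items seen so far with their counts.
    Counter gives exact counts, unlike the approximate HeavyHitters sketch.
    """
    counts = Counter()
    outputs = []
    # Non-overlapping windows (step_size == window_size); a trailing
    # incomplete window produces no output.
    for start in range(0, len(items) - window_size + 1, window_size):
        counts.update(items[start:start + window_size])
        outputs.append(dict(counts.most_common(num_hitters)))
    return outputs


x_values = ['a', 'a', 'a', 'b', 'a', 'b', 'c', 'a', 'b', 'c', 'b', 'b']
print(heavy_hitters_windows(x_values, 4))
# [{'a': 3, 'b': 1}, {'a': 5, 'b': 2, 'c': 1}, {'a': 5, 'b': 5, 'c': 2}]
```

The counts are cumulative because the same Counter is updated window after window, just as the same heavy_hitters_object is updated on every call to f.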

Bloom Filters and Count Min Sketch in streams

The code is in IoTPy/examples/Counting/membership_in_streams.py.

It is a straightforward application of the map_element agent to the BloomFilter and CountMinSketch classes in PyProbables, and it creates an agent with a single input stream and a single output stream. Each element of the input stream is a pair (operation, value), where operation is 'add', 'remove' or 'check' applied to an instance of the BloomFilter or CountMinSketch class; the output stream contains the results of the 'check' operations. ('remove' applies only to CountMinSketch, because a standard Bloom filter cannot delete items.) An example of an input stream is:

('add', 'Google'), ('add', 'Amazon'), ('add', 'Google'), ('check', 'Google')

If the agent is passed an instance of the BloomFilter class, then the output is:

('Google', True)

because 'Google' was added earlier in the input stream. If the agent is passed an instance of the CountMinSketch class, then the output is:

('Google', 2)

because 'Google' was added twice. (In general, the count returned by a count-min sketch is an estimate that can exceed the true count because of hash collisions.)
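Why check on a count-min sketch returns an estimate can be illustrated with a minimal sketch written from scratch. This is not the PyProbables implementation: the class name TinyCountMinSketch, the default table size and the row-salted blake2b hashing are illustrative assumptions. Each row maps a key to one counter and collisions can only add to a counter, so the minimum over rows is never below the true count as long as items are removed only after being added.

```python
import hashlib


class TinyCountMinSketch:
    """Minimal count-min sketch (illustrative, not PyProbables).

    depth rows of width counters; each row hashes a key to one column
    using blake2b salted with the row number (an assumed scheme).
    """

    def __init__(self, width=1000, depth=5):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _columns(self, key):
        # Yield (row, column) pairs for this key, one per row.
        for row in range(self.depth):
            data = f"{row}:{key}".encode()
            digest = hashlib.blake2b(data, digest_size=8).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def add(self, key):
        for row, col in self._columns(key):
            self.table[row][col] += 1

    def remove(self, key):
        for row, col in self._columns(key):
            self.table[row][col] -= 1

    def check(self, key):
        # Minimum over rows: collisions only inflate counters, so the
        # least-inflated counter is the best estimate.
        return min(self.table[row][col] for row, col in self._columns(key))


sketch = TinyCountMinSketch()
sketch.add('Google')
sketch.add('Amazon')
sketch.add('Google')
print(('Google', sketch.check('Google')))  # ('Google', 2)
```

Here 'Google' and 'Amazon' would have to collide in all five rows for the estimate to exceed 2, which is why a small width and depth already give exact answers for a handful of distinct items.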

The code is a direct application of map_element:

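    # func is assumed to be nested inside a function that supplies
    # in_stream, out_stream and membership_object (a BloomFilter or
    # CountMinSketch instance from PyProbables).
    def func(element):
        # Each element of the input stream is assumed to be a
        # pair: function_name and a value.
        function_name, value = element
        if function_name == 'add':
            membership_object.add(value)
            # _no_value (from IoTPy): emit nothing on the output stream.
            return _no_value
        elif function_name == 'remove':
            membership_object.remove(value)
            return _no_value
        elif function_name == 'check':
            # BloomFilter: True or False; CountMinSketch: estimated count.
            return (value, membership_object.check(value))
        else:
            raise ValueError('unknown operation: {!r}'.format(function_name))

    map_element(func, in_stream, out_stream)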
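Because func only dispatches on the operation name, its behaviour can be checked without IoTPy by replaying a finite list of operations. In this sketch, run_operations is a hypothetical driver that mimics map_element by keeping only the results of 'check' (the elements for which func returns something other than _no_value), and TinyBloomFilter is a minimal from-scratch Bloom filter standing in for the PyProbables BloomFilter; its bit-array size, number of hashes and hashing scheme are assumptions. It has no remove method, since a standard Bloom filter cannot delete items.

```python
import hashlib


class TinyBloomFilter:
    """Minimal Bloom filter (illustrative, not PyProbables).

    num_bits bits and num_hashes salted blake2b hashes (assumed scheme).
    """

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = [False] * num_bits

    def _positions(self, key):
        for i in range(self.num_hashes):
            data = f"{i}:{key}".encode()
            digest = hashlib.blake2b(data, digest_size=8).digest()
            yield int.from_bytes(digest, "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = True

    def check(self, key):
        # Always True for an added key; may also be True (a false
        # positive) for a key that was never added.
        return all(self.bits[pos] for pos in self._positions(key))


def run_operations(operations, membership_object):
    """Hypothetical driver: apply (operation, value) pairs as func does,
    collecting only the outputs of 'check'."""
    outputs = []
    for function_name, value in operations:
        if function_name == 'add':
            membership_object.add(value)
        elif function_name == 'remove':
            membership_object.remove(value)
        elif function_name == 'check':
            outputs.append((value, membership_object.check(value)))
        else:
            raise ValueError(f"unknown operation: {function_name!r}")
    return outputs


ops = [('add', 'Google'), ('add', 'Amazon'), ('add', 'Google'),
       ('check', 'Google')]
print(run_operations(ops, TinyBloomFilter()))  # [('Google', True)]
```

Passing a 'remove' operation with TinyBloomFilter raises AttributeError, which mirrors the limitation of a plain Bloom filter; a structure with a remove method, such as a count-min sketch, accepts it.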