This section describes stream programs for filters. We begin with FIR and IIR (Finite and Infinite Impulse Response) bandpass filters. We first describe the code using windowing agents (map_window and merge_window or their decorators @map_w and @merge_w). These examples illustrate the use of stream arrays and NumPy. They also show how IoTPy allows you to reuse terminating functions from popular libraries to build non-terminating functions on endless streams.

Later, to illustrate the use of classes to store historical data, we give the program using the map_element agent or the @map_e decorator. Using windows gives shorter, more elegant code than code that uses map_element and which reads only one element of an input stream at a time. The code using map_element is provided merely as an illustration.

The code for this section was written by Deepak Narayanan and Mani Chandy, Caltech.

BANDPASS FIR FILTER OF STREAMING DATA

You can download this code by clicking on IoTPy/examples/signal_processing_examples/dsp_filters.py.

Let y be the output stream and x the input stream. The formula for a Finite Impulse Response (FIR) filter is found in https://en.wikipedia.org/wiki/Finite_impulse_response and is:

y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]

where b is a parameter of the filter and its length is M+1. So, y[n] is the dot product of a window into x with the reverse of b.

def bandpass_FIR(in_stream, b):
    return dot_product(in_stream, reverse(b))

dot_product returns the dot product of the second argument, reverse(b), on sliding windows of in_stream. The code for dot_product is:

def dot_product(in_stream, vector):
    @fmap_w
    def f(window, vector): return np.dot(window, vector)
    return f(in_stream, window_size=len(vector), step_size=1,
      vector=vector)

BANDPASS IIR FILTER OF STREAMING DATA

You can download this code by clicking on: IoTPy/examples/signal_processing_examples/dsp_filters.py.

The formula for an Infinite Impulse Response (IIR) filter is found in https://en.wikipedia.org/wiki/Infinite_impulse_response and is:

y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[P]*x[n-P] -
   (a[1]*y[n-1] + a[2]*y[n-2] + ... + a[Q]*y[n-Q]

We assume that the lengths of a and b are equal, and we call the length M. Function f in the code below returns y[n] where x_window is x[n-M, …, n] and y_window is y[n-M, …, n].

def bandpass_IIR(in_stream, b, a):
    M = len(b)
    out_stream = Stream(initial_value = np.zeros(M))
    in_stream.extend(np.zeros(M))
    @merge_w
    def f(windows, b, a):
        x_window, y_window = windows
        return np.dot(b, x_window) - np.dot(a, y_window[1:])
    f(in_streams=[in_stream, out_stream], out_stream=out_stream,
      window_size=M, step_size=1, b=reverse(b), a=reverse(a[1:]))
    return out_stream

EXAMPLES OF CLASSES THAT STORE HISTORICAL DATA

Next we give examples of code in which the historical stream data are saved as attributes of classes.

class BP_IIR(object):
    """
    Bandpass IIR Filter

    Parameters
    ----------
    a, b: list of float
      Parameters that define an IIR filter

    Attributes
    ----------
    x, y: array of float
      Local variables of IIR calculations.

    """
    def __init__(self, a, b):
        assert len(b) == len(a)
        self.b = np.array(b)
        self.a = np.array(a)
        self.N = len(a)
        self.x = np.zeros(self.N)
        self.y = np.zeros(self.N)

    def filter_sample(self, sample):
        """
        This is the standard IIR calculation.
        Parameters
        ----------
        sample: float or int
          The next element of the stream.
        """
        # Shift x and y to the right by 1
        self.x[1:] = self.x[:- 1]
        self.y[1:] = self.y[:-1]
        # Update x[0] and y[0]
        self.x[0] = sample
        self.y[0] = self.a[0] * self.x[0]
        self.y[0] += sum(self.a[1:]*self.x[1:] - self.b[1:]*self.y[1:])
        return self.y[0]

    def filter_stream(self, in_stream, out_stream):
        """
        Filters the input stream to get the output stream
        using filter_sample().

        """
        map_element(self.filter_sample, in_stream, out_stream)

The filter_sample() code is a straightforward implementation of the equation for the operation of a bandpass filter when a new sample arrives on the stream. The filter_stream() function uses the code to obtain a function that filters its input stream to produce a filtered output stream.

Next, we define a function with parameters that include the low cutoff and high cutoff frequencies and the order of the bandpass filter, and which filters an input stream to produce an output stream. This function uses the butter_bandpass function from scipy.signal to get the parameters b, a used in the filter.

def bandpass_filter_stream(in_stream, out_stream, lowcut, highcut, fs, order):
    """
    Parameters
    ----------
    in_stream, out_stream: Stream
       The input and output streams of the agent
    low_cut, highcut: int or float
       The lower and upper frequencies of the bandpass filter.
    fs: int or float
       The sample rate in number per second.
    order: int, positive
        The order of the filter.

    """
    # butter_bandpass is imported from scipy.sigal
    b, a = butter_bandpass(lowcut, highcut, fs, order)
    bp = BP_IIR(b, a)
    bp.filter_stream(in_stream, out_stream)