Streams

The best way to read about streams is to run the Jupyter notebook, see IoTPy/Streams.ipynb.

A stream is a list of arbitrary length which can be modified only by appending values to the tail of the list. A stream array can also be treated as a NumPy array to which rows can be appended at the end.

Stream creation

s = Stream(name=‘temperature’, initial_value=[20, 21, 20])

creates a stream with the specified name and initial value. Both the name and initial value can be omitted. The default name is “None” and the default initial value is the empty list.

Appending and extending a stream You can append or extend a stream in the same way as for a list:

s.append(value)
s.extend(a_list)

NumPy: Stream Arrays

A stream whose elements are NumPy objects is a StreamArray. An element of a StreamArray is a NumPy type. The default type is a float.

Creating a stream array

We introduce the idea with an example. The statement:

s = StreamArray(name='s', dimension=3, dtype=int)

creates a stream s where s[i] is a NumPy array consisting of an unbounded number of rows and 3 (i.e. dimension) columns and where the elements of the array are of type int. An item appended to this stream must be an array consisting of 3 integers; appending such a 3-element array appends a single row containing the 3 integers to the tail of s. Stream s can be extended by an int array consisting of an arbitrary number of rows and 3 columns; doing so adds the array to the tail of s. We describe appending and extending streams later in this section.

The parameters of StreamArray are: name, dimension, dtype, initial_value, and num_in_memory

  • name is optional and is a string. The default is ‘no_name’.

  • dimension is optional and is the dimension of elements of the stream array. The default is 0.

  • dtype is optional and the type of the rows of the stream array. The default is float.

  • initial_value is optional and is the initial value of the stream array. The default is None.

  • num_in_memory is optional and can be ignored for the time being. It is used for memory management.

When dimension is 0

The dimension parameter can be a non-negative integer or a tuple or list. If dimension is 0 then each element of the stream array belongs to type dtype. In this case, think of the stream array as a 1-D array of unbounded length with elements of type dtype. For example:

t = StreamArray()

makes t a stream array where t is effectively an array of unbounded size where each element of t is a float.

When dimension is a positive integer

If dimension is a positive integer then each element of the stream is a 1-D array whose length is dimension. Stream array s, the first example, is an instance of such a dimension. Another example is,

u = StreamArray(name='u', dimension=2, dtype=float)

makes u a stream array called u. Think of u as an array with an unbounded number of rows where each row of u is an array consisting of 2 floats.

When dimension is a tuple or list

Each element of the tuple must be a positive integer. Think of the stream array as having an unbounded number of rows where each row is an N-dimensional array where N is the length of the tuple. The lengths of the N-dimensional array are given by the tuple. For example,

v = StreamArray(dimension=(3,4), dtype=int)

makes v a stream array where each row of v is a 3 x 4 array of integers. For example, an element of v could be the array a:

a = np.array([
        [0, 1, 2, 3],
        [4, 5, 6, 7],
        [8, 9, 10, 11]])

appending a numpy array to a stream array

You can append an array to a stream provided that they have the same dimensions. Here are examples of appending to stream arrays created in the previous paragraph.

Append a singleton array of float to stream array t

t.append(np.array(1.0))

Append a 1-D array of size 3 of ints to stream array s

a = np.zeros(3, dtype=int)
s.append(a)

makes a the NumPy array: np.array([ 0,  0,  0]) and appends the array a to stream array s.

Append a 1-D array of size 2 of floats to stream array u

u.append(np.array([0.0, 1.0])

Append a 3 x 4 array of integers to stream array v

v.append(np.array([
        [0, 1, 2, 3],
        [4, 5, 6, 7],
        [8, 9, 10, 11]]))

extending a stream array

You can extend a stream array by an array consisting of multiple rows provided that the dimensions of rows of the array and the stream are identical.

Extend stream array t by an array of floats

t.extend(np.array([2.0, 3.0]))

The statement is equivalent to:

t.append(np.array(2.0))
t.append(np.array(3.0))

If t was np.array([1.0]) before the above statements are executed, then its value after the statement will be np.array([1.0, 2.0, 3.0]).

Extend stream array s by an array of arrays

An example of an array consisting of 2 rows where each row consists of 3 ints is b:

b = np.array([[1, 2, 3], [4, 5, 6]])
s.extend(b)

The above statement is equivalent to:

s.append(np.array([1, 2, 3])
s.append(np.array([4, 5, 6])

If s is np.array([[0, 0, 0]]) before the above statements are executed, then after the statement is executed its content will be:

np.array([[0, 0, 0],
          [1, 2, 3],
          [4, 5, 6]])

Extend stream array u by an array of arrays

u.extend(np.array([[2.0,3.0], [4.0,5.0], [6.0,7.0]]))

Recall that u can be treated as an array with an arbitrary number of rows and 2 columns, and where the elements of u are floats. The above statement extends u by an array with 3 rows and 2 columns where the elements are floats.

Extend stream array v

v.extend(np.array([
   [[12, 13, 14, 15],[16, 17, 18, 19], [20, 21, 22, 23]],
   [[24, 25, 26, 27], [28, 29, 30, 31], [32, 33, 34, 35]]]))

Recall that v can be treated as an array with an arbitrary number of rows where each row is an array of dimension (3, 4) and where the elements of v are ints. The above statement extends v by two rows, the first of which is:

[[12, 13, 14, 15],[16, 17, 18, 19], [20, 21, 22, 23]]

and the second is

[[24, 25, 26, 27], [28, 29, 30, 31], [32, 33, 34, 35]]

user-defined types

An example of a user-defined type is:

txyz_dtype = np.dtype([('time','int'), ('data', '3float')])

An example of an object, c, of this type is created by:

c = np.array((1, [0.0, 1.0, 2.0]), dtype=txyz_dtype)

Then, c[‘time’] is np.array(1), and c['data'] is np.array([ 0., 1., 2.]

Creating a stream array with user-defined types

y = StreamArray(dimension=0, dtype=txyz_dtype)

creates a stream array, y, whose elements are of type txyz_dtype. Think of y as an array with an arbitrary number of rows where each row is an array of type txyz_dtype. Initially the stream is empty and its associated array has zero rows.

Appending to a stream array with user-defined types

We can append c to the stream array,

y.append(c)

If y was empty before the above statement was executed, then after the statement terminates the value of y is an array consisting of a single element c, i.e. y’s value is

np.array([(1, [0., 1., 2.])], dtype=txyz_dtype))

EXTending a stream array with user-defined types

d = np.array([
   (2, [3., 4., 5.]), (3, [6., 7., 8.])], dtype=txyz_dtype)

creates an array d with two elements of type txyz_dtype. We can extend y with d:

y.extend(d)

and at this point y’s value is:

np.array([
   (1, [0., 1., 2.]), 
   (2, [3., 4., 5.]), 
   (3, [6., 7., 8.])], 
   dtype=txyz_dtype))

You can also create stream arrays with user defined types and arbitrary dimensions.

Examples

Example: Operator on Stream Arrays

def test_plus_operator_with_arrays():
    x = StreamArray(dimension=2, dtype=int)
    y = StreamArray(dimension=2, dtype=int)
    z = x + y
    A = np.arange(6).reshape((3, 2))
    B = np.arange(100, 110).reshape((5, 2))
    x.extend(A)
    y.extend(B)
    run()
    assert np.array_equal(recent_values(z), np.array([
        [100, 102], [104, 106], [108, 110]]))

    C = np.arange(6, 12).reshape((3, 2))
    x.extend(C)
    run()
    assert np.array_equal(recent_values(z), np.array([
        [100, 102], [104, 106], [108, 110],
        [112, 114], [116, 118]]))

In the example, at the first execution of run(), stream-array x is extended by an array with 3 rows and 2 columns, and y is extended by an array with 5 rows and 2 columns. So, z becomes a stream array with 2 columns, and 3 rows. Then, at the next execution of run(), x is extended by another array with 3 rows and 2 columns while y remains unchanged. Now, x has more rows to add to y, and so z becomes an array with 5 rows.

Example: Operation between a Stream and a Scalar

def test_f_mul_with_stream_array():
    # Declare a stream array x
    x = StreamArray(dimension=2, dtype=int)
    
    # Create a stream array y where y = x * 2
    y = f_mul(x, 2)
    
    A = np.array([[1, 10], [2, 20], [3, 30]])
    x.extend(A)
    run()
    assert np.array_equal(
        recent_values(y),
        [[2, 20], [4, 40], [6, 60]])
    
    x.append(np.array([4, 40]))
    run()
    assert np.array_equal(
        recent_values(y),
        [[2, 20], [4, 40], [6, 60], [8, 80]])

Next, let’s look at agents: ways of converting functions on objects such as integers into functions on streams of such objects: See Agents.