Kosh Operators¶
This notebook introduces Kosh's operators. Unlike transformers, which act on a single feature, operators post-process data coming from multiple features, for example adding two features together, whether or not they come from the same source.
Kosh operators receive the input features as Python's *args.
Operator inputs can be features coming straight from a loader, possibly processed by one or more transformers, and/or the output of another operator.
While one could fetch each feature individually and then apply a raw function to them, operators offer several advantages:
- When mixing data from multiple sources, possibly passed through transformers, Kosh will automatically determine which loaders to use and which output format to request from each loader (and transformer) so that all data are compatible when passed to the operator.
- Operators can be built (via their __getitem__ function, see this notebook) to read in only the amount of data needed.
Easiest way: Decorators¶
As with transformers, one can use Python decorators to quickly convert an existing function.
Let's import some modules, create a store and a dataset to play with.
import kosh
import numpy as np
import sys, os
store = kosh.connect("operators_demo.sql", delete_all_contents=True)
ds = store.create()
ds.associate("../tests/baselines/node_extracts2/node_extracts2.hdf5", mime_type="hdf5")
m1 = ds["node/metrics_0"]
print(m1[:][:])
m2 = ds["node/metrics_2"]
print(m2[:][:])
[[74.60042 22.704462 81.75976 43.019024 90.3619 27.78305 71.98507 38.78283 31.862976 12.7631855 94.52985 74.529434 18.101988 57.22014 50.838238 75.56943 21.334723 63.617054 ] [30.224789 70.80611 62.686962 19.330027 81.621056 93.60426 21.645191 63.31401 92.55467 90.84677 27.292467 14.005975 49.63301 85.57087 9.917352 58.027737 69.95087 5.07952 ]] [[24.176632 28.35887 57.926807 88.42995 43.800083 59.017242 22.848253 7.5056286 2.5399094 6.2492366 46.997864 60.64453 30.870817 66.92705 46.292072 27.467634 84.07651 68.11991 ] [73.90602 55.195995 84.13312 79.93733 13.419014 60.481445 64.483665 9.53269 56.463535 92.742775 88.28038 16.180855 4.254545 9.790927 67.85503 1.1167012 63.09269 49.717033 ]]
Let's assume you have a function that adds up your features as numpy arrays. Let's make it a Kosh operator.
Note: numpy_operator will declare the operator's types to be {'numpy': ["numpy",]}. See later for declaring your own custom types.
@kosh.numpy_operator
def Add(*inputs):
    out = inputs[0][:]
    for input_ in inputs[1:]:
        out += input_[:]
    return out
add = Add(m1, m2)
print(add[:])
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ] [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]]
Similarly to loaders and transformers, operators must declare the mime_types they can accept as inputs and the mime_types they can export these inputs to. Where transformers process a feature via their transform function, operators must define an operate function. At the moment all inputs are expected to share the same mime_type.
Let's create an operator while defining these mime_types.
@kosh.typed_operator({"numpy": ["numpy",]})
def Add(*inputs):
    out = inputs[0][:]
    for input_ in inputs[1:]:
        out += input_[:]
    return out
add = Add(m1, m2)
print(add[:])
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ] [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]]
Again, just like loaders and transformers, operators can return their output in multiple formats; in this case the decorated function must accept the "format" keyword argument.
@kosh.typed_operator_with_kwargs({"numpy": ["numpy", "str"]})
def Add(*inputs, **kwargs):
    out = inputs[0][:]
    for input_ in inputs[1:]:
        out += input_[:]
    if kwargs["format"] == "numpy":
        return out
    elif kwargs["format"] == "str":
        return str(out)
add = Add(m1, m2)
print(add(format="numpy"), type(add(format="numpy")))
print(add(format="str"), type(add(format="str")))
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ] [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]] <class 'numpy.ndarray'> [[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ] [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]] <class 'str'>
A warning about indices¶
Kosh decorators will provide an operator that passes through the indices a user asked for (for efficiency). See this notebook for cases where this will return the wrong answer (e.g. an operator that flips the order of its input).
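The pitfall can be illustrated with plain numpy (no Kosh involved): for an order-reversing operation, slicing the input first and then operating is not the same as operating on the full input and then slicing.

```python
import numpy as np

data = np.array([0, 1, 2, 3, 4])

# Correct answer: flip the whole feature, then take the first two entries.
correct = np.flip(data)[:2]        # [4, 3]

# What naive index pass-through would do: fetch only the first two
# entries from the source, then flip them.
passthrough = np.flip(data[:2])    # [1, 0]

print(correct, passthrough)
```

For order-preserving, element-wise operators like Add, the two orderings agree and the pass-through is safe.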
Operator from scratch¶
In this example we will define a simple Add operator that will add all the inputs it receives.
class Add(kosh.KoshOperator):
    # Our operator accepts numpy arrays and outputs numpy arrays
    types = {"numpy": ["numpy",]}

    def operate(self, *inputs, **kargs):
        # *inputs are the inputs received from the predecessors in the execution graph.
        # It is important to define **kargs as the function will receive `format=some_format`.
        out = np.array(inputs[0])
        for input_ in inputs[1:]:
            out += np.array(input_)
        return out
Important points:
- The operate function will receive the desired output format via format=output_format, so it must declare **kargs.
- Inputs are sent via *inputs.
f1 = ds["cycles"]
add = Add(f1, f1)
print(add[:])
[22 16]
As previously mentioned we can also pass the feature through a transformer first:
class Twice(kosh.transformers.KoshTransformer):
    types = {"numpy": ["numpy",]}

    def transform(self, input, format):
        return np.array(input) * 2.
twice = Twice()
f1 = ds.get_execution_graph("cycles", transformers=[twice,])
f2 = ds["cycles"]
add2 = Add(f1, f2)
print(add2[:])
[33. 24.]
We can also have an operator as an input to another, and mix and match this with regular features.
add3 = Add(add2, add)
print(add3[:])
[55. 40.]
Sometimes these can get complicated and hard to follow. You can draw the execution graph to see if everything is happening as you would expect.
kosh.utils.draw_execution_graph(add3.execution_graph(), png_name="exec_graph.png", output_format="numpy")
<Figure size 640x480 with 0 Axes>
Lastly, it is worth noting that transformers and operators can implement their own __getitem__ function to subset the data. See this notebook for more on this.
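The benefit of such a __getitem__ can be sketched in plain Python (hypothetical classes, not the actual Kosh API): forwarding the requested slice to the inputs means only that subset is ever read.

```python
import numpy as np

class CountingFeature:
    """Wraps an array and counts how many elements are actually read."""
    def __init__(self, data):
        self._data = np.asarray(data)
        self.elements_read = 0

    def __getitem__(self, key):
        out = self._data[key]
        self.elements_read += out.size
        return out

class SubsetAdd:
    """Operator-like object whose __getitem__ forwards the requested
    slice to its inputs, so only that subset is fetched and added."""
    def __init__(self, *inputs):
        self.inputs = inputs

    def __getitem__(self, key):
        out = self.inputs[0][key]
        for inp in self.inputs[1:]:
            out = out + inp[key]
        return out

a = CountingFeature(np.arange(1000))
b = CountingFeature(np.arange(1000))
result = SubsetAdd(a, b)[10:20]
print(result)                            # [20 22 24 26 28 30 32 34 36 38]
print(a.elements_read, b.elements_read)  # 10 10
```

Each input was asked for only the 10 requested elements, not all 1000, which is the behavior a well-written operator __getitem__ aims for.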
Helping the Operators¶
Describing the entries¶
Operators can get inputs from many sources. Sometimes it can be helpful to get an idea of what is likely coming in.
The describe_entries function returns a generator of describe_feature for each entry feature. It crawls the graph backward when it encounters transformers or operators. If a loader did not implement describe_feature, an empty dictionary is used instead.
@kosh.operators.numpy_operator
def my_operator(*data, **kargs):
    return np.concatenate(data)

@kosh.transformers.numpy_transformer
def my_transformer(data):
    return data
store = kosh.connect("demo_entries.sql", delete_all_contents=True)
ds = store.create()
ds.associate("sample_files/run_000.hdf5","hdf5")
c = ds.get_execution_graph("cycles", transformers=my_transformer)
double = my_operator(c, c)
triple = my_operator(c, c, c)
combine = my_operator(double, triple)
mixed = my_operator(combine, combine)
list(mixed.describe_entries())
[{'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}, {'size': (2,), 'format': 'hdf5', 'type': dtype('int64'), 'dimensions': [{'name': ''}]}]
Requesting Input Datasets¶
Similarly to describe_entries() and the loaders' get_requestor(), operators have a get_input_datasets() function that returns the datasets contributing to their inputs.
m1 = ds["node/metrics_1"]
m2 = ds["node/metrics_2"]
m3 = ds["node/metrics_3"]
m1_2 = Add(m1, m2)
m_f = Add(m3, m1_2)
[x for x in m_f.get_input_datasets()]
[KOSH DATASET id: a1b64a12f25441e487abf86571feb2c0 name: Unnamed Dataset creator: cdoutrix --- Attributes --- creator: cdoutrix name: Unnamed Dataset --- Associated Data (1)--- Mime_type: hdf5 /g/g19/cdoutrix/git/kosh/examples/sample_files/run_000.hdf5 ( 29e8e99cca9a42319a6552376f8ab1bc ) --- Ensembles (0)--- [] --- Ensemble Attributes --- , KOSH DATASET id: a1b64a12f25441e487abf86571feb2c0 name: Unnamed Dataset creator: cdoutrix --- Attributes --- creator: cdoutrix name: Unnamed Dataset --- Associated Data (1)--- Mime_type: hdf5 /g/g19/cdoutrix/git/kosh/examples/sample_files/run_000.hdf5 ( 29e8e99cca9a42319a6552376f8ab1bc ) --- Ensembles (0)--- [] --- Ensemble Attributes --- , KOSH DATASET id: a1b64a12f25441e487abf86571feb2c0 name: Unnamed Dataset creator: cdoutrix --- Attributes --- creator: cdoutrix name: Unnamed Dataset --- Associated Data (1)--- Mime_type: hdf5 /g/g19/cdoutrix/git/kosh/examples/sample_files/run_000.hdf5 ( 29e8e99cca9a42319a6552376f8ab1bc ) --- Ensembles (0)--- [] --- Ensemble Attributes --- ]
In this case this is not particularly useful, since all the data come from the same dataset.
Requesting input loaders¶
In the example above it might be more useful to access the loaders themselves (so we can get the feature name and mime_type):
[(x.uri, x.feature, x._mime_type) for x in m_f.get_input_loaders()]
[('/g/g19/cdoutrix/git/kosh/examples/sample_files/run_000.hdf5', 'node/metrics_3', 'hdf5'), ('/g/g19/cdoutrix/git/kosh/examples/sample_files/run_000.hdf5', 'node/metrics_1', 'hdf5'), ('/g/g19/cdoutrix/git/kosh/examples/sample_files/run_000.hdf5', 'node/metrics_2', 'hdf5')]
Of course these can be used by the operators themselves.
class Add(kosh.KoshOperator):
    # Our operator accepts numpy arrays and outputs numpy arrays
    types = {"numpy": ["numpy",]}

    def operate(self, *inputs, **kargs):
        # *inputs are the inputs received from the predecessors in the execution graph.
        # It is important to define **kargs as the function will receive `format=some_format`.
        datasets = list(self.get_input_datasets())
        print("ADD INPUT DS:", [x.id for x in datasets])
        loaders = self.get_input_loaders()
        print("INPUT FEATURES:", [x.feature for x in loaders])
        out = np.array(inputs[0])
        for input_ in inputs[1:]:
            out += np.array(input_)
        return out
mf = Add(ds["node/metrics_1"], ds["node/metrics_2"])[:]
ADD INPUT DS: ['a1b64a12f25441e487abf86571feb2c0', 'a1b64a12f25441e487abf86571feb2c0'] INPUT FEATURES: ['node/metrics_1', 'node/metrics_2']