Kosh Operators
This notebook introduces Kosh's operators. Unlike transformers, which act on a single feature, operators allow post-processing of data coming from several features, for example adding two features together, whether or not they come from the same source.
Kosh operators receive their input features as Python positional arguments (*args).
Operator inputs can be features coming straight from a loader, features processed by one or more transformers, and/or the outputs of other operators.
While one could retrieve each feature individually and then apply a raw function to them, operators offer several advantages:
- When mixing data from multiple sources, and possibly transformers, Kosh automatically determines which loaders to use and which output format is required from each loader (and transformer) so that all data are compatible when passed to the operator.
- Operators can be built (via their __getitem__ function, see this notebook) to read in only the amount of data needed.
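To make the first point concrete, here is a toy, pure-Python sketch of the kind of format negotiation involved. It is not Kosh's actual algorithm (Kosh works on its execution graph); the helper `pick_common_format` and the example format lists are hypothetical.

```python
def pick_common_format(operator_types, input_formats):
    """Pick a format that every input can export and the operator accepts.

    operator_types: dict mapping accepted input format -> list of output formats
                    (same shape as an operator's `types` attribute)
    input_formats:  for each input, the list of formats its loader/transformer
                    chain can export
    Returns the first acceptable format (in operator_types order) or None.
    """
    for fmt in operator_types:
        # every input must be able to export this format
        if all(fmt in formats for formats in input_formats):
            return fmt
    return None


# Hypothetical example: one HDF5-backed input and one Ultra-backed input
operator_types = {"numpy": ["numpy"]}
inputs = [["numpy"], ["curves", "pydv", "dict", "numpy"]]
print(pick_common_format(operator_types, inputs))  # numpy
```

Returning None signals that no common format exists, in which case the inputs cannot be fed to that operator together.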
Easiest way: Decorators
As with transformers, one can use Python decorators to quickly convert an existing function into an operator.
Let's import some modules, create a store and a dataset to play with.
import kosh
import numpy as np
import h5py
import sys, os
store = kosh.connect("operators_demo.sql", delete_all_contents=True)
ds = store.create()
ds.associate("../tests/baselines/node_extracts2/node_extracts2.hdf5", mime_type="hdf5")
m1 = ds["node/metrics_0"]
print(m1[:][:])
m2 = ds["node/metrics_2"]
print(m2[:][:])
extract numpy ['numpy']
[[74.60042 22.704462 81.75976 43.019024 90.3619 27.78305 71.98507 38.78283 31.862976 12.7631855 94.52985 74.529434 18.101988 57.22014 50.838238 75.56943 21.334723 63.617054 ]
 [30.224789 70.80611 62.686962 19.330027 81.621056 93.60426 21.645191 63.31401 92.55467 90.84677 27.292467 14.005975 49.63301 85.57087 9.917352 58.027737 69.95087 5.07952 ]]
extract numpy ['numpy']
[[24.176632 28.35887 57.926807 88.42995 43.800083 59.017242 22.848253 7.5056286 2.5399094 6.2492366 46.997864 60.64453 30.870817 66.92705 46.292072 27.467634 84.07651 68.11991 ]
 [73.90602 55.195995 84.13312 79.93733 13.419014 60.481445 64.483665 9.53269 56.463535 92.742775 88.28038 16.180855 4.254545 9.790927 67.85503 1.1167012 63.09269 49.717033 ]]
Let's assume you have a function that adds up your features as numpy arrays, and make it a Kosh operator.
Note: numpy_operator declares the operator's types to be {'numpy': ["numpy",]}. Declaring custom types is covered later.
@kosh.numpy_operator
def Add(*inputs):
    out = np.array(inputs[0])  # copy, so the first input is not modified in place
    for input_ in inputs[1:]:
        out += input_[:]
    return out
add = Add(m1, m2)
print(add[:])
extract numpy ['numpy']
extract numpy ['numpy']
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ]
 [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]]
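A general numpy caveat for accumulating functions like the ones in this section: slicing an array with `[:]` returns a view, not a copy, so starting from `inputs[0][:]` and then using `+=` also modifies the first input in place, whereas `np.array(inputs[0])` copies it. The plain numpy sketch below (the function names are illustrative, not part of Kosh) shows the difference:

```python
import numpy as np

def add_view(*inputs):
    out = inputs[0][:]          # a view: shares memory with inputs[0]
    for input_ in inputs[1:]:
        out += input_[:]        # so this also changes inputs[0]
    return out

def add_copy(*inputs):
    out = np.array(inputs[0])   # np.array copies by default
    for input_ in inputs[1:]:
        out += input_
    return out

a = np.array([1.0, 2.0])
b = np.array([10.0, 20.0])
add_copy(a, b)
print(a)  # [1. 2.]   (unchanged)
add_view(a, b)
print(a)  # [11. 22.] (modified in place)
```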
Like loaders and transformers, operators must declare the mime_types they accept as inputs and the mime_types they can export to. Whereas transformers process a feature via their transform function, operators must define an operate function.
At the moment, all inputs are expected to share the same mime_type.
Let's create an operator while defining these mime_types explicitly.
@kosh.typed_operator({"numpy":["numpy",]})
def Add(*inputs):
    out = np.array(inputs[0])  # copy, so the first input is not modified in place
    for input_ in inputs[1:]:
        out += input_[:]
    return out
add = Add(m1, m2)
print(add[:])
extract numpy ['numpy']
extract numpy ['numpy']
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ]
 [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]]
Again, just like loaders and transformers, an operator can return its output in multiple formats. In this case, the decorated function must accept the format keyword argument.
@kosh.typed_operator_with_kwargs({"numpy":["numpy","str"]})
def Add(*inputs, **kwargs):
    out = np.array(inputs[0])  # copy, so the first input is not modified in place
    for input_ in inputs[1:]:
        out += input_[:]
    if kwargs["format"] == "numpy":
        return out
    elif kwargs["format"] == "str":
        return str(out)
add = Add(m1, m2)
print(add(format="numpy"), type(add(format="numpy")))
print(add(format="str"), type(add(format="str")))
extract numpy ['numpy']
extract numpy ['numpy']
extract numpy ['numpy']
extract numpy ['numpy']
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ]
 [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]] <class 'numpy.ndarray'>
extract numpy ['numpy']
extract numpy ['numpy']
extract numpy ['numpy']
extract numpy ['numpy']
[[ 98.777054 51.06333 139.68657 131.44897 134.16199 86.80029 94.83332 46.28846 34.402885 19.012423 141.52771 135.17397 48.972805 124.14719 97.13031 103.03706 105.41123 131.73697 ]
 [104.13081 126.002106 146.82008 99.26736 95.04007 154.08571 86.12886 72.8467 149.0182 183.58954 115.572845 30.186829 53.887558 95.36179 77.77238 59.14444 133.04355 54.796555]] <class 'str'>
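Stripped of the Kosh decorator, the `format` handling above is ordinary keyword dispatch. A minimal standalone sketch, where the function name, the fallback to "numpy" when no format is given, and the error for unknown formats are our own assumptions rather than Kosh behavior:

```python
import numpy as np

def add_formatted(*inputs, **kwargs):
    out = np.array(inputs[0])            # copy the first input
    for input_ in inputs[1:]:
        out += input_
    fmt = kwargs.get("format", "numpy")  # assumption: default to numpy
    if fmt == "numpy":
        return out
    elif fmt == "str":
        return str(out)
    raise ValueError(f"unsupported format: {fmt!r}")

x = np.array([1, 2])
print(add_formatted(x, x, format="numpy"))                 # [2 4]
print(type(add_formatted(x, x, format="str")).__name__)    # str
```

Raising on an unknown format makes a mismatch between the declared `types` and the function body fail loudly instead of silently returning None.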
A warning about indices
Operators created with Kosh decorators pass the indices a user asks for straight through to their inputs (for efficiency). See this notebook for cases where this returns the wrong answer (e.g., an operator that flips the order of its input).
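Why passing indices through can go wrong: for element-wise operations such as Add, slicing before or after the operation gives the same result, but for an operator that reverses its input it does not. A plain numpy illustration (no Kosh involved; `flip` is a hypothetical stand-in for such an operator):

```python
import numpy as np

data = np.arange(10)

def flip(x):
    return x[::-1]

# Correct: apply the operator to the full input, then take the requested indices
correct = flip(data)[:3]
# What happens if the requested indices are passed through to the input first
passed_through = flip(data[:3])

print(correct)         # [9 8 7]
print(passed_through)  # [2 1 0]

# Element-wise addition is unaffected by the order of slicing
assert np.array_equal((data + data)[:3], data[:3] + data[:3])
```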
Operator from scratch
In this example we will define a simple Add operator that will add all the inputs it receives.
class Add(kosh.KoshOperator):
    types = {"numpy" : ["numpy",]}  # Our operator accepts numpy arrays and outputs numpy arrays

    def operate(self, *inputs, **kargs):
        # *inputs are the inputs received from their predecessors in the execution graph
        # It is important to define **kargs as the function will receive `format=some_format`
        out = np.array(inputs[0])
        for input_ in inputs[1:]:
            out += np.array(input_)
        return out
Important points:
- The operate function receives the desired output format via format=output_format, so it must declare **kargs.
- Inputs are sent via *inputs.
f1 = ds["cycles"]
add = Add(f1, f1)
print(add[:])
extract numpy ['numpy'] extract numpy ['numpy'] [22 16]
As mentioned previously, we can also pass the feature through a transformer first:
class Twice(kosh.transformers.KoshTransformer):
    types = {"numpy":["numpy",]}

    def transform(self, input, format):
        return np.array(input) * 2.
twice = Twice()
f1 = ds.get_execution_graph("cycles", transformers=[twice,])
f2 = ds["cycles"]
add2 = Add(f1, f2)
print(add2[:])
extract numpy ['numpy'] extract numpy ['numpy'] [33. 24.]
We can also have an operator as an input to another, and mix and match this with regular features.
add3 = Add(add2, add)
print(add3[:])
extract numpy ['numpy'] extract numpy ['numpy'] extract numpy ['numpy'] extract numpy ['numpy'] [55. 40.]
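The printed values can be checked by hand. Since add = Add(f1, f1) gives [22 16], the cycles feature must be [11 8] (an inference from that output); Twice doubles it, so add2 is [22 16] + [11 8], and add3 sums add2 and add. The same arithmetic in plain numpy:

```python
import numpy as np

cycles = np.array([11, 8])        # inferred from add = cycles + cycles = [22 16]
add = cycles + cycles             # [22 16]
add2 = cycles * 2.0 + cycles      # Twice(cycles) + cycles -> [33. 24.]
add3 = add2 + add                 # [55. 40.]
print(add2, add3)                 # [33. 24.] [55. 40.]
```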
Such execution graphs can get complicated and hard to follow. You can draw the execution graph to check that everything happens as you expect.
kosh.utils.draw_execution_graph(add3.execution_graph(), png_name="exec_graph.png", output_format="numpy")
Lastly, it is worth noting that transformers and operators can implement their own __getitem__ function to subset the data. See this notebook for more on this.
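To illustrate why a custom __getitem__ helps, here is a toy sketch that does not use Kosh: a hypothetical LazyAdd class forwards the requested slice to each input, so only that part is ever read. CountingSource is a made-up stand-in for a loader that records how many values were read.

```python
import numpy as np

class CountingSource:
    """Stand-in for a data source; counts how many values were read."""
    def __init__(self, data):
        self._data = np.asarray(data)
        self.values_read = 0

    def __getitem__(self, key):
        chunk = self._data[key]
        self.values_read += np.size(chunk)
        return chunk

class LazyAdd:
    """Adds its inputs, reading only the requested indices from each one."""
    def __init__(self, *inputs):
        self.inputs = inputs

    def __getitem__(self, key):
        # Element-wise addition commutes with slicing, so the key can be forwarded
        out = np.array(self.inputs[0][key])
        for input_ in self.inputs[1:]:
            out += input_[key]
        return out

a = CountingSource(np.arange(1_000_000))
b = CountingSource(np.ones(1_000_000, dtype=int))
print(LazyAdd(a, b)[10:13])          # [11 12 13]
print(a.values_read, b.values_read)  # 3 3
```

Forwarding the key is only safe because addition is element-wise; an operator such as a flip would need to translate the key first.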
Helping the Operators
Describing the entries
Operators can get inputs from many sources. Sometimes it helps to get an idea of what is likely coming in.
The describe_entries function returns a generator yielding the describe_feature output of each entry feature. It crawls the graph backward through any transformers or operators it encounters. If a loader does not implement describe_feature, an empty dictionary is used instead.
@kosh.operators.numpy_operator
def my_operator(*data, **kargs):
    # numpy.concatenate takes a single sequence of arrays
    return np.concatenate(data)

@kosh.transformers.numpy_transformer
def my_transformer(data):
    return data
store = kosh.connect("demo_entries.sql", delete_all_contents=True)
ds = store.create()
ds.associate("sample_files/run_000.hdf5","hdf5")
c = ds.get_execution_graph("cycles", transformers=my_transformer)
double = my_operator(c, c)
triple = my_operator(c, c, c)
combine = my_operator(double, triple)
mixed = my_operator(combine, combine)
list(mixed.describe_entries())
[{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]},
{'size': (2,),
'format': 'hdf5',
'type': dtype('int64'),
'dimensions': [{'name': ''}]}]
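The ten entries above come from the graph's shape: mixed takes combine twice, and each combine takes double (c twice) plus triple (c three times), so c is reached 2 × (2 + 3) = 10 times. A toy sketch of such a backward crawl, using hypothetical Leaf and Node classes rather than Kosh's own objects, reproduces that count:

```python
class Leaf:
    """Stand-in for a loader-backed feature (description may be None)."""
    def __init__(self, description):
        self.description = description

class Node:
    """Stand-in for a transformer or operator with one or more inputs."""
    def __init__(self, *inputs):
        self.inputs = inputs

def describe_entries_sketch(node):
    # Walk backward through transformers/operators until reaching the loaders
    if isinstance(node, Leaf):
        # mimic "empty dictionary if describe_feature is not implemented"
        yield node.description if node.description is not None else {}
        return
    for input_ in node.inputs:
        yield from describe_entries_sketch(input_)

feature = Leaf({"size": (2,), "format": "hdf5"})
c = Node(feature)                 # a transformer with a single input
double = Node(c, c)
triple = Node(c, c, c)
combine = Node(double, triple)
mixed = Node(combine, combine)
print(len(list(describe_entries_sketch(mixed))))  # 10
```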
Requesting Input Datasets
Similarly to describe_entries() and the loaders' get_requestor(), operators have a get_input_datasets() function that returns the datasets contributing to their inputs.
m1 = ds["node/metrics_1"]
m2 = ds["node/metrics_2"]
m3 = ds["node/metrics_3"]
m1_2 = Add(m1, m2)
m_f = Add(m3, m1_2)
[x for x in m_f.get_input_datasets()]
[KOSH DATASET
 id: 25ec9ee74c48465d92b94165c34711c9
 name: Unnamed Dataset
 creator: moreno45
--- Attributes ---
 creator: moreno45
 name: Unnamed Dataset
--- Associated Data (1)---
 Mime_type: hdf5
 /g/g20/moreno45/Projects/ASCAML/kosh/examples/sample_files/run_000.hdf5 ( 6ccb39c0ba004f93b786b15774d381a8 )
--- Ensembles (0)---
 []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---,
 KOSH DATASET
 id: 25ec9ee74c48465d92b94165c34711c9
 name: Unnamed Dataset
 creator: moreno45
--- Attributes ---
 creator: moreno45
 name: Unnamed Dataset
--- Associated Data (1)---
 Mime_type: hdf5
 /g/g20/moreno45/Projects/ASCAML/kosh/examples/sample_files/run_000.hdf5 ( 6ccb39c0ba004f93b786b15774d381a8 )
--- Ensembles (0)---
 []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---,
 KOSH DATASET
 id: 25ec9ee74c48465d92b94165c34711c9
 name: Unnamed Dataset
 creator: moreno45
--- Attributes ---
 creator: moreno45
 name: Unnamed Dataset
--- Associated Data (1)---
 Mime_type: hdf5
 /g/g20/moreno45/Projects/ASCAML/kosh/examples/sample_files/run_000.hdf5 ( 6ccb39c0ba004f93b786b15774d381a8 )
--- Ensembles (0)---
 []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---]
In this case this is not particularly useful, since all the data come from the same dataset.
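Because the same dataset can be reported once per input, it may be convenient to de-duplicate the result by dataset id (the id shown in the dataset printout). A small helper, unique_by_id, which is our own and not part of Kosh, is shown here with stand-in objects whose ids are made up:

```python
from types import SimpleNamespace

def unique_by_id(items):
    """Return items with duplicate `.id` values removed, keeping first-seen order."""
    seen = {}
    for item in items:
        seen.setdefault(item.id, item)   # keep the first item seen for each id
    return list(seen.values())

# Stand-ins for Kosh datasets (only the `id` attribute matters here)
datasets = [SimpleNamespace(id="ds-a"), SimpleNamespace(id="ds-a"), SimpleNamespace(id="ds-b")]
print([d.id for d in unique_by_id(datasets)])  # ['ds-a', 'ds-b']
```

Applied to the example above, unique_by_id(m_f.get_input_datasets()) would be expected to return a single dataset, since all three entries share the same id.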
Requesting input loaders
In the example above, it might be more useful to access the loaders themselves (so we get the feature name and mime_type):
[(x.uri, x.feature, x._mime_type) for x in m_f.get_input_loaders()]
[('/g/g20/moreno45/Projects/ASCAML/kosh/examples/sample_files/run_000.hdf5',
'node/metrics_3',
'hdf5'),
('/g/g20/moreno45/Projects/ASCAML/kosh/examples/sample_files/run_000.hdf5',
'node/metrics_1',
'hdf5'),
('/g/g20/moreno45/Projects/ASCAML/kosh/examples/sample_files/run_000.hdf5',
'node/metrics_2',
'hdf5')]
Of course, these functions can also be called by the operators themselves.
class Add(kosh.KoshOperator):
    types = {"numpy" : ["numpy",]}  # Our operator accepts numpy arrays and outputs numpy arrays

    def operate(self, *inputs, **kargs):
        # *inputs are the inputs received from their predecessors in the execution graph
        # It is important to define **kargs as the function will receive `format=some_format`
        datasets = list(self.get_input_datasets())
        print("ADD INPUT DS:", [x.id for x in datasets])
        loaders = self.get_input_loaders()
        print("INPUT FEATURES:", [x.feature for x in loaders])
        out = np.array(inputs[0])
        for input_ in inputs[1:]:
            out += np.array(input_)
        return out
mf = Add(ds["node/metrics_1"], ds["node/metrics_2"])[:]
extract numpy ['numpy']
extract numpy ['numpy']
ADD INPUT DS: ['25ec9ee74c48465d92b94165c34711c9', '25ec9ee74c48465d92b94165c34711c9']
INPUT FEATURES: ['node/metrics_1', 'node/metrics_2']
L-Norm Operator
Kosh also comes with some built-in operators. Here we will use the L-Norm operator with some PyDV curve objects, obtained by associating an Ultra file with the dataset, and with arrays stored in an HDF5 file.
The operator returns:
- x (ndarray): the shared-domain x-axis
- y0 (ndarray): the first y-axis, interpolated onto the shared domain
- y1 (ndarray): the second y-axis, interpolated onto the shared domain
- LNorm (float): the L-norm of the overlapping data points
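To make these return values concrete, here is a rough numpy sketch of this kind of computation. It is a stand-in built on stated assumptions, not the KoshLNorm implementation: the shared domain is taken as the union of both x grids (restricted to the overlap when overlap_only is set), both curves are linearly interpolated onto it with numpy.interp, and the norm is the discrete L^p norm of the pointwise difference. Kosh's actual definition (for instance, any weighting by grid spacing) may differ, and lnorm_sketch is a hypothetical name.

```python
import numpy as np

def lnorm_sketch(x0, y0, x1, y1, power=1, overlap_only=False,
                 left=None, right=None, period=None):
    x0, y0, x1, y1 = (np.asarray(a, dtype=float) for a in (x0, y0, x1, y1))
    # Shared domain: union of both x grids (sorted, duplicates removed)
    x = np.union1d(x0, x1)
    if overlap_only:
        # keep only the points where both curves are defined
        lo = max(x0.min(), x1.min())
        hi = min(x0.max(), x1.max())
        x = x[(x >= lo) & (x <= hi)]
    # Linear interpolation onto the shared domain; left/right/period are
    # forwarded to numpy.interp (left/right default to the end values)
    y0_i = np.interp(x, x0, y0, left=left, right=right, period=period)
    y1_i = np.interp(x, x1, y1, left=left, right=right, period=period)
    # Discrete L-p norm of the pointwise difference (assumed definition)
    lnorm = np.sum(np.abs(y0_i - y1_i) ** power) ** (1.0 / power)
    return x, y0_i, y1_i, lnorm

# Same curves as the HDF5 example in this section
x0 = np.linspace(-14, 14)
x1 = np.linspace(-15, 15)
x, y0_i, y1_i, lnorm = lnorm_sketch(x0, np.sin(x0), x1, np.cos(x1),
                                    power=1, overlap_only=True)
print(x.min(), x.max(), lnorm)
```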
# hdf5
h5file = h5py.File('myfile.hdf5', 'w')
grp = h5file.create_group("my/values")
x0_dset = np.linspace(-14, 14)
y0_dset = np.sin(x0_dset)
x1_dset = np.linspace(-15, 15)
y1_dset = np.cos(x1_dset)
# XY Paired
grp.create_dataset("my_dataset_xy", data=np.array([x0_dset, y0_dset]))
# XY Separated
grp.create_dataset("my_dataset_x0", data=x0_dset)
grp.create_dataset("my_dataset_y0", data=y0_dset)
grp.create_dataset("my_dataset_x1", data=x1_dset)
grp.create_dataset("my_dataset_y1", data=y1_dset)
h5file.close()  # flush the data to disk before Kosh reads the file
ds.associate('myfile.hdf5',
             mime_type="hdf5",
             absolute_path=False)
# ultra
ds.associate("my_ult_file.ult",
             mime_type="ultra",
             absolute_path=False)
# 2 2-D Arrays
x, y0, y1, LNormY = kosh.operators.KoshLNorm(ds["my/values/my_dataset_xy"],
                                             ds['Gaussian (a: 5.0 w: 5.0 c: 0.0)'],
                                             power=1, left=None, right=None, period=None)[:]
print(x)
# Only overlap points, so no left, right, or period params needed for `numpy.interp()`
x, y0, y1, LNormY = kosh.operators.KoshLNorm(ds['Gaussian (a: 5.0 w: 5.0 c: 0.0)'],
ds["my/values/my_dataset_xy"],
power=1, overlap_only=True)[:]
print(x)
# 4 1-D Arrays
x, y0, y1, LNormY = kosh.operators.KoshLNorm(ds["my/values/my_dataset_x0"],
ds["my/values/my_dataset_y0"],
ds["my/values/my_dataset_x1"],
ds["my/values/my_dataset_y1"],
power=1, left=None, right=None, period=None)[:]
print(x)
# Only overlap points, so no left, right, or period params needed for `numpy.interp()`
x, y0, y1, LNormY = kosh.operators.KoshLNorm(ds["my/values/my_dataset_x0"],
ds["my/values/my_dataset_y0"],
ds["my/values/my_dataset_x1"],
ds["my/values/my_dataset_y1"],
power=1, overlap_only=True)[:]
print(x)
extract numpy
['numpy']
(<HDF5 dataset "my_dataset_xy": shape (2, 50), type "<f8">,)
{0: {'start_nodes': [('hdf5', <kosh.loaders.hdf5.HDF5Loader object at 0x7fff0ee99be0>, 0.20309757352107882)], 'end_nodes': [('numpy', None, 0.20309757352107882)]}, 1: {'start_nodes': [('ultra', <kosh.loaders.UltraLoader.UltraLoader object at 0x7fff0ee99880>, 0.40571904914574564)], 'end_nodes': [('curves', None, 0.40571904914574564), ('curves/pydv', None, 0.40571904914574564), ('curves/pdv', None, 0.40571904914574564), ('pydv', None, 0.40571904914574564), ('pdv', None, 0.40571904914574564), ('dict', None, 0.40571904914574564), ('numpy', None, 0.40571904914574564)]}}
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Input In [14], in <module>
     23 ds.associate("my_ult_file.ult",
     24 mime_type="ultra",
     25 absolute_path=False)
     27 # 2 2-D Arrays
---> 28 x, y0, y1, LNormY = kosh.operators.KoshLNorm(ds["my/values/my_dataset_xy"],
     29 ds['Gaussian (a: 5.0 w: 5.0 c: 0.0)'],
     30 
     31 power=1, left=None, right=None, period=None)[:]
     33 print(x)
     35 # Only overlap points, so no left, right, or period params needed for `numpy.interp()`

File ~/Projects/ASCAML/kosh/kosh/exec_graphs/core.py:317, in KoshExecutionGraph.__getitem__(self, key)
    310 def __getitem__(self, key):
    311 """Very bare bone get item function
    312 It is highly recommended to re-implement this.
    313 Calls traverse then __getitem__ on the result of traverse
    314 :param key: key to access
    315 :type key: object (usually int, slice or str)
    316 """
--> 317 return self.traverse(__getitem_key__=key)

File ~/Projects/ASCAML/kosh/kosh/exec_graphs/core.py:371, in KoshExecutionGraph.traverse(self, format, *args, **kargs)
    367 out.add_edge(node, pth[i + 1])
    369 # We can now travel back the pth to obtain
    370 # the data.
--> 371 return self._operate(out, end_node, format, **kargs)

File ~/Projects/ASCAML/kosh/kosh/exec_graphs/core.py:425, in KoshExecutionGraph._operate(self, graph, node, output_format, **kargs)
    423 kargs2["__getitem_key__"] = slice(None, None, None)
    424 if do_res:
--> 425 res = prev[1]._operate(graph, prev, node[0], **kargs2)
    426 if new_keys is None and getitem_key != slice(None, None, None):
    427 res = res[getitem_key]

File ~/Projects/ASCAML/kosh/kosh/exec_graphs/core.py:434, in KoshExecutionGraph._operate(self, graph, node, output_format, **kargs)
    432 return inputs[0]
    433 if hasattr(self, "operate_"):
--> 434 out = self.operate_(*inputs, format=output_format)
    435 return out
    436 elif hasattr(self, "transform_"):

File ~/Projects/ASCAML/kosh/kosh/operators/core.py:85, in KoshOperator.operate_(self, *inputs, **kargs)
     83 self.save(signature, result)
     84 else:
---> 85 result = self.operate(*inputs, format=format)
     87 return result

File ~/Projects/ASCAML/kosh/kosh/operators/koshMaths.py:103, in KoshLNorm.operate(self, *inputs, **kargs)
    100 features[1]['y-axis'] = inputs[3]
    102 else:
--> 103 raise ValueError("Inputs must be two 2-D Arrays [x0, y0], [xy, y1] or four 1-D arrays x0, y0, x1, y1")
    105 # Acquire common domain
    106 if self.options.get("overlap_only"):

ValueError: Inputs must be two 2-D Arrays [x0, y0], [xy, y1] or four 1-D arrays x0, y0, x1, y1
