Kosh Transformers¶
This notebook introduces Kosh's transformers. Transformers allow data to be further post-processed after extraction from its original URI.
Transformers support anything from simple transformations such as sub-sampling to more complex operations such as data augmentation or detecting where data is valid.
Transformers can be chained, and each step in a chain can be cached. The default cache directory is stored in kosh.core.kosh_cache_dir
and points to: os.path.join(os.environ["HOME"], ".cache", "kosh").
Setting up the notebook¶
Let's import some modules and create a simple loader for ASCII files.
# import necessary modules
import kosh
import numpy
import time
import os
# Create a file to load in.
with open("kosh_transformers_chaining_example.ascii", "w") as f:
    f.write("1 2. 3 4 5 6 7 8 9")
Now we need to create our custom loader:
# A very basic loader
# this loader can read the *ascii* mime_type and return *numlist* as one of its output types
class StringsLoader(kosh.loaders.KoshLoader):
    types = {"ascii": ["numlist", "a_format", "another_format"]}  # mime_types and corresponding output formats
    def extract(self):
        """The extract function
        returns a list of floats"""
        time.sleep(2)  # fake slow operation
        with open(self.obj.uri) as f:
            return [float(x) for x in f.read().split()]
    def list_features(self):
        # The only feature is "numbers"
        return ["numbers",]
store = kosh.connect("transformers_example.sql", delete_all_contents=True)
dataset = store.create(name="test_transformer")
dataset.associate("kosh_transformers_chaining_example.ascii", mime_type="ascii")
dataset.associate("sample_files/run_000.hdf5", "hdf5")
# let's add our loader to the store
store.add_loader(StringsLoader)
# and print the features associated with this dataset
print(dataset.list_features())
print(dataset["node/metrics_0"][:][:])
['numbers', 'cycles', 'direction', 'elements', 'node', 'node/metrics_0', 'node/metrics_1', 'node/metrics_10', 'node/metrics_11', 'node/metrics_12', 'node/metrics_2', 'node/metrics_3', 'node/metrics_4', 'node/metrics_5', 'node/metrics_6', 'node/metrics_7', 'node/metrics_8', 'node/metrics_9', 'zone', 'zone/metrics_0', 'zone/metrics_1', 'zone/metrics_2', 'zone/metrics_3', 'zone/metrics_4']
[[74.60042 22.704462 81.75976 43.019024 90.3619 27.78305 71.98507 38.78283 31.862976 12.7631855 94.52985 74.529434 18.101988 57.22014 50.838238 75.56943 21.334723 63.617054 ]
 [30.224789 70.80611 62.686962 19.330027 81.621056 93.60426 21.645191 63.31401 92.55467 90.84677 27.292467 14.005975 49.63301 85.57087 9.917352 58.027737 69.95087 5.07952 ]]
Easiest: Using a decorator¶
Let's say you have a function you would like to apply on your data as you extract it.
Kosh lets you easily wrap this function via decorators so you can use it within the Kosh workflow.
In the following examples our function will simply multiply every number by 10.
Numpy-based¶
If your function takes a numpy array as input and returns a numpy array, simply use the @kosh.numpy_transformer decorator.
@kosh.numpy_transformer
def ten_times(inputs):
    return 10 * inputs[:]
print(dataset.get("node/metrics_0", transformers=[ten_times,])[:])
# or if the function was defined somewhere else
avg = kosh.numpy_transformer(numpy.average)
# We can chain transformers
print(dataset.get("node/metrics_0", transformers=[ten_times, avg]))
[[746.00415 227.04462 817.5976 430.19025 903.619 277.8305 719.8507 387.8283 318.62976 127.63185 945.29846 745.2943 181.01988 572.2014 508.3824 755.6943 213.34723 636.17053 ]
 [302.2479 708.06104 626.8696 193.30026 816.2106 936.0426 216.4519 633.14014 925.54675 908.4677 272.92468 140.05975 496.3301 855.7087 99.173515 580.27734 699.50867 50.795204]]
527.0764
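Conceptually, a decorator such as @kosh.numpy_transformer turns a plain array-to-array function into an object that declares its types and exposes a transform(inputs, format) method, which is what lets it sit in a chain. The sketch below mimics that idea without Kosh. FunctionTransformer, numpy_transformer_sketch and apply_chain are hypothetical names, and the real decorator also deals with indices and caching, which are left out here.

```python
import numpy as np


class FunctionTransformer:
    """Hypothetical stand-in for a decorated transformer (not Kosh's class)."""

    # numpy in, numpy out, mirroring what a numpy-only transformer declares
    types = {"numpy": ["numpy"]}

    def __init__(self, func):
        self.func = func

    def transform(self, inputs, format):
        # The wrapped function only produces numpy, so format is ignored
        return self.func(inputs)


def numpy_transformer_sketch(func):
    """Hypothetical decorator: wrap a numpy -> numpy function."""
    return FunctionTransformer(func)


def apply_chain(data, transformers, format="numpy"):
    """Feed the output of each transformer into the next one, in order."""
    for transformer in transformers:
        data = transformer.transform(data, format)
    return data


@numpy_transformer_sketch
def ten_times(inputs):
    return 10 * inputs[:]


# Wrapping an existing function instead of decorating it
avg = numpy_transformer_sketch(np.average)

raw = np.array([[1.0, 2.0], [3.0, 4.0]])
print(apply_chain(raw, [ten_times]))       # every element multiplied by 10
print(apply_chain(raw, [ten_times, avg]))  # 25.0
```

Keeping the wrapped function untouched and putting the chain logic in a separate object is what allows the same function to be reused with or without the decorator.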
Specific input/output types¶
Your function may expect its input in a mime_type other than numpy; for example, our custom loader declares that it can return numlist. Similarly, your transformer's output mime_type can be something other than numpy, e.g. pandas or list.
Like KoshLoaders, KoshTransformers have a types attribute: a dictionary describing which input types they accept and, for each of these, which types they can return.
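One way to read these types dictionaries: a loader that reads mime_type X can hand over any format listed under X, and the next transformer in a chain accepts a format only if it appears as one of its input keys. The sketch below walks a chain with that rule to list the formats available at the end. reachable_formats is a hypothetical helper written to illustrate the idea; it is not how Kosh resolves chains internally.

```python
def reachable_formats(mime_type, loader_types, transformer_types_chain):
    """Return the formats that can come out of the last step of a chain.

    loader_types: the loader's types dict, e.g. {"ascii": ["numlist"]}
    transformer_types_chain: one types dict per transformer, in chain order
    """
    # Formats the loader can produce from this mime_type
    current = set(loader_types.get(mime_type, []))
    for types in transformer_types_chain:
        produced = set()
        for input_format in current:
            # A transformer only accepts formats it lists as input keys
            produced.update(types.get(input_format, []))
        current = produced
    return current


loader_types = {"ascii": ["numlist", "a_format", "another_format"]}
to_numpy = {"numlist": ["numpy"]}
numpy_only = {"numpy": ["numpy"]}

print(reachable_formats("ascii", loader_types, []))                      # loader alone
print(reachable_formats("ascii", loader_types, [to_numpy, numpy_only]))  # {'numpy'}
print(reachable_formats("ascii", loader_types, [numpy_only]))            # set(): no match
```

The last call shows why a numpy-only transformer cannot be placed directly after a loader that only returns numlist: nothing in the chain converts numlist to numpy first.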
Let's create a transformer that converts numlist data to numpy arrays and multiplies it by ten.
@kosh.typed_transformer({"numlist": ["numpy",]})
def list_10_times_numpy(inputs):
    return 10. * numpy.array(inputs)
print("Raw :", dataset["numbers"][:])
print("Transformed:", dataset.get("numbers", transformers=[list_10_times_numpy,]))
Multiple formats¶
Finally, there are cases where your function can return the data in several formats. Transformers then receive the requested format as the format keyword argument, and again there is a dedicated decorator for this kind of function:
@kosh.typed_transformer_with_format({"numlist": ["numpy", "str", "numlist"]})
def list_to_formats(inputs, format):
    out = 10. * numpy.array(inputs)
    if format == "str":
        return str(out)
    elif format == "numlist":
        return out.tolist()
    else:
        return out
print("Raw :", dataset["numbers"][:])
for fmt in ["numpy", "str", "numlist"]:
    data = dataset.get("numbers", transformers=[list_to_formats,], format=fmt)[:]
    print("As {:7s} :".format(fmt), data, type(data))
A warning about indices¶
Transformers created with Kosh's decorators pass the indices a user asked for straight through, for efficiency. See this notebook for cases where this returns the wrong answer (e.g. a transformer that reverses the order of its input).
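Passing the requested indices through amounts to slicing the input before transforming it, instead of slicing the output afterwards. The two orders agree for element-wise transforms but not for ones that move elements around. The sketch below compares them on plain Python lists; ten_times and flip here are illustrative list-based functions, not the notebook's Kosh transformers.

```python
def ten_times(values):
    # Element-wise: output i depends only on input i
    return [10 * v for v in values]


def flip(values):
    # Order-reversing: output i depends on input len(values) - 1 - i
    return list(reversed(values))


data = [1, 2, 3, 4, 5]
wanted = slice(0, 2)  # the user asks for the first two elements

# Correct: transform everything, then take the indices the user asked for
print(ten_times(data)[wanted], flip(data)[wanted])  # [10, 20] [5, 4]

# Shortcut: take the indices first, then transform only that subset
print(ten_times(data[wanted]), flip(data[wanted]))  # [10, 20] [2, 1]
```

The shortcut is cheaper because only the requested slice is processed, which is why it is attractive for element-wise transformers and wrong for order-dependent ones.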
Sometimes, though, you will need a more complex transformer, e.g. one with initialization parameters.
Let's learn how to build transformers from scratch.
Basic Example: Converting from list to numpy¶
This first example shows how to use transformers to convert between formats. We use the simple loader created above, which returns a list of numbers as floats; in practice it could be a loader for a very complex format.
Two problems can arise here:
- The data is not in a convenient format for us.
- The loader is slow (but relies on proprietary libraries we cannot re-implement).
A transformer can help with both:
- The transformer will convert data to a desired format (numpy arrays here)
- The result will be cached so that we can quickly reload the data many times in the script.
Now let's create a transformer that converts this list of floats to a numpy array on the fly (admittedly a one-liner in Python).
All we need to do is inherit from the basic Kosh transformer and implement the transform method. transform takes inputs and format as arguments. Here, the transformer needs a numlist as input.
import time
class Ints2Np(kosh.transformers.KoshTransformer):
    types = {"numlist": ["numpy"]}  # Known input types and matching possible output formats
    def transform(self, inputs, format):
        time.sleep(2)  # Artificial slowdown
        return numpy.array(inputs, dtype=numpy.float32)
A simple feature retrieval (or a call to get) returns our list:
feature = dataset["numbers"]
feature[:]
But we want a numpy array and our loader cannot do that!
try:
    feature(format="numpy")
except Exception:
    print("Failed as expected")
We need to use our transformer. Let's tell Kosh about it:
feature = dataset.get_execution_graph("numbers", transformers=[Ints2Np(),])
data = feature(format='numpy')
print(data)
It works, but it is still slow if we call it again:
%time feature(format="numpy")
We need to cache the result.
transform_to_npy = Ints2Np(cache=True, cache_dir=os.getcwd())
feature = dataset.get_execution_graph("numbers", transformers=[transform_to_npy,])
print("First time (caching)")
%time dataset.get("numbers", format="numpy", transformers=[transform_to_npy,])
print("Second time (cached)")
%time dataset.get("numbers", format="numpy", transformers=[transform_to_npy,])
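The idea behind cache=True can be sketched as follows: derive a key from the transformation, its parameters and its input, and store the result on disk under the cache directory so that a later call with the same key skips the slow work. This is a hypothetical sketch (cached_transform, the key scheme and the pickle file layout are all assumptions), not Kosh's actual cache implementation.

```python
import hashlib
import os
import pickle
import tempfile
import time


def cached_transform(transform, inputs, params, cache_dir):
    """Run transform(inputs, **params), reusing a pickled result if one exists.

    The key covers the function name, its parameters and its inputs, so
    changing any of them creates a new cache entry. Pickled bytes are only a
    reasonable key for simple, deterministic inputs such as lists of floats.
    """
    key_source = pickle.dumps((transform.__name__, sorted(params.items()), inputs))
    key = hashlib.sha256(key_source).hexdigest()
    path = os.path.join(cache_dir, key + ".pickle")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = transform(inputs, **params)
    os.makedirs(cache_dir, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result


def slow_scale(values, factor=1.0):
    time.sleep(0.5)  # fake slow operation
    return [factor * v for v in values]


cache_dir = tempfile.mkdtemp()
for attempt in ("first (computing)", "second (cached)"):
    start = time.perf_counter()
    result = cached_transform(slow_scale, [1.0, 2.0, 3.0], {"factor": 10.0}, cache_dir)
    print(attempt, result, "%.2f s" % (time.perf_counter() - start))
```

Including the parameters in the key matters: a transformer built with different arguments must not reuse a result computed with the old ones.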
Chaining Transformers¶
Now that our data is in a format we like, we might want to process it further with other transformers. Fortunately, transformers can be chained, and the level of caching can be controlled as well.
Let's create an Even transformer that keeps only the even numbers, and a SlowDowner transformer that fakes a slow operation: it does nothing except pause for a given amount of time.
class Even(kosh.transformers.KoshTransformer):
    types = {"numpy": ["numpy"]}
    def transform(self, inputs, format):
        # Keep only the elements whose value is even
        return numpy.take(inputs, numpy.argwhere(numpy.mod(inputs, 2) == 0))[:, 0]
class SlowDowner(kosh.transformers.KoshTransformer):
    types = {"numpy": ["numpy"]}
    def __init__(self, sleep_time=3, cache_dir="kosh_cache", cache=False):
        super(SlowDowner, self).__init__(cache_dir=cache_dir, cache=cache)
        self.sleep_time = sleep_time
    def transform(self, inputs, format):
        # Fakes a slow operation, then returns the input unchanged
        time.sleep(self.sleep_time)
        return inputs
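The one-liner in Even.transform is easier to follow when split up: numpy.argwhere returns the matching indices as a column of shape (n, 1), so numpy.take returns a column too, and [:, 0] flattens it back to 1-D. For a 1-D input this gives the same result as boolean-mask indexing, which the sketch below checks on the notebook's numbers. The mask form is shown only as an equivalent alternative.

```python
import numpy as np

values = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], dtype=np.float32)
is_even = np.mod(values, 2) == 0

indices = np.argwhere(is_even)     # shape (4, 1): one row per match
taken = np.take(values, indices)   # same (4, 1) shape as indices
as_in_even = taken[:, 0]           # flattened to shape (4,)

with_mask = values[is_even]        # boolean mask: already 1-D

print(indices.shape, taken.shape, as_in_even)
print(np.array_equal(as_in_even, with_mask))  # True
```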
Let's chain these together:
%time dataset.get("numbers", format="numpy", transformers=[transform_to_npy, SlowDowner(3), Even(), SlowDowner(4)])
Let's cache the last step:
%time dataset.get("numbers", format="numpy", transformers=[transform_to_npy, SlowDowner(3), Even(), SlowDowner(4, cache_dir="kosh_cache", cache=True)])
Running it again should shave off the last 4 seconds; let's also cache the 3-second step for next time:
%time dataset.get("numbers", format="numpy", transformers=[transform_to_npy, SlowDowner(3, cache_dir="kosh_cache", cache=True), Even(), SlowDowner(4, cache_dir="kosh_cache", cache=True)])
Let's run it once more, with both slow steps cached:
%time dataset.get("numbers", format="numpy", transformers=[transform_to_npy, SlowDowner(3, cache_dir="kosh_cache", cache=True), Even(), SlowDowner(4, cache_dir="kosh_cache", cache=True)])
Some examples of transformers included in Kosh¶
Kosh comes with a few transformers:
Numpy-related transformers¶
- KoshSimpleNpCache(cache_dir=kosh_cache_dir, cache=True) does nothing but cache the passed arrays, using numpy.savez rather than the default (pickled objects).
- Take(cache_dir=kosh_cache_dir, cache=True, indices=[], axis=0, verbose=False) runs numpy.take. It uses MPI to split the indices over the available ranks and gathers the result on rank 0.
- Delta(cache_dir=kosh_cache_dir, cache=True, axis=0, pad=None, pad_value=0, verbose=False) computes the difference between consecutive strides along an axis, optionally padding at the start or end.
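For a rough idea of what Take and Delta compute on a single process, the sketch below calls numpy.take and numpy.diff directly. It leaves out the MPI distribution, caching and verbose options. The padding shown (a row of pad_value added at the start so the output keeps the input's length) is an assumption about how pad and pad_value behave, not a description of Kosh's code.

```python
import numpy as np

data = np.array([[1, 2, 3],
                 [4, 6, 8],
                 [9, 12, 15]])

# Take-like step: pick rows 0 and 2 along axis 0
taken = np.take(data, [0, 2], axis=0)
print(taken)

# Delta-like step: differences between consecutive rows along axis 0
delta = np.diff(data, axis=0)
print(delta)  # rows: 4-1, 6-2, 8-3 and 9-4, 12-6, 15-8

# Assumed padding: prepend one row of pad_value so the shape matches data
pad_value = 0
padded = np.concatenate([np.full((1, data.shape[1]), pad_value), delta], axis=0)
print(padded)
```

Without padding, a difference along an axis of length n has length n - 1, which is why an option to pad back to the original length is useful when the result must line up with other features.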
Scikit Learn related transformers¶
See the next notebook.