Advanced Slicing in Kosh¶
This notebook is for advanced users.
We will show how to re-implement __getitem__ in Kosh's Loaders, Transformers or Operators to allow efficient slicing.
Concepts¶
In early versions of Kosh (<1.2), Transformers and Loaders could only retrieve a feature as a whole. Loaders could be fed user-defined keywords to subset a feature, but nothing general was in place.
Similarly transformers would get the entire feature before processing it.
Starting with 1.2, Kosh offers the possibility to slice a feature at any level, allowing for efficient data retrieval.
In this notebook we will assume we have a very big dataset that cannot fit in memory at once. We will show how Kosh can still work on chunks of the dataset.
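As a rough illustration of what "working on chunks" means, here is a minimal sketch that does not use Kosh at all. A plain Python `range` stands in for a sliceable feature, and `chunk_slices` is a hypothetical helper name chosen for this example.

```python
def chunk_slices(total, size):
    """Yield consecutive slices that together cover range(total)."""
    for start in range(0, total, size):
        yield slice(start, min(start + size, total))


# Stand-in for a sliceable feature: range objects slice lazily.
feature = range(1_000_000)

running_sum = 0
for chunk in chunk_slices(len(feature), 100_000):
    # Only 100,000 values are touched per iteration.
    running_sum += sum(feature[chunk])

print(running_sum)
```

The same loop shape applies to any object that supports slicing, which is what the rest of this notebook sets up for Kosh features.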
The need for indexing¶
To demonstrate Kosh's capabilities we will create a custom loader that returns range(N), with N coming from the source name itself. When N becomes very big, a regular ingestion is no longer possible.
Standard Loader¶
Let's create this loader in its most basic form.
import kosh
import os
import numpy
class MyBasicLoader(kosh.KoshLoader):
    types = {"range": ["numpy", ]}
    def extract(self):
        # ridiculous over-simplification, use name to get length!
        length = int(os.path.basename(self.obj.uri))
        return numpy.arange(length)
    def list_features(self):
        return ["range", ]
Let's add this loader to Kosh and try to retrieve a few fake datasets.
store = kosh.connect("data_slicing.sql", delete_all_contents=True)
store.add_loader(MyBasicLoader)
ds = store.create()
# associate a fake source with it
# name is "5" so we should return numpy.arange(5)
ds.associate("5", mime_type="range")
rg = ds["range"]
print("values:", rg())
values: [0 1 2 3 4]
So far all is good. But now let's assume we need a really long range:
ds = store.create()
# associate a fake source with it
# the name is a huge number, so numpy.arange cannot allocate the full range
ds.associate("500000000000000000000000", mime_type="range")
rg = ds["range"]
try:
    print("values:", rg())
except Exception as error:
    print("We failed with the following error:", error)
We failed with the following error: Maximum allowed size exceeded
Maybe slicing this would work:
try:
    print("values:", rg[:3])
except Exception as error:
    print("We failed with the following error:", error)
We failed with the following error: Maximum allowed size exceeded
Unfortunately not. This is because our loader does not explicitly define a way to slice the data.
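The difference between "extract everything, then slice" and "slice at the source" can be sketched without Kosh. The two classes below are hypothetical stand-ins, not Kosh loaders; the `materialized` counter is only there to show how much data each approach has to build.

```python
class EagerSource:
    """Hypothetical source that can only build the whole sequence."""

    def __init__(self, length):
        self.length = length
        self.materialized = 0  # number of values actually built

    def extract(self):
        self.materialized += self.length
        return list(range(self.length))


class SliceableSource(EagerSource):
    """Same source, but able to serve just the requested part."""

    def __getitem__(self, key):
        values = range(self.length)[key]  # range slicing is lazy
        if isinstance(key, slice):
            self.materialized += len(values)
            return list(values)
        self.materialized += 1
        return values


eager = EagerSource(1000)
first_eager = eager.extract()[:3]   # builds 1000 values to keep 3

lazy = SliceableSource(10**15)      # far too long to build in full
first_lazy = lazy[:3]               # builds only 3 values

print(first_eager, eager.materialized)
print(first_lazy, lazy.materialized)
```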
Introducing __getitem__ in the loader¶
Let's solve this by adding a __getitem__ function to our loader.
It's a very simple loader, that can handle returning a single item or a slice.
Note that we didn't implement things like ellipsis, etc...
import kosh
class MySlicingLoader(MyBasicLoader):
    types = {"range": ["numpy", ]}
    def __getitem__(self, key):
        length = int(os.path.basename(self.obj.uri))
        if isinstance(key, int):
            if 0 <= key < length:
                return numpy.array(key)
            elif -length <= key < 0:
                # negative index counts from the end; same return type as above
                return numpy.array(length + key)
            else:
                raise ValueError("Index {} is out of range".format(key))
        elif isinstance(key, slice):
            start = key.start
            stop = key.stop
            step = key.step
            # fill in the defaults for missing bounds
            if start is None:
                start = 0
            if step is None:
                step = 1
            if stop is None:
                stop = length
            # negative bounds count from the end (including -length itself)
            if -length <= start < 0:
                start += length
            if -length <= stop < 0:
                stop += length
            return numpy.arange(start, stop, step, dtype=numpy.float64)
        else:
            raise ValueError("Invalid key value: {}".format(key))
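The manual handling of missing and negative bounds in a loader like the one above can also be delegated to Python's built-in `slice.indices(length)`, which returns a fully resolved `(start, stop, step)` triple clipped to the given length. The sketch below is independent of Kosh; `normalize` is a hypothetical helper name.

```python
def normalize(key, length):
    """Return (start, stop, step) with None and negative bounds resolved."""
    if not isinstance(key, slice):
        raise TypeError("expected a slice, got {!r}".format(key))
    return key.indices(length)


print(normalize(slice(None, 5), 20))        # (0, 5, 1)
print(normalize(slice(-5, None), 20))       # (15, 20, 1)
print(normalize(slice(None, None, 2), 20))  # (0, 20, 2)
print(normalize(slice(15, 63), 10))         # (10, 10, 1), i.e. empty
```

Note that `slice.indices` clips out-of-range bounds to the sequence length, which differs from the loader above: that loader passes an oversized `stop` straight to `numpy.arange`.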
Now let's remove the old loader and add this to our store.
del(store.loaders["range"])
store.add_loader(MySlicingLoader)
rg = ds["range"]
print(rg[:5])
print(rg[2020:2025])
[0. 1. 2. 3. 4.]
[2020. 2021. 2022. 2023. 2024.]
Problem solved! We can now easily slice and dice our data at the loader level.
Transformers and loaders.¶
But what about Transformers or Operators?
Let's create a transformer that multiplies the data by 2.
class Twice(kosh.transformers.KoshTransformer):
    types = {"numpy": ["numpy", ]}
    def transform(self, input_, format):
        return input_ * 2.
Let's apply this transformer on our sliced data.
rg = ds.get_execution_graph("range", transformers=[Twice(),])
try:
    print(rg[:5])
except Exception as err:
    print("We run into the error again!: ", err)
We run into the error again!: Maximum allowed size exceeded
Helping the loader from the transformer/operator: __getitem_propagate__¶
We need to implement a propagation function, __getitem_propagate__, to let the loader's __getitem__ function know which indices are required. In addition to the requested key, __getitem_propagate__ also receives the index of the input to which the corresponding key will be propagated. More on this index later in this notebook.
class TwiceWithPropagate(Twice):
    def __getitem_propagate__(self, key, input_index):
        return key
rg = ds.get_execution_graph("range", transformers=[TwiceWithPropagate(),])
rg[:5]
array([0., 2., 4., 6., 8.])
It works!
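To make the idea of key propagation concrete, here is a toy model in plain Python. It is only a sketch of the concept and makes no claim about how Kosh implements its execution graph; `ToySource`, `ToyTwice` and `propagate` are hypothetical names, and only slice keys are handled.

```python
class ToySource:
    """Sliceable source standing in for a loader; counts what it serves."""

    def __init__(self, length):
        self.length = length
        self.served = 0

    def __getitem__(self, key):
        values = list(range(self.length)[key])
        self.served += len(values)
        return values


class ToyTwice:
    """Doubles its input and forwards the requested key upstream."""

    def __init__(self, upstream):
        self.upstream = upstream

    def propagate(self, key):
        # Doubling is element-wise: output element i needs only input
        # element i, so the key can be forwarded unchanged.
        return key

    def __getitem__(self, key):
        data = self.upstream[self.propagate(key)]
        return [2 * x for x in data]


source = ToySource(10**12)
node = ToyTwice(source)
result = node[:5]
print(result, "values served by the source:", source.served)
```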
Gotchas!¶
So why didn't Kosh implement this function by default on all Transformers/Operators?
It turns out index propagation can be tricky. Let's examine the case where the transformer also flips the data. We will apply this to a smaller dataset so we can follow the logic more easily.
class FlipPropagate(kosh.transformers.KoshTransformer):
    def __getitem_propagate__(self, key, input_index):
        return key
    def transform(self, input_, format):
        return input_[::-1] * 2.
ds = store.create()
ds.associate("20", mime_type="range")
rg = ds.get_execution_graph("range", transformers=[FlipPropagate(),])
rg[:5]
array([8., 6., 4., 2., 0.])
Compare this to the full solution:
rg[:]
array([38., 36., 34., 32., 30., 28., 26., 24., 22., 20., 18., 16., 14., 12., 10., 8., 6., 4., 2., 0.])
We should have received: array([38., 36., 34., 32., 30.])
So what went wrong?
Well, our transformer dutifully told our loader that we were only interested in the first 5 elements. As a result the loader sent back 0, 1, 2, 3, 4, which our transformer then flipped and multiplied by 2.
So what should we do? We need to implement __getitem_propagate__ in such a way that the loader returns the last five elements and not the first five.
Here again we will oversimplify and implement only non-negative ints and slices as possible keys.
class FlipPropagateOk(kosh.transformers.KoshTransformer):
    def __getitem_propagate__(self, key, input_index):
        if isinstance(key, int):
            # element i of the flipped data is element -1 - i of the source
            return -1 - key
        elif isinstance(key, slice):
            if key.stop is None:
                start = 0
            else:
                start = -key.stop
            if key.start is None or key.start == 0:
                # -0 is 0, which would give an empty slice: read to the end
                stop = None
            else:
                stop = -key.start
            return slice(start, stop, key.step)
        else:
            return slice(None, None, None)
    def transform(self, input_, format):
        return input_[::-1] * 2.
rg = ds.get_execution_graph("range", transformers=[FlipPropagateOk(),])
rg[:5]
array([38., 36., 34., 32., 30.])
Hurray! It works!
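A quick way to gain confidence in an index mapping like this one is to compare it against the brute-force answer on a small list. The sketch below is plain Python and does not use Kosh; `flip_key` is a hypothetical helper that relies on `slice.indices` and handles positive steps only.

```python
def flip_key(key, length):
    """Map a slice on the flipped data to the slice needed from the source.

    The caller is expected to reverse the returned data afterwards,
    as the transform above does with input_[::-1].
    """
    if not isinstance(key, slice):
        raise TypeError("only slices are handled in this sketch")
    start, stop, step = key.indices(length)
    if step < 0:
        raise NotImplementedError("negative steps are not handled")
    count = len(range(start, stop, step))
    if count == 0:
        return slice(0, 0)
    last = start + (count - 1) * step  # last selected index, flipped side
    return slice(length - 1 - last, length - start, step)


data = list(range(20))
keys = [slice(None, 5), slice(0, 5), slice(5, -6),
        slice(1, 18, 3), slice(30, 40)]
for key in keys:
    expected = data[::-1][key]                   # flip everything, then slice
    got = data[flip_key(key, len(data))][::-1]   # slice at the source, then flip
    assert got == expected, (key, got, expected)
print("all keys agree")
```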
The same is true for Operators.
class ADD(kosh.KoshOperator):
    types = {"numpy": ["numpy", ]}
    def operate(self, *inputs, **kargs):
        out = inputs[0]
        for input_ in inputs[1:]:
            # build a new array rather than modifying the first input in place
            out = out + input_
        return out
    def __getitem_propagate__(self, key, input_index):
        return key
rg1 = ds.get_execution_graph("range", transformers=[FlipPropagateOk(),])
rg2 = ds.get_execution_graph("range", transformers=[TwiceWithPropagate()])
add = ADD(rg1, rg2)
print(add[5:-6])
[38. 38. 38. 38. 38. 38. 38. 38. 38.]
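The result above can be reproduced by hand with plain Python lists, which shows that each branch of the graph receives a different key for the same request. This is only a worked check of the arithmetic for the 20-element dataset, not Kosh code.

```python
data = list(range(20))
key = slice(5, -6)

# Flipped branch: the key is mapped to slice(6, -5) before reaching the
# source, then the data is flipped and doubled.
flipped_key = slice(-key.stop, -key.start, key.step)
flipped = [2.0 * x for x in data[flipped_key]][::-1]

# Doubling branch: the key reaches the source unchanged.
twice = [2.0 * x for x in data[key]]

total = [a + b for a, b in zip(flipped, twice)]
print(flipped_key)
print(total)
```

Each pair sums to 38 because the flipped branch holds 2 * (19 - i) where the other branch holds 2 * i.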
Now why do we also send the input index to the operator?
This can be useful for complex operators that take in many inputs. Two use cases come to mind:
- The indexing differs based on the position of the input.
- The indexing can produce a result that kills propagation (e.g. do nothing for that input).
Let's create an operator that acts as a virtual concatenator.
The operator will take features read by our slicing loader; for simplicity we will assume each feature is 10 elements long.
In the __getitem_propagate__ function we will check whether the input falls within the requested range. If not, we kill the propagation; otherwise we work out the indices needed for that feature.
It is important to note that input_index is passed as a keyword, so our function definition MUST declare it with this exact name.
class VirtualConcatenator(kosh.KoshOperator):
    types = {"numpy": ["numpy", ]}
    def __init__(self, *inputs, **kargs):
        # Assume each input is 10 long
        self.length = len(inputs) * 10
        super(VirtualConcatenator, self).__init__(*inputs, **kargs)
    def __len__(self):
        return self.length
    def operate(self, *inputs, **args):
        out = None
        # This line's purpose is to show how the propagation worked
        print("Received:", inputs)
        for input_ in inputs:
            if input_ is not None:
                # We got data back
                if out is None:
                    out = numpy.array(input_)
                else:
                    out = numpy.concatenate((out, numpy.array(input_)))
        return out
Now let's setup a dozen input "features" to this operator.
ds = store.create()
ds.associate("10", mime_type="range")
VC = VirtualConcatenator(*[ds["range"] for x in range(12)])
all = VC[15:63]
len(all)
Received: (array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64), array([], dtype=int64))
0
The key was passed to our loader, because it has a __getitem__ function. Unfortunately slice(15, 63) is empty for all datasets...
Let's implement the propagate:
class VirtualConcatenator(kosh.KoshOperator):
    types = {"numpy": ["numpy", ]}
    def __init__(self, *inputs, **kargs):
        # Assume each input is 10 long
        self.length = len(inputs) * 10
        super(VirtualConcatenator, self).__init__(*inputs, **kargs)
    def __len__(self):
        return self.length
    def operate(self, *inputs, **args):
        out = None
        # This line's purpose is to show how the propagation worked
        print("Received:", inputs)
        for input_ in inputs:
            if input_ is not None:
                # We got data back
                if out is None:
                    out = numpy.array(input_)
                else:
                    out = numpy.concatenate((out, numpy.array(input_)))
        return out
    def __getitem_propagate__(self, key, input_index):
        """only implementing slices with positive numbers"""
        start = key.start
        if start is None:
            start = 0
        stop = key.stop
        if stop is None:
            stop = self.length
        # shift the requested bounds into this input's local coordinates
        start = start - input_index * 10
        if start >= 10:
            # we start past this feature
            # let's tell Kosh to not propagate
            return None
        elif start < 0:
            start = 0
        stop = stop - input_index * 10
        if stop <= 0:
            # we end before this feature starts
            # let's tell Kosh to not propagate
            return None
        elif stop > 10:
            stop = 10
        # Ok there is some intersection
        return slice(start, stop, key.step)
VC = VirtualConcatenator(*[ds["range"] for x in range(12)])
all = VC[15:63]
len(all)
Received: (None, array([5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2.]), None, None, None, None, None)
48
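The per-input arithmetic can be checked in isolation with a small pure-Python function. This sketch mirrors the logic described above under the same assumptions (12 inputs, each 10 elements long, non-negative slice bounds); `local_slice` is a hypothetical name and the function is not part of Kosh.

```python
def local_slice(key, input_index, chunk=10, total=120):
    """Return the slice needed from one input, or None if it is not needed."""
    start = 0 if key.start is None else key.start
    stop = total if key.stop is None else key.stop
    # Shift the global bounds into this input's local coordinates.
    start -= input_index * chunk
    stop -= input_index * chunk
    if start >= chunk or stop <= 0:
        return None  # no overlap with this input
    return slice(max(start, 0), min(stop, chunk), key.step)


plan = [local_slice(slice(15, 63), i) for i in range(12)]
for index, piece in enumerate(plan):
    print(index, piece)

# Total number of elements that will be read across all inputs.
n_read = sum(len(range(10)[p]) for p in plan if p is not None)
print(n_read)
```

Inputs 0 and 7 through 11 map to None, which matches the None entries in the "Received" output above, and the pieces add up to the 48 requested elements.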
Lastly, it is worth noting that you can control the value sent to operate when you abort the propagation. The default, as seen above, is None, but you can set self.index_results[input_index] to whatever value you want. For example, here let's use an empty array:
class VirtualConcatenator(kosh.KoshOperator):
    types = {"numpy": ["numpy", ]}
    def __init__(self, *inputs, **kargs):
        # Assume each input is 10 long
        self.length = len(inputs) * 10
        super(VirtualConcatenator, self).__init__(*inputs, **kargs)
    def __len__(self):
        return self.length
    def operate(self, *inputs, **args):
        # No None check needed: skipped inputs arrive as empty arrays
        out = numpy.array(inputs[0])
        # This line's purpose is to show how the propagation worked
        print("Received:", inputs)
        for input_ in inputs[1:]:
            out = numpy.concatenate((out, numpy.array(input_)))
        return out
    def __getitem_propagate__(self, key, input_index):
        """only implementing slices with positive numbers"""
        start = key.start
        if start is None:
            start = 0
        stop = key.stop
        if stop is None:
            stop = self.length
        # shift the requested bounds into this input's local coordinates
        start = start - input_index * 10
        if start >= 10:
            # we start past this feature
            # let's tell Kosh to not propagate
            # and to hand an empty array to operate instead
            self.index_results[input_index] = numpy.array([])
            return None
        elif start < 0:
            start = 0
        stop = stop - input_index * 10
        if stop <= 0:
            # we end before this feature starts
            # let's tell Kosh to not propagate
            # and to hand an empty array to operate instead
            self.index_results[input_index] = numpy.array([])
            return None
        elif stop > 10:
            stop = 10
        # Ok there is some intersection
        return slice(start, stop, key.step)
VC = VirtualConcatenator(*[ds["range"] for x in range(12)])
all = VC[15:63]
len(all)
Received: (array([], dtype=float64), array([5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]), array([0., 1., 2.]), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64), array([], dtype=float64))
48