import kosh
source_store = kosh.connect("source_store.sql", delete_all_contents=True)
target_store = kosh.connect("target_store.sql", delete_all_contents=True)
# Let's add a dataset to the source store
dataset = source_store.create(name="example")
dataset.foo = "bar"
# Let's import the dataset in our target store
target_store.import_dataset(dataset)
next(target_store.find(name="example"))
KOSH DATASET
        id: 53587b1920b04aac9629ba5faec84b93
        name: example
        creator: cdoutrix

--- Attributes ---
        creator: cdoutrix
        foo: bar
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
Merging with existing datasets
When moving datasets from one store to another, we need to consider the possibility that the receiving store (target_store) already contains one or more of the datasets imported from the incoming store (source_store). In this case, Kosh will merge the imported dataset's attributes and associated sources with the existing dataset.
# Let's create a dataset with some attributes in the source store:
d_source = source_store.create(name="example 2", metadata={"foo":"foo", "bar":"bar"})
# Let's associate a file with it
d_source.associate("Example_Moving_Datasets.ipynb","notebook")
print(d_source)
# Let's create a similar dataset with the same name ('example 2') but different attributes in the target store:
d_target = target_store.create(name="example 2", metadata={"foo":"foo", "fuzz":"fuzzy"})
print(d_target)
KOSH DATASET
        id: c2ce0aa881dd429f824798d8041ede69
        name: example 2
        creator: cdoutrix

--- Attributes ---
        bar: bar
        creator: cdoutrix
        foo: foo
        name: example 2
--- Associated Data (1)---
        Mime_type: notebook
                /g/g19/cdoutrix/git/kosh/examples/Example_Moving_Datasets.ipynb ( 3e83847d698d4a9bb476e12f4cd30255 )
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

KOSH DATASET
        id: 7bde01bc0b71428696f24b30eeba99d4
        name: example 2
        creator: cdoutrix

--- Attributes ---
        creator: cdoutrix
        foo: foo
        fuzz: fuzzy
        name: example 2
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
Let's import d_source into the target store:
target_store.import_dataset(d_source)
print(d_target)
KOSH DATASET
        id: 7bde01bc0b71428696f24b30eeba99d4
        name: example 2
        creator: cdoutrix

--- Attributes ---
        bar: bar
        creator: cdoutrix
        foo: foo
        fuzz: fuzzy
        name: example 2
--- Associated Data (1)---
        Mime_type: notebook
                /g/g19/cdoutrix/git/kosh/examples/Example_Moving_Datasets.ipynb ( 3e83847d698d4a9bb476e12f4cd30255 )
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
What happened? Kosh ran a search on the target_store for datasets whose name attribute is set to example 2 and found our already existing dataset d_target. Kosh therefore merged the additional attributes and associated sources from d_source into d_target. As a result, d_target contains all of its original attributes and associated sources plus the ones from d_source.
NOTE: Only the dataset in the target_store is altered; the dataset in the source_store is NEVER altered.
This worked because Kosh found exactly one dataset in the target_store whose name matched the imported dataset. At times, multiple datasets may match; in that case, Kosh bails out.
For example, let's create an additional dataset named "example" in each store (dataset2 in source_store and dataset3 in target_store). Both source_store and target_store will then each contain 2 datasets named example, but with different attributes.
# Source store
print("Original # of datasets named example in source store:",len(list(source_store.find(name="example"))))
dataset2 = source_store.create(name="example")
print("Now, # of datasets named example in source store:",len(list(source_store.find(name="example"))))
# Target store
print("Original # of datasets named example in target store:",len(list(target_store.find(name="example")))) # Only the dataset we imported earlier
dataset3 = target_store.create(name="example")
print("Now, # of datasets named example in target store:",len(list(target_store.find(name="example")))) # The dataset we imported earlier and the one we just added
Original # of datasets named example in source store: 1
Now, # of datasets named example in source store: 2
Original # of datasets named example in target store: 1
Now, # of datasets named example in target store: 2
Now let's try to import dataset2 from source_store into target_store.
try:
    target_store.import_dataset(dataset2)
except ValueError as err:
    print(err)
dataset criteria: {'name': 'example'} matches multiple (2) datasets in store target_store.sql, try changing 'match_attributes' when calling this function
What happened?

When importing a dataset into a store, Kosh runs a search in the target_store for all datasets with a matching name attribute.

If multiple datasets are found with the same name, Kosh cannot uniquely determine which dataset to merge with.

In our case, target_store contains the dataset previously imported and dataset3, which we just created. That means two datasets with the attribute name set to example are in the target_store, and Kosh cannot uniquely determine which one it should merge with.
To help Kosh pinpoint our dataset, we can use the match_attributes keyword. By default, match_attributes is set to ["name",].
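For reference, spelling out the default explicitly reproduces the failure above:
# Explicitly passing the default match_attributes; with two datasets named
# "example" in the target store, this raises the same ValueError as before.
try:
    target_store.import_dataset(dataset2, match_attributes=["name"])
except ValueError as err:
    print(err)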
Before going further, let's populate these newly created datasets with additional attributes. One attribute (bar) will have the same value for both datasets, but the others (foo, foosome) will have non-matching values.
# Dataset in source_store
dataset2.bar = "foo"
dataset2.foo = "bar2"
dataset2.foosome = "foo1"
# Dataset in target store
dataset3.bar = "foo"
dataset3.foo = "bar3"
dataset3.foosome = "foo2"
# Let's print the dataset with `name` value of `example` in the target store
for ds in target_store.find(name="example"):
    print(ds)
KOSH DATASET
        id: 5f51a3c9838d4ff5801fb6ae76449178
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar3
        foosome: foo2
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

KOSH DATASET
        id: 53587b1920b04aac9629ba5faec84b93
        name: example
        creator: cdoutrix

--- Attributes ---
        creator: cdoutrix
        foo: bar
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
It appears that asking Kosh to use bar as an additional match_attribute would let Kosh pinpoint a single dataset in the target_store:

for ds in target_store.find(name="example", bar="foo"):
    print(ds)
Let's go for it
try:
    target_store.import_dataset(dataset2, match_attributes=["name", "bar"])
except ValueError as err:
    print(err)
Trying to import dataset with attribute 'foo' value : bar2. But value for this attribute in target is 'bar3'
What happened now?

As expected, Kosh found a unique dataset whose name and bar attributes match our incoming dataset.

So far, so good.

Unfortunately, the dataset in the target_store shares a common attribute foo with our incoming dataset, and their values do not match.

By default, Kosh bails out when such conflicts arise; otherwise the dataset in the target_store would be altered based on the values of the imported dataset (from source_store).

NOTE: Only the dataset in the target_store is altered; the dataset in the source_store is NEVER altered.
print("source:", dataset2.foo)
print("target:", dataset3.foo)
source: bar2
target: bar3
Fortunately, we can tell Kosh how to handle conflicts via the merge_handler keyword, which is set to conservative by default. Other options are preserve and overwrite.
target_store.import_dataset(dataset2, match_attributes=["name", "bar"], merge_handler="preserve")
# Attributes are preserved (in the `target_store` only, the source dataset is never altered)
print(dataset2)
print("Attributes of interest on dataset2:", dataset2.foo, dataset2.foosome)
print(dataset3)
print("Attributes of interest on dataset3:", dataset3.foo, dataset3.foosome)
KOSH DATASET
        id: 0b9df67dd1b04f67a7d8042ea422d851
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar2
        foosome: foo1
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

Attributes of interest on dataset2: bar2 foo1

KOSH DATASET
        id: 5f51a3c9838d4ff5801fb6ae76449178
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar3
        foosome: foo2
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

Attributes of interest on dataset3: bar3 foo2
target_store.import_dataset(dataset2, match_attributes=["name", "bar"], merge_handler="overwrite")
# Attributes are overwritten (in the `target_store`, the source dataset is never altered)
print(dataset2)
print("Attributes of interest on dataset2:", dataset2.foo, dataset2.foosome)
print(dataset3)
print("Attributes of interest on dataset3:", dataset3.foo, dataset3.foosome)
KOSH DATASET
        id: 0b9df67dd1b04f67a7d8042ea422d851
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar2
        foosome: foo1
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

Attributes of interest on dataset2: bar2 foo1

KOSH DATASET
        id: 5f51a3c9838d4ff5801fb6ae76449178
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar2
        foosome: foo1
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

Attributes of interest on dataset3: bar2 foo1
Kosh also lets you pass your own custom handler function. This function will receive the merge_handler_kargs keyword arguments.

The function declaration should be: foo(store_dataset, imported_dataset_attributes_dict, section, **merge_handler_kargs)

Where:

- store_dataset is the destination Kosh dataset or its non-data section dictionary.
- imported_dataset_attributes_dict is a dictionary of attributes/values of the dataset we're importing.
- section is the section of the record being updated.
- merge_handler_kargs is a dict of keyword arguments passed to this function.

The function should return a dictionary of the attributes/values the target dataset should have.
Let's design a function that overwrites some attributes but preserves others, based on the input keyword overwrite_attributes:
def my_handler(store_dataset, imported_dataset_dict, section, overwrite_attributes=[], **kargs):
    # Prepare the target dict
    imported_attributes = imported_dataset_dict
    target_attributes = {}
    # We only care about the data section here
    if section == "data":
        store_attributes = store_dataset.list_attributes(dictionary=True)
        # Start from the imported attributes, then let the store's values win...
        target_attributes.update(imported_attributes)
        target_attributes.update(store_attributes)
        # ...except for attributes explicitly listed in overwrite_attributes,
        # which take the imported value.
        for attribute, value in imported_attributes.items():
            if attribute in store_attributes:
                if attribute in overwrite_attributes:
                    target_attributes[attribute] = value
    return target_attributes
Now let's reset our dataset attributes and tell it to overwrite foo but not foosome:
dataset3.bar = "foo"
dataset3.foo = "bar3"
dataset3.foosome = "foo2"
target_store.import_dataset(dataset2, match_attributes=["name", "bar"], merge_handler=my_handler, merge_handler_kargs={"overwrite_attributes":["foo",]})
# Attribute foo is overwritten, foosome is preserved (in the target_store only, the source dataset is never altered)
print(dataset2)
print("Attribute of interest on dataset2:", dataset2.foo, dataset2.foosome)
print(dataset3)
print("Attribute of interest on dataset3:", dataset3.foo, dataset3.foosome)
KOSH DATASET
        id: 0b9df67dd1b04f67a7d8042ea422d851
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar2
        foosome: foo1
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

Attribute of interest on dataset2: bar2 foo1

KOSH DATASET
        id: 5f51a3c9838d4ff5801fb6ae76449178
        name: example
        creator: cdoutrix

--- Attributes ---
        bar: foo
        creator: cdoutrix
        foo: bar2
        foosome: foo2
        name: example
--- Associated Data (0)---
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---

Attribute of interest on dataset3: bar2 foo2
I just want to import everything and make copies
If you do not wish to merge but simply want to import everything as copies, set match_attributes to ["id",], as it is highly unlikely that two datasets created in different stores will end up with the same (randomly generated) id.
print("Before:", len(list(target_store.find(name="example"))), "datasets", list(target_store.find(name="example", ids_only=True)))
print("importing:", len(list(source_store.find(name="example"))), "datasets", list(source_store.find(name="example", ids_only=True)))
target_store.import_dataset(dataset2, match_attributes=["id",])
print("After:", len(list(target_store.find(name="example"))), "datasets (One was already here)")
Before: 2 datasets ['5f51a3c9838d4ff5801fb6ae76449178', '53587b1920b04aac9629ba5faec84b93']
importing: 2 datasets ['0b9df67dd1b04f67a7d8042ea422d851', '53587b1920b04aac9629ba5faec84b93']
After: 3 datasets (One was already here)
/g/g19/cdoutrix/.conda/envs/kosh/lib/python3.9/site-packages/kosh/store.py:887: UserWarning: When searching by id use id_pool
  warnings.warn("When searching by id use id_pool")
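As the warning suggests, ids are better passed via id_pool. A minimal sketch, assuming find() forwards Sina's id_pool keyword as the warning implies:
# Searching by id through id_pool avoids the UserWarning above.
print(list(target_store.find(id_pool=[dataset2.id], ids_only=True)))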
I only want the metadata, not the curves
Sometimes you only care about some sections of the Sina record, for example only the data section and not the curve_sets section.

Kosh can skip over sections you are not interested in; simply pass the section(s) to ignore via the skip_sina_record_sections keyword.
some_store = kosh.connect("temp.sql", delete_all_contents=True)
some_store.import_dataset("sina_curve_rec_2.json", skip_sina_record_sections=["curve_sets",])
next(some_store.find()) # no curve
KOSH DATASET
        id: obj2
        name: ???
        creator: ???

--- Attributes ---
        param1: 1
        param2: 2
        param3: 3.3
--- Associated Data (1)---
        Mime_type: image/png
                foo.png ( obj2 )
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
I would like to apply filters when importing datasets
Similarly to Sina, you can apply filtering functions while importing datasets. These functions are applied to each record you import, in the order you pass them. Each function is expected to accept a dataset as input and return a dataset.
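For instance, a minimal ingest function that just tags each incoming dataset could look like this (a sketch; the imported_by attribute name is purely illustrative):
def tag_imported(dataset):
    # Ingest functions receive a dataset and must return a dataset.
    dataset.imported_by = "ingest_funcs"  # illustrative attribute name
    return dataset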
Let's revisit the curve skipping concept from above.
# First let's create a function that removes the curves
def yank_curvesets(dataset):
    features = dataset.list_features()
    for feature in features:
        try:
            dataset.remove_curve_or_curve_set(feature)
        except Exception:
            # Was already removed or not a curve
            pass
    return dataset

# Let's remove the attribute 'presets'
def yank_presets(dataset):
    if hasattr(dataset, "presets"):
        delattr(dataset, "presets")
    return dataset

# Now let's import the record into the store
some_store = kosh.connect("temp.sql", delete_all_contents=True)
some_store.import_dataset("sina_curve_rec_2.json", ingest_funcs=[yank_curvesets, yank_presets])
print(next(some_store.find()))  # No curve, no attribute 'presets'
del some_store
KOSH DATASET
        id: obj2
        name: ???
        creator: ???

--- Attributes ---
        param1: 1
        param2: 2
        param3: 3.3
--- Associated Data (1)---
        Mime_type: image/png
                foo.png ( obj2 )
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
/g/g19/cdoutrix/.conda/envs/kosh/lib/python3.9/site-packages/kosh/store.py:268: UserWarning: Unknown user, you will be logged as anonymous user
  warnings.warn("Unknown user, you will be logged as anonymous user")
I would like to apply filters I got from Sina
Sina comes with ingest functions that operate on Sina records. Kosh provides a decorator to automatically convert datasets to records on input, and records back to datasets on output. This means you can use Sina ingest functions in Kosh.
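Conceptually, the decorator can be thought of as the following sketch; this is not Kosh's actual implementation, and the get_record accessor name is hypothetical:
def datasets_in_place_of_records_sketch(record_func):
    # Hypothetical adapter: expose the dataset's underlying Sina record to
    # the record-based function, then hand the dataset back to Kosh.
    def wrapper(dataset):
        record = dataset.get_record()  # hypothetical dataset -> record accessor
        record_func(record)
        return dataset
    return wrapper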
Let's revisit the curve-skipping concept from above once more, this time with record-based functions.
# First let's create a function that removes the curves
@kosh.utils.datasets_in_place_of_records
def yank_curvesets(record):
    record["curve_sets"] = {}
    return record

# Let's remove the attribute 'presets'
@kosh.utils.datasets_in_place_of_records
def yank_presets(record):
    # record["data"] is a dictionary, so use dictionary operations on it
    if "presets" in record["data"]:
        del record["data"]["presets"]
    return record

# Now let's import the record into the store
some_store = kosh.connect("temp.sql", delete_all_contents=True)
some_store.import_dataset("sina_curve_rec_2.json", ingest_funcs=[yank_curvesets, yank_presets])
print(next(some_store.find()))  # No curve, no attribute 'presets'
del some_store
KOSH DATASET
        id: obj2
        name: ???
        creator: ???

--- Attributes ---
        param1: 1
        param2: 2
        param3: 3.3
--- Associated Data (1)---
        Mime_type: image/png
                foo.png ( obj2 )
--- Ensembles (0)---
        []
--- Ensemble Attributes ---
--- Alias Feature Dictionary ---
Associating stores vs importing stores
While importing a few datasets can be useful, there are cases where one might want to import an entire sub_store into a central_store without worrying about merging.

For one, this operation can be time-intensive. Furthermore, if the sub_store keeps being edited, synchronizing the two stores can become a real headache.

One workaround is to open both stores and run your queries against each of them, but this too becomes a burden as the number of stores to synchronize grows.
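A minimal sketch of that manual pattern, with the store paths and dataset name purely illustrative:
from itertools import chain

# Open each store separately and merge the query results yourself.
stores = [kosh.connect("central_store.sql"), kosh.connect("sub_store.sql")]
results = list(chain.from_iterable(store.find(name="some_dataset") for store in stores))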
Kosh can handle this under the hood for you. By simply associating another sub_store with your central_store, you end up with an up-to-date union of both stores.
import os
for name in ["central_store.sql", "sub_store.sql"]:
    if os.path.exists(name):
        os.remove(name)
central_store = kosh.connect("central_store.sql", delete_all_contents=True)
sub_store = kosh.connect("sub_store.sql", delete_all_contents=True)
central_store.create(name = "dataset_in_central_store")
sub_store.create(name = "dataset_in_sub_store")
# only one dataset in central_store
print([x.name for x in central_store.find()])
['dataset_in_central_store']
Now let's associate the sub_store with the central_store:
central_store.associate(sub_store)
# Two datasets in central_store
print([x.name for x in central_store.find()])
['dataset_in_central_store', 'dataset_in_sub_store']
Note: By default, store association is unilateral, i.e., the sub_store stays untouched and has no idea the central_store exists.
print([x.name for x in sub_store.find()])
['dataset_in_sub_store']
At any time we can undo this operation:
central_store.dissociate(sub_store)
# only one dataset in central_store
print([x.name for x in central_store.find()])
['dataset_in_central_store']
We can now re-associate, but this time make both stores associated with each other:
central_store.associate(sub_store, reciprocal=True)
# Two datasets in central_store
print([x.name for x in central_store.find()])
# Two datasets in sub_store
print([x.name for x in sub_store.find()])
['dataset_in_central_store', 'dataset_in_sub_store']
['dataset_in_sub_store', 'dataset_in_central_store']
Note: Associations are also picked up transitively: when you associate with a store, you pick up the stores that store is itself associated with:
third_store = kosh.connect("third_store.sql", delete_all_contents=True)
# No datasets in third_store
print("Datasets in 3rd store:", [x.name for x in third_store.find()])
third_store.associate(sub_store)
print("Stores associated with 3rd store (uris):", list(central_store.get_associated_stores()))
print("Stores associated with sub_store: (uris)", list(sub_store.get_associated_stores()))
print("Stores associated with central_store: (uris)", list(third_store.get_associated_stores()))
# Now we have 2 datasets in third_store
print("Datasets in 3rd store after association with sub_store:", [x.name for x in third_store.find()])
# Dissociating sub_store from central_store:
sub_store.dissociate(central_store, reciprocal=True)
# Now we have 1 dataset in third_store
print("Dataset in 3rd store after dissociation", [x.name for x in third_store.find()])
# Rather than the list of associated stores uris,
# we can get the stores themselves:
print("Stores associated with 3rd store", list(third_store.get_associated_stores(uris=False)))
# We can also get an associated store via its uri.
# This is important if you plan on doing more store associations,
# as Kosh considers two Python store objects to be different stores for this purpose.
# To be safe, you can also re-open a store after associations have changed.
print("Sub store retrieved from 3rd store via uri:", third_store.get_associated_store("sub_store.sql"))
Datasets in 3rd store: []
Stores associated with central_store (uris): ['sub_store.sql']
Stores associated with sub_store (uris): ['central_store.sql']
Stores associated with 3rd store (uris): ['sub_store.sql']
Datasets in 3rd store after association with sub_store: ['dataset_in_sub_store', 'dataset_in_central_store']
Dataset in 3rd store after dissociation ['dataset_in_sub_store']
Stores associated with 3rd store [<kosh.store.KoshStore object at 0x1554bf4b9910>]
Sub store retrieved from 3rd store via uri: <kosh.store.KoshStore object at 0x1554bf4b9910>