Skip to content

Ensemble

KoshEnsemble

Bases: KoshDataset

Source code in kosh/ensemble.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class KoshEnsemble(KoshDataset):
    def __init__(self, id, store, schema=None, record=None):
        """Build a Kosh ensemble.

        An ensemble links many datasets together; the member datasets
        inherit attributes and associated sources from the ensemble.

        :param id: dataset's unique Id
        :type id: str
        :param store: store containing the dataset
        :type store: KoshSinaStore
        :param schema: Kosh schema validator
        :type schema: KoshSchema
        :param record: to avoid looking up in sina pass sina record
        :type record: Record
        """
        with store.lock_strategy:
            super(KoshEnsemble, self).__init__(
                id,
                store,
                schema=schema,
                record=record,
                kosh_type=store._ensembles_type,
            )
            protected_names = [
                "__name__", "__creator__", "__store__",
                "_associated_data_", "__features__",
                "_associated_datasets_", "__ok_duplicates__",
                "__creation_date__",
            ]
            # Attributes that the members can have on their own
            member_owned_names = [
                "creator", "id", "name", 'creation_date', 'last_modified_date',
            ]
            # Written straight into the instance dictionary, in this order
            self.__dict__.update({
                "__protected__": protected_names,
                "__ok_duplicates__": member_owned_names,
            })

    @lock_strategies.lock_method
    def __str__(self):
        """Return the printable description of this ensemble.

        Starts from the parent class description, relabels it as an
        ensemble, drops everything from the "--- Ensembles" section onward
        and, when member datasets are available, appends them.

        :return: human-readable description
        :rtype: str
        """
        st = super(KoshEnsemble, self).__str__()
        st = st.replace("KOSH DATASET", "KOSH ENSEMBLE")
        # str.find returns -1 when the marker is absent; slicing with that
        # value would silently chop the last two characters of the text.
        marker = st.find("--- Ensembles")
        if marker != -1:
            # Also drop the character right before the marker
            st = st[:max(marker - 1, 0)]
        # Read the member list once and reuse it for the count and the text
        members = self._associated_datasets_
        if members is not None:
            st += "\n--- Member Datasets ({})---\n".format(len(members))
            st += "\t{}".format(members)
        return st

    @lock_strategies.lock_method
    def cleanup_files(self, dry_run=False, interactive=False, **search_keys):
        """Drop references to dead files from the ensemble and its members.

        The parent class cleanup runs first for the ensemble itself, then
        every member dataset is cleaned with the same options.
        Associated objects can be filtered by passing key=values,
        e.g mime_type=hdf5 will only dissociate non-existing files associated with mime_type hdf5;
        some_att=some_val will only dissociate non-existing files associated and having the attribute
        'some_att' with value of 'some_val'.

        :param dry_run: Only does a dry_run
        :type dry_run: bool
        :param interactive: interactive mode, ask before dissociating
        :type interactive: bool
        :returns: list of uris (to be) removed.
        :rtype: list
        """
        __check_valid_connection_type__(self.__store__.__connection_type__, ['write'])
        # Same options are handed to the ensemble itself and to each member
        options = dict(dry_run=dry_run, interactive=interactive, **search_keys)
        dead_uris = super(KoshEnsemble, self).cleanup_files(**options)
        for member in self.get_members():
            dead_uris += member.cleanup_files(**options)
        return dead_uris

    @lock_strategies.lock_method
    def export(self, file=None):
        """Export this ensemble together with its member datasets.

        The ensemble record comes first, followed by one record per member,
        each passed through ``cleanup_sina_record_from_kosh_sync``. The
        relationships pointing at this ensemble are exported as well.

        :param file: export datasets to a file
        :type file: None or str
        :return: dictionary with the records, relationships and version info
        :rtype: dict
        """
        records = [cleanup_sina_record_from_kosh_sync(self.get_record())]
        records.extend(
            cleanup_sina_record_from_kosh_sync(self.get_record(member_id))
            for member_id in self.get_members(ids_only=True)
        )
        # Membership links are needed to rebuild the ensemble elsewhere
        sina_relationships = self.get_sina_store().relationships
        relationships = sina_relationships.find(
            None, self.__store__._ensemble_predicate, self.id)

        output_dict = dict(
            minimum_kosh_version=None,
            kosh_version=kosh.version(comparable=True),
            sources_type=self.__store__._sources_type,
            records=records,
            relationships=relationships,
        )
        update_json_file_with_records_and_relationships(file, output_dict)
        return output_dict

    @lock_strategies.lock_method
    def create(self, name="Unnamed Dataset", id=None,
               metadata=None, schema=None, sina_type=None, ensemble_tags=None,
               inherit_attributes=True, **kargs):
        """create a new (possibly named) dataset as a member of this ensemble.

        :param name: name for the dataset, defaults to "Unnamed Dataset"
        :type name: str, optional
        :param id: unique Id, defaults to None which means use uuid4()
        :type id: str, optional
        :param metadata: dictionary of attribute/value pair for the dataset,
                         defaults to None which means no metadata
        :type metadata: dict, optional
        :param schema: a KoshSchema object to validate datasets and when setting attributes
        :type schema: KoshSchema
        :param sina_type: If you want to query the store for a specific sina record type, not just a dataset
        :type sina_type: str
        :param inherit_attributes: Whether datasets inherit attributes from ensembles.
                                   If False, datasets can have the same attributes as ensembles
                                   and the different ensembles that the dataset belongs to can also
                                   have the same attributes. They can also have different values.
                                   Defaults to True.
        :type inherit_attributes: bool
        :param ensemble_tags: Organize datasets within ensemble by using tags. These "attributes"
                              are only available at the ensemble level for each dataset. These tags
                              can also be used in `ensemble.find_datasets(ensemble_tags=dict)`.
                              e.g., ensemble_tags={"even_or_odd": "even", "data_type": "test data"}
        :type ensemble_tags: dict
        :param kargs: extra keyword arguments, forwarded to the store's create()
        :type kargs: dict
        :raises ValueError: sina_type is the store's ensemble type, or (when
                            inherit_attributes is True) metadata holds an
                            ensemble attribute with a different value
        :return: KoshDataset
        :rtype: KoshDataset
        """
        __check_valid_connection_type__(self.__store__.__connection_type__, ['write', 'append'])
        if sina_type == self.__store__._ensembles_type:
            raise ValueError("You cannot create an ensemble from an ensemble")

        # Fresh dictionary per call: a `{}` default would be one object
        # shared by every call and handed to the store each time.
        if metadata is None:
            metadata = {}

        # Same rules as add(): the conflict check only applies when members
        # inherit the ensemble attributes, and never to the attributes that
        # members are allowed to own themselves.
        if inherit_attributes:
            attributes = self.list_attributes()
            member_owned = self.__dict__["__ok_duplicates__"]
            for key in metadata:
                if key in member_owned or key not in attributes:
                    continue
                if metadata[key] != getattr(self, key):
                    raise ValueError(
                        "'{}' is an attribute of this ensemble and "
                        "therefore cannot be an attribute of its descendants".format(key))
                warnings.warn(
                    "'{}' is an attribute of this ensemble and "
                    "therefore cannot be an attribute of its descendants"
                    ". Values match so we will accept it here.".format(key), UserWarning)

        ds = self.__store__.create(
            name=name,
            id=id,
            metadata=metadata,
            schema=schema,
            sina_type=sina_type,
            **kargs)
        self.add(ds, inherit_attributes=inherit_attributes, ensemble_tags=ensemble_tags)
        return ds

    @lock_strategies.lock_method
    def add(self, dataset, inherit_attributes=True, ensemble_tags=None):
        """Adds a dataset to this ensemble

        :param dataset: The dataset to add to this ensemble
        :type dataset: KoshDataset or str
        :param inherit_attributes: Whether datasets inherit attributes from ensembles.
                                   If False, datasets can have the same attributes as ensembles
                                   and the different ensembles that the dataset belongs to can also
                                   have the same attributes. They can also have different values.
                                   Defaults to True.
        :type inherit_attributes: bool
        :param ensemble_tags: Organize datasets within ensemble by using tags. These "attributes"
                              are only available at the ensemble level for each dataset. These tags
                              can also be used in `ensemble.find_datasets(ensemble_tags=dict)`.
                              e.g., ensemble_tags={"even_or_odd": "even", "data_type": "test data"}
                              The dictionary passed in is never modified.
        :type ensemble_tags: dict
        :raises ValueError: when inherit_attributes is True and either another
                            ensemble of the dataset already has one of this
                            ensemble's attributes, or the dataset holds one of
                            them with a different value
        """
        from sina.model import Relationship
        __check_valid_connection_type__(self.__store__.__connection_type__, ['write', 'append'])
        # Accept either a dataset object or a dataset id
        if isinstance(dataset, KoshDataset):
            dataset_id = dataset.id
        else:
            dataset_id = dataset
            dataset = self.__store__._load(dataset_id)
        # Every ensemble relationship this dataset is already part of
        relationships = self.get_sina_store().relationships.find(
            dataset_id, self.__store__._ensemble_predicate, None)

        if inherit_attributes:
            member_owned = self.__dict__["__ok_duplicates__"]
            # Attributes this ensemble pushes to its members; computed once
            # instead of once per relationship and per attribute.
            inherited_names = [att for att in self.list_attributes()
                               if att not in member_owned]
            for rel in relationships:
                if rel.object_id == self.id:
                    # ok it's already done, no need to do anything else
                    return
                # Already a member of another ensemble:
                # let's make sure there are no conflict here
                other_names = self.__store__.open(rel.object_id).list_attributes()
                for att in inherited_names:
                    if att in other_names:
                        raise ValueError(
                            "Dataset {} is already part of ensemble {} "
                            "which already provides support for attribute: {}. Bailing".format(
                                dataset_id, rel.object_id, att))

            # Ok we're good, let's now makes sure attributes are ok
            attributes = self.list_attributes(dictionary=True)
            dataset_attributes = dataset.list_attributes(dictionary=True)
            for att in dataset.list_attributes():
                if att in member_owned:
                    continue
                if att in attributes and dataset_attributes[att] != attributes[att]:
                    raise ValueError(
                        f"Dataset {dataset_id} has attribute `{att}` with value {dataset_attributes[att]}, "
                        f"this ensemble ({self.id}) has value `{attributes[att]}`\n"
                        "This error can by bypassed by setting `inherit_attributes=False` which makes it so "
                        "datasets can have the same attributes as ensembles and the different ensembles that "
                        "the dataset belongs to can also have the same attributes.")
            # At this point we need to add the ensemble attributes to the dataset
            for att in inherited_names:
                dataset.___setattr___(att, getattr(self, att), force=True)
        else:
            # NOTE(review): no membership check on this path, so adding the
            # same dataset twice inserts a second relationship - confirm.
            # Work on a copy: writing the marker into the caller's dictionary
            # would leak it into any later call reusing that dictionary.
            ensemble_tags = dict(ensemble_tags) if isinstance(ensemble_tags, dict) else {}
            # Use for core_sina.py ___setattr___()
            ensemble_tags['INHERIT_ATTRIBUTES'] = False
        # Ok We are clear let's create the relationship
        rel = Relationship(
            self.id, dataset_id, self.__store__._ensemble_predicate)
        # Add ensemble tags
        if ensemble_tags is not None:
            if self.schema is not None:
                self.schema.validate(ensemble_tags, f"Ensemble {self.id}")
            if dataset.schema is not None:
                dataset.schema.validate(ensemble_tags, f"Dataset {dataset.id}")
            dataset.add_ensemble_tags(self.id, ensemble_tags)
        self.get_sina_store().relationships.insert(rel)

    @lock_strategies.lock_method
    def remove(self, dataset):
        """Take a dataset out of this ensemble without deleting the dataset.

        The ensemble tags stored on the dataset for this ensemble are
        deleted first; a warning is issued (and nothing else happens) when
        no membership relationship is found.

        :param dataset: The dataset to remove
        :type dataset: KoshDataset or str
        """
        __check_valid_connection_type__(self.__store__.__connection_type__, ['write'])
        # Accept either a dataset object or a dataset id
        if not isinstance(dataset, KoshDataset):
            dataset_id = dataset
            dataset = self.__store__.open(dataset_id)
        else:
            dataset_id = dataset.id
        links = self.get_sina_store().relationships.find(
            dataset_id, self.__store__._ensemble_predicate, self.id)

        # Strip this ensemble's prefix to get the bare tag names to delete
        tag_prefix = f"{self.id}_ENSEMBLE_TAG_"
        tag_names = [tag.replace(tag_prefix, "")
                     for tag in dataset.list_ensemble_tags(ensemble_id=self.id)]
        dataset.delete_ensemble_tags(ensemble_id=self.id, ensemble_tags=tag_names)

        if len(links) == 0:
            warnings.warn(
                f"Dataset {dataset_id} is not a member of ensemble {self.id}")
            return

        link = links[0]
        self.get_sina_store().relationships.delete(
            link.subject_id, link.predicate, link.object_id)

    # `delete` is kept as another name for `remove`
    delete = remove

    @lock_strategies.lock_method
    def get_members(self, ids_only=False):
        """Iterate lazily over the datasets that belong to this ensemble.

        :param ids_only: yield the member ids when True, otherwise open each
                         member through the store and yield the result
        :type ids_only: bool
        :returns: generator of dataset (or ids)
        :rtype: str or KoshDataset
        """
        for member_id in self._associated_datasets_:
            yield member_id if ids_only else self.__store__.open(member_id)

    @lock_strategies.lock_method
    def find_datasets(self, ensemble_tags=None, *atts, **keys):
        """Find datasets members of this ensemble that are matching some metadata.
        Arguments are the metadata names we are looking for e.g
        find_datasets("attr1", "attr2")
        you can further restrict by specifying exact value for a metadata
        via key=value
        you can return ids only by using: ids_only=True
        range can be specified via: sina.utils.DataRange(min, max)

        :param ensemble_tags: Further filter out datasets with ensemble tags.
                              A first positional argument that is not a
                              dictionary-like object is treated as a metadata
                              name, not as tags.
        :type ensemble_tags: dict
        :return: whatever the store's find returns for the member ids
        :rtype: generator
        """
        # `ensemble_tags` is the first positional slot, so the documented
        # call find_datasets("attr1", "attr2") lands "attr1" here. Anything
        # without .items() cannot be tags: hand it back to the names.
        if ensemble_tags is not None and not hasattr(ensemble_tags, "items"):
            atts = (ensemble_tags,) + atts
            ensemble_tags = None
        if ensemble_tags is not None:
            # Tags are searched under a key prefixed with this ensemble's id
            for key, val in ensemble_tags.items():
                keys[f"{self.id}_ENSEMBLE_TAG_{key}"] = val
        members_ids = list(self.get_members(ids_only=True))
        return self.__store__.find(*atts, id_pool=members_ids, **keys)

    @lock_strategies.lock_method
    def clone(self, *atts, **keys):
        """Always fails: an ensemble cannot be cloned.

        The connection type is checked first, then NotImplementedError is
        raised; the arguments are not used.

        :raises NotImplementedError: always
        """
        connection_type = self.__store__.__connection_type__
        __check_valid_connection_type__(connection_type, ['write', 'append'])
        raise NotImplementedError("Ensembles objects cannot clone themselves")

    @lock_strategies.lock_method
    def list_attributes(self, dictionary=False, no_duplicate=False):
        """list_attributes list all non protected attributes

        :param dictionary: return a dictionary of value/pair rather than just attributes names
        :type dictionary: bool

        :param no_duplicate: return only attributes that cannot be duplicated in members
        :type no_duplicate: bool

        :return: attributes set on object: a dictionary when `dictionary`
                 is True, a list of names otherwise
        :rtype: list or dict
        """
        attributes = super(KoshEnsemble, self).list_attributes(dictionary)
        if not no_duplicate:
            return attributes
        member_owned = self.__dict__["__ok_duplicates__"]
        if dictionary:
            # Keep the values: filtering with a list comprehension here would
            # hand back bare names although a dictionary was requested.
            return {name: value for name, value in attributes.items()
                    if name not in member_owned}
        return [name for name in attributes if name not in member_owned]

    @lock_strategies.lock_method
    def to_dataframe(self, data_columns=None, include_ensemble_attributes=True, include_ensemble_tags=True,
                     *atts, **keys):
        """Return the find_datasets object as a Pandas DataFrame.

        Extra positional and keyword arguments are passed to find_datasets.

        Arguments are the metadata name we are looking for e.g
        find("attr1", "attr2")
        you can further restrict by specifying exact value for a metadata
        via key=value
        range can be specified via: sina.utils.DataRange(min, max)
        `load_type` and `ids_only` are always overridden here
        ('dictionary' and False), so passing them has no effect.

        "file_uri" is a reserved key that will return all records being associated
                   with the given "uri", e.g store.find(file_uri=uri)
        "types" let you search over specific sina record types only.
        "ensemble_tags" (dict) is handed to find_datasets as its tag filter.

        :param data_columns: Columns to extract, placed after the always
                             present ['id', 'name', 'creator',
                             'creation_date', 'last_modified_date'].
                             If nothing is passed, will return all data.
        :type data_columns: Union(str, list), optional
        :param include_ensemble_attributes: Include ensemble attributes in DataFrame.
        :type include_ensemble_attributes: bool, optional
        :param include_ensemble_tags: Include ensemble tags in DataFrame.
        :type include_ensemble_tags: bool, optional
        :return: Pandas DataFrame with one row per matching dataset
        :rtype: Pandas DataFrame
        """
        import pandas as pd
        # Always work on a private list (no shared default, tuples accepted)
        if data_columns is None:
            data_columns = []
        elif isinstance(data_columns, str):
            data_columns = [data_columns]
        else:
            data_columns = list(data_columns)

        keys['load_type'] = 'dictionary'
        keys['ids_only'] = False
        # find_datasets takes the tag filter as its first positional
        # argument: fill that slot explicitly so that the first metadata
        # name in `atts` is not mistaken for the tags.
        tag_filter = keys.pop('ensemble_tags', None)
        datasets = list(self.find_datasets(tag_filter, *atts, **keys))

        total_datasets = len(datasets)

        # Always have these by default
        defaults = ['id', 'name', 'creator', 'creation_date', 'last_modified_date']

        unique_keys = set()
        for dataset in datasets:
            unique_keys.update(dataset['data'])
        data_columns_all = sorted(unique_keys)
        # Remove all ensemble tags and only keep dataset attributes
        data_columns_no_ensemble_tags = [dc for dc in data_columns_all if "_ENSEMBLE_TAG_" not in dc]

        # Acquire all data if `data_columns` was not passed
        if not data_columns:
            data_columns = data_columns_no_ensemble_tags

        data_columns = defaults + data_columns  # Want defaults in front

        # Acquire ensemble attributes
        attribute_prefix = f"{self.id}_ENSEMBLE_ATTRIBUTE_"
        if include_ensemble_attributes:
            ens_attrs = self.list_attributes(dictionary=True)
            # Default columns first, then the remaining ensemble attributes
            data_columns += [attribute_prefix + attr for attr in defaults]
            data_columns += [attribute_prefix + attr for attr in ens_attrs
                             if attr not in defaults]

        # Remove all other ensemble tags and only keep this ensemble's tags
        if include_ensemble_tags:
            data_columns += [dc for dc in data_columns_all if f"{self.id}_ENSEMBLE_TAG_" in dc and
                             "_ENSEMBLE_TAG_INHERIT_ATTRIBUTES" not in dc]

        # One column per requested name, pre-filled with missing values
        attr_dict = {d: [pd.NA] * total_datasets for d in data_columns}

        for i, dataset in enumerate(datasets):
            attr_dict['id'][i] = dataset['id']
            for column, values in dataset['data'].items():
                # attr_dict has exactly the requested columns as keys
                if column in attr_dict:
                    attr_dict[column][i] = values.get('value', pd.NA)

        # Ensemble attributes: same value repeated on every row
        if include_ensemble_attributes:
            for key, val in ens_attrs.items():
                attr_dict[attribute_prefix + key] = [val] * total_datasets

        return pd.DataFrame(attr_dict)

__init__(id, store, schema=None, record=None)

Kosh Ensemble. An ensemble links many datasets together. These datasets inherit attributes and associated sources from the ensemble.

    :param id: dataset's unique Id
    :type id: str
    :param store: store containing the dataset
    :type store: KoshSinaStore
    :param schema: Kosh schema validator
    :type schema: KoshSchema
    :param record: to avoid looking up in sina pass sina record
    :type record: Record
Source code in kosh/ensemble.py
    def __init__(self, id, store, schema=None, record=None):
        """Build a Kosh ensemble.

        An ensemble links many datasets together; the member datasets
        inherit attributes and associated sources from the ensemble.

        :param id: dataset's unique Id
        :type id: str
        :param store: store containing the dataset
        :type store: KoshSinaStore
        :param schema: Kosh schema validator
        :type schema: KoshSchema
        :param record: to avoid looking up in sina pass sina record
        :type record: Record
        """
        with store.lock_strategy:
            super(KoshEnsemble, self).__init__(
                id,
                store,
                schema=schema,
                record=record,
                kosh_type=store._ensembles_type,
            )
            protected_names = [
                "__name__", "__creator__", "__store__",
                "_associated_data_", "__features__",
                "_associated_datasets_", "__ok_duplicates__",
                "__creation_date__",
            ]
            # Attributes that the members can have on their own
            member_owned_names = [
                "creator", "id", "name", 'creation_date', 'last_modified_date',
            ]
            # Written straight into the instance dictionary, in this order
            self.__dict__.update({
                "__protected__": protected_names,
                "__ok_duplicates__": member_owned_names,
            })

__str__()

string representation

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def __str__(self):
    """Return the printable description of this ensemble.

    Starts from the parent class description, relabels it as an
    ensemble, drops everything from the "--- Ensembles" section onward
    and, when member datasets are available, appends them.

    :return: human-readable description
    :rtype: str
    """
    st = super(KoshEnsemble, self).__str__()
    st = st.replace("KOSH DATASET", "KOSH ENSEMBLE")
    # str.find returns -1 when the marker is absent; slicing with that
    # value would silently chop the last two characters of the text.
    marker = st.find("--- Ensembles")
    if marker != -1:
        # Also drop the character right before the marker
        st = st[:max(marker - 1, 0)]
    # Read the member list once and reuse it for the count and the text
    members = self._associated_datasets_
    if members is not None:
        st += "\n--- Member Datasets ({})---\n".format(len(members))
        st += "\t{}".format(members)
    return st

add(dataset, inherit_attributes=True, ensemble_tags=None)

Adds a dataset to this ensemble

Parameters:

Name Type Description Default
dataset KoshDataset | str

The dataset to add to this ensemble

required
inherit_attributes bool

Whether datasets inherit attributes from ensembles. If False, datasets can have the same attributes as ensembles and the different ensembles that the dataset belongs to can also have the same attributes. They can also have different values. Defaults to True.

True
ensemble_tags dict

Organize datasets within ensemble by using tags. These "attributes" are only available at the ensemble level for each dataset. These tags can also be used in ensemble.find_datasets(ensemble_tags=dict). e.g., ensemble_tags={"even_or_odd": "even", "data_type": "test data"}

None
Source code in kosh/ensemble.py
@lock_strategies.lock_method
def add(self, dataset, inherit_attributes=True, ensemble_tags=None):
    """Adds a dataset to this ensemble

    :param dataset: The dataset to add to this ensemble
    :type dataset: KoshDataset or str
    :param inherit_attributes: Whether datasets inherit attributes from ensembles.
                               If False, datasets can have the same attributes as ensembles
                               and the different ensembles that the dataset belongs to can also
                               have the same attributes. They can also have different values.
                               Defaults to True.
    :type inherit_attributes: bool
    :param ensemble_tags: Organize datasets within ensemble by using tags. These "attributes"
                          are only available at the ensemble level for each dataset. These tags
                          can also be used in `ensemble.find_datasets(ensemble_tags=dict)`.
                          e.g., ensemble_tags={"even_or_odd": "even", "data_type": "test data"}
                          The dictionary passed in is never modified.
    :type ensemble_tags: dict
    :raises ValueError: when inherit_attributes is True and either another
                        ensemble of the dataset already has one of this
                        ensemble's attributes, or the dataset holds one of
                        them with a different value
    """
    from sina.model import Relationship
    __check_valid_connection_type__(self.__store__.__connection_type__, ['write', 'append'])
    # Accept either a dataset object or a dataset id
    if isinstance(dataset, KoshDataset):
        dataset_id = dataset.id
    else:
        dataset_id = dataset
        dataset = self.__store__._load(dataset_id)
    # Every ensemble relationship this dataset is already part of
    relationships = self.get_sina_store().relationships.find(
        dataset_id, self.__store__._ensemble_predicate, None)

    if inherit_attributes:
        member_owned = self.__dict__["__ok_duplicates__"]
        # Attributes this ensemble pushes to its members; computed once
        # instead of once per relationship and per attribute.
        inherited_names = [att for att in self.list_attributes()
                           if att not in member_owned]
        for rel in relationships:
            if rel.object_id == self.id:
                # ok it's already done, no need to do anything else
                return
            # Already a member of another ensemble:
            # let's make sure there are no conflict here
            other_names = self.__store__.open(rel.object_id).list_attributes()
            for att in inherited_names:
                if att in other_names:
                    raise ValueError(
                        "Dataset {} is already part of ensemble {} "
                        "which already provides support for attribute: {}. Bailing".format(
                            dataset_id, rel.object_id, att))

        # Ok we're good, let's now makes sure attributes are ok
        attributes = self.list_attributes(dictionary=True)
        dataset_attributes = dataset.list_attributes(dictionary=True)
        for att in dataset.list_attributes():
            if att in member_owned:
                continue
            if att in attributes and dataset_attributes[att] != attributes[att]:
                raise ValueError(
                    f"Dataset {dataset_id} has attribute `{att}` with value {dataset_attributes[att]}, "
                    f"this ensemble ({self.id}) has value `{attributes[att]}`\n"
                    "This error can by bypassed by setting `inherit_attributes=False` which makes it so "
                    "datasets can have the same attributes as ensembles and the different ensembles that "
                    "the dataset belongs to can also have the same attributes.")
        # At this point we need to add the ensemble attributes to the dataset
        for att in inherited_names:
            dataset.___setattr___(att, getattr(self, att), force=True)
    else:
        # NOTE(review): no membership check on this path, so adding the
        # same dataset twice inserts a second relationship - confirm.
        # Work on a copy: writing the marker into the caller's dictionary
        # would leak it into any later call reusing that dictionary.
        ensemble_tags = dict(ensemble_tags) if isinstance(ensemble_tags, dict) else {}
        # Use for core_sina.py ___setattr___()
        ensemble_tags['INHERIT_ATTRIBUTES'] = False
    # Ok We are clear let's create the relationship
    rel = Relationship(
        self.id, dataset_id, self.__store__._ensemble_predicate)
    # Add ensemble tags
    if ensemble_tags is not None:
        if self.schema is not None:
            self.schema.validate(ensemble_tags, f"Ensemble {self.id}")
        if dataset.schema is not None:
            dataset.schema.validate(ensemble_tags, f"Dataset {dataset.id}")
        dataset.add_ensemble_tags(self.id, ensemble_tags)
    self.get_sina_store().relationships.insert(rel)

cleanup_files(dry_run=False, interactive=False, **search_keys)

Cleanup the ensemble's members from references to dead files. You can filter associated objects by passing key=value pairs: e.g. mime_type=hdf5 will only dissociate non-existing files associated with mime_type hdf5, and some_att=some_val will only dissociate non-existing files that are associated and have the attribute 'some_att' with a value of 'some_val'. Returns the list of uris to be removed.

Parameters:

Name Type Description Default
dry_run bool

Only does a dry_run

False
interactive bool

interactive mode, ask before dissociating

False

Returns:

Type Description
list

list of uris (to be) removed.

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def cleanup_files(self, dry_run=False, interactive=False, **search_keys):
    """Drop references to dead files from the ensemble and its members.

    The parent class cleanup runs first for the ensemble itself, then
    every member dataset is cleaned with the same options.
    Associated objects can be filtered by passing key=values,
    e.g mime_type=hdf5 will only dissociate non-existing files associated with mime_type hdf5;
    some_att=some_val will only dissociate non-existing files associated and having the attribute
    'some_att' with value of 'some_val'.

    :param dry_run: Only does a dry_run
    :type dry_run: bool
    :param interactive: interactive mode, ask before dissociating
    :type interactive: bool
    :returns: list of uris (to be) removed.
    :rtype: list
    """
    __check_valid_connection_type__(self.__store__.__connection_type__, ['write'])
    # Same options are handed to the ensemble itself and to each member
    options = dict(dry_run=dry_run, interactive=interactive, **search_keys)
    dead_uris = super(KoshEnsemble, self).cleanup_files(**options)
    for member in self.get_members():
        dead_uris += member.cleanup_files(**options)
    return dead_uris

clone(*atts, **keys)

We cannot clone an ensemble

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def clone(self, *atts, **keys):
    """Always fails: an ensemble cannot be cloned.

    The connection type is checked first, then NotImplementedError is
    raised; the arguments are not used.

    :raises NotImplementedError: always
    """
    connection_type = self.__store__.__connection_type__
    __check_valid_connection_type__(connection_type, ['write', 'append'])
    raise NotImplementedError("Ensembles objects cannot clone themselves")

create(name='Unnamed Dataset', id=None, metadata={}, schema=None, sina_type=None, ensemble_tags=None, inherit_attributes=True, **kargs)

create a new (possibly named) dataset as a member of this ensemble.

Parameters:

Name Type Description Default
name (str, optional)

name for the dataset, defaults to 'Unnamed Dataset'

'Unnamed Dataset'
id (str, optional)

unique Id, defaults to None which means use uuid4()

None
metadata (dict, optional)

dictionary of attribute/value pair for the dataset, defaults to {}

{}
schema KoshSchema

a KoshSchema object to validate datasets and when setting attributes

None
sina_type str

If you want to query the store for a specific sina record type, not just a dataset

None
inherit_attributes bool

Whether datasets inherit attributes from ensembles. If False, datasets can have the same attributes as ensembles and the different ensembles that the dataset belongs to can also have the same attributes. They can also have different values. Defaults to True.

True
ensemble_tags dict

Organize datasets within ensemble by using tags. These "attributes" are only available at the ensemble level for each dataset. These tags can also be used in ensemble.find_datasets(ensemble_tags=dict). e.g., ensemble_tags={"even_or_odd": "even", "data_type": "test data"}

None
kargs dict

extra keyword arguments, passed on to the store's create method

{}

Returns:

Type Description
KoshDataset

KoshDataset

Raises:

Type Description
RuntimeError

Dataset already exists

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def create(self, name="Unnamed Dataset", id=None,
           metadata={}, schema=None, sina_type=None, ensemble_tags=None,
           inherit_attributes=True, **kargs):
    """create a new (possibly named) dataset as a member of this ensemble.

    The dataset is created through the store and then added to this ensemble.

    :param name: name for the dataset, defaults to 'Unnamed Dataset'
    :type name: str, optional
    :param id: unique Id, defaults to None which means use uuid4()
    :type id: str, optional
    :param metadata: dictionary of attribute/value pair for the dataset, defaults to {}.
                     A copy is handed to the store, the passed dictionary is left untouched.
    :type metadata: dict, optional
    :param schema: a KoshSchema object to validate datasets and when setting attributes
    :type schema: KoshSchema
    :param sina_type: If you want to query the store for a specific sina record type, not just a dataset
    :type sina_type: str
    :param inherit_attributes: Whether datasets inherit attributes from ensembles.
                               If False, datasets can have the same attributes as ensembles
                               and the different ensembles that the dataset belongs to can also
                               have the same attributes. They can also have different values.
                               Defaults to True.
    :type inherit_attributes: bool
    :param ensemble_tags: Organize datasets within ensemble by using tags. These "attributes"
                          are only available at the ensemble level for each dataset. These tags
                          can also be used in `ensemble.find_datasets(ensemble_tags=dict)`.
                          e.g., ensemble_tags={"even_or_odd": "even", "data_type": "test data"}
    :type ensemble_tags: dict
    :param kargs: extra keyword arguments, passed on to the store's `create`
    :type kargs: dict
    :raises ValueError: `sina_type` is the store's ensemble type, or a metadata key is
                        an attribute of this ensemble with a different value
    :raises RuntimeError: Dataset already exists
    :return: KoshDataset
    :rtype: KoshDataset
    """
    __check_valid_connection_type__(self.__store__.__connection_type__, ['write', 'append'])
    if sina_type == self.__store__._ensembles_type:
        raise ValueError("You cannot create an ensemble from an ensemble")

    # Work on a private copy: the default `{}` is shared between calls and the
    # caller's dictionary should not be altered by whatever the store does with it.
    metadata = dict(metadata) if metadata else {}

    ensemble_attributes = self.list_attributes()
    for key, value in metadata.items():
        if key not in ensemble_attributes:
            continue
        if value != getattr(self, key):
            raise ValueError(
                "'{}' is an attribute of this ensemble and "
                "therefore cannot be an attribute of its descendants".format(key))
        # Same value as the ensemble's: tolerated, but the user is told about it
        warnings.warn(
            "'{}' is an attribute of this ensemble and "
            "therefore cannot be an attribute of its descendants"
            ". Values match so we will accept it here.".format(key), UserWarning)

    ds = self.__store__.create(
        name=name,
        id=id,
        metadata=metadata,
        schema=schema,
        sina_type=sina_type,
        **kargs)
    self.add(ds, inherit_attributes=inherit_attributes, ensemble_tags=ensemble_tags)
    return ds

export(file=None)

Exports this ensemble and its member datasets

Parameters:

Name Type Description Default
file None | str

export datasets to a file

None

Returns:

Type Description
dict

dataset and its associated data

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def export(self, file=None):
    """Export this ensemble together with its member datasets.

    The exported records are the ensemble's own record followed by the record
    of every member, each one passed through `cleanup_sina_record_from_kosh_sync`.
    The relationships having this ensemble as object (with the store's ensemble
    predicate) are exported as well.

    :param file: export datasets to a file
    :type file: None or str
    :return: dictionary with the version information, the sources type,
             the records and the relationships
    :rtype: dict
    """
    clean = cleanup_sina_record_from_kosh_sync
    # The ensemble's own record always comes first, members follow
    records = [clean(self.get_record())]
    records.extend(clean(self.get_record(member_id))
                   for member_id in self.get_members(ids_only=True))
    # We also need to export the relationships
    store = self.__store__
    relationships = self.get_sina_store().relationships.find(
        None, store._ensemble_predicate, self.id)
    output_dict = {
        "minimum_kosh_version": None,
        "kosh_version": kosh.version(comparable=True),
        "sources_type": store._sources_type,
        "records": records,
        "relationships": relationships
    }

    # `file` is handed over as received (possibly None)
    update_json_file_with_records_and_relationships(file, output_dict)
    return output_dict

find_datasets(ensemble_tags=None, *atts, **keys)

Find datasets members of this ensemble that are matching some metadata. Arguments are the metadata names we are looking for, e.g. find("attr1", "attr2"). You can further restrict by specifying an exact value for a metadata via key=value. You can return ids only by using: ids_only=True. A range can be specified via: sina.utils.DataRange(min, max).

Parameters:

Name Type Description Default
ensemble_tags dict

Further filter out datasets with ensemble tags

None

Returns:

Type Description
generator

generator of matching datasets in this ensemble

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def find_datasets(self, ensemble_tags=None, *atts, **keys):
    """Find datasets members of this ensemble that are matching some metadata.
    Arguments are the metadata names we are looking for e.g
    find_datasets("attr1", "attr2")
    you can further restrict by specifying exact value for a metadata
    via key=value
    you can return ids only by using: ids_only=True
    range can be specified via: sina.utils.DataRange(min, max)

    The search is delegated to the store's `find`, restricted to the ids of
    this ensemble's members.

    :param ensemble_tags: Further filter out datasets with ensemble tags.
                          If the first positional argument is not a dictionary-like
                          object it is treated as a metadata name instead.
    :type ensemble_tags: dict
    :return: generator of matching datasets in this ensemble
    :rtype: generator
    """
    # `ensemble_tags` is the first positional parameter, so a call such as
    # find_datasets("attr1", "attr2") binds "attr1" to it. Anything that does
    # not look like a mapping is handed back to the attribute names.
    if ensemble_tags is not None and not hasattr(ensemble_tags, "items"):
        atts = (ensemble_tags,) + atts
        ensemble_tags = None

    if ensemble_tags is not None:
        # Tags are searched under the ensemble-specific key they are stored with
        for key, val in ensemble_tags.items():
            keys[f"{self.id}_ENSEMBLE_TAG_{key}"] = val
    members_ids = list(self.get_members(ids_only=True))
    return self.__store__.find(*atts, id_pool=members_ids, **keys)

get_members(ids_only=False)

Generator for member datasets

Parameters:

Name Type Description Default
ids_only bool

generator will return ids if True, Kosh datasets otherwise

False

Returns:

Type Description
str | KoshDataset

generator of dataset (or ids)

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def get_members(self, ids_only=False):
    """Iterate over the datasets associated with this ensemble.

    :param ids_only: when True the ids are yielded as is, otherwise each id
                     is opened through the store and the result is yielded
    :type ids_only: bool
    :returns: generator of dataset (or ids)
    :rtype: str or KoshDataset
    """
    for member_id in self._associated_datasets_:
        yield member_id if ids_only else self.__store__.open(member_id)

list_attributes(dictionary=False, no_duplicate=False)

list_attributes list all non protected attributes

Parameters:

Name Type Description Default
dictionary bool

return a dictionary of attribute/value pairs rather than just attribute names

False
no_duplicate bool

return only attributes that cannot be duplicated in members

False

Returns:

Type Description
list

list of attributes set on object

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def list_attributes(self, dictionary=False, no_duplicate=False):
    """list_attributes list all non protected attributes

    :param dictionary: return a dictionary of attribute/value pairs rather than just attributes names
    :type dictionary: bool

    :param no_duplicate: return only attributes that cannot be duplicated in members,
                         i.e. the ones not listed in this object's `__ok_duplicates__`
    :type no_duplicate: bool

    :return: attributes set on object: a dictionary if `dictionary` is True, a list of names otherwise
    :rtype: list or dict
    """

    attributes = super(KoshEnsemble, self).list_attributes(dictionary)
    if not no_duplicate:
        return attributes

    allowed_duplicates = self.__dict__["__ok_duplicates__"]
    if dictionary:
        # Keep the values: filtering must not turn the dictionary into a list of names
        return {name: value for name, value in attributes.items()
                if name not in allowed_duplicates}
    return [name for name in attributes if name not in allowed_duplicates]

remove(dataset)

Removes a dataset from this ensemble. Does not delete the dataset.

Parameters:

Name Type Description Default
dataset KoshDataset | str

The dataset to remove

required
Source code in kosh/ensemble.py
@lock_strategies.lock_method
def remove(self, dataset):
    """Take a dataset out of this ensemble, the dataset itself is not deleted.

    The ensemble tags this ensemble set on the dataset are deleted, then the
    relationship linking the dataset to this ensemble is removed from the
    sina store. If no such relationship exists a warning is issued instead.

    :param dataset: The dataset to remove
    :type dataset: KoshDataset or str
    """
    __check_valid_connection_type__(self.__store__.__connection_type__, ['write'])
    # We need both the dataset object and its id, whichever one we received
    if not isinstance(dataset, KoshDataset):
        dataset_id = dataset
        dataset = self.__store__.open(dataset_id)
    else:
        dataset_id = dataset.id
    # Relationship(s) linking this dataset to this ensemble
    links = self.get_sina_store().relationships.find(
        dataset_id, self.__store__._ensemble_predicate, self.id)
    # Delete ensemble tags from dataset (tag names are passed without the ensemble prefix)
    tag_prefix = f"{self.id}_ENSEMBLE_TAG_"
    tag_names = [tag.replace(tag_prefix, "")
                 for tag in dataset.list_ensemble_tags(ensemble_id=self.id)]
    dataset.delete_ensemble_tags(ensemble_id=self.id, ensemble_tags=tag_names)
    if len(links) > 0:
        link = links[0]
        self.get_sina_store().relationships.delete(link.subject_id, link.predicate, link.object_id)
    else:
        warnings.warn(
            "Dataset {} is not a member of ensemble {}".format(
                dataset_id, self.id))

to_dataframe(data_columns=[], include_ensemble_attributes=True, include_ensemble_tags=True, *atts, **keys)

Return the find_datasets object as a Pandas DataFrame.

Pass in the same arguments and keyword arguments as the find method.

Arguments are the metadata names we are looking for, e.g. find("attr1", "attr2"). You can further restrict by specifying an exact value for a metadata via key=value. You can return ids only by using: ids_only=True. A range can be specified via: sina.utils.DataRange(min, max).

"file_uri" is a reserved key that will return all records being associated with the given "uri", e.g. store.find(file_uri=uri). "types" lets you search over specific sina record types only. "id_pool" will search based on the id of a Sina record or Kosh dataset; it can be a list.

Parameters:

Name Type Description Default
data_columns (Union(str, list), optional)

Columns to extract. By default this will include ['id', 'name', 'creator', 'creation_date', 'last_modified_date']. If nothing is passed, will return all data.

[]
include_ensemble_attributes (bool, optional)

Include ensemble attributes in DataFrame.

True
include_ensemble_tags (bool, optional)

Include ensemble tags in DataFrame.

True

Returns:

Type Description
Pandas DataFrame

Pandas DataFrame

Source code in kosh/ensemble.py
@lock_strategies.lock_method
def to_dataframe(self, data_columns=[], include_ensemble_attributes=True, include_ensemble_tags=True,
                 *atts, **keys):
    """Return the find_datasets object as a Pandas DataFrame.

    Pass in the same arguments and keyword arguments as the find method.

    Arguments are the metadata name we are looking for e.g
    find("attr1", "attr2")
    you can further restrict by specifying exact value for a metadata
    via key=value
    range can be specified via: sina.utils.DataRange(min, max)
    `ensemble_tags=dict` can be passed as a keyword to filter on ensemble tags.

    `load_type` and `ids_only` are always overridden ('dictionary' and False)
    since the records' dictionaries are needed to build the DataFrame.

    "file_uri" is a reserved key that will return all records being associated
               with the given "uri", e.g store.find(file_uri=uri)
    "types" let you search over specific sina record types only.

    :param data_columns: Columns to extract. By default this will include ['id', 'name', 'creator',
                                                                           'creation_date', 'last_modified_date'].
                         If nothing is passed, will return all data.
    :type data_columns: Union(str, list), optional
    :param include_ensemble_attributes: Include ensemble attributes in DataFrame.
    :type include_ensemble_attributes: bool, optional
    :param include_ensemble_tags: Include ensemble tags in DataFrame.
    :type include_ensemble_tags: bool, optional
    :return: Pandas DataFrame with one row per matching dataset
    :rtype: Pandas DataFrame
    """
    import pandas as pd
    if isinstance(data_columns, str):
        data_columns = [data_columns]

    keys['load_type'] = 'dictionary'
    keys['ids_only'] = False
    # `find_datasets` takes `ensemble_tags` as its first positional parameter:
    # pass it explicitly so the first name in `atts` is not mistaken for it.
    ensemble_tags = keys.pop('ensemble_tags', None)
    datasets = list(self.find_datasets(ensemble_tags, *atts, **keys))

    total_datasets = len(datasets)

    # Always have these by default
    defaults = ['id', 'name', 'creator', 'creation_date', 'last_modified_date']

    unique_keys = set()
    for dataset in datasets:
        unique_keys.update(dataset['data'])
    data_columns_all = sorted(unique_keys)
    # Remove all ensemble tags and only keep dataset attributes
    data_columns_no_ensemble_tags = [dc for dc in data_columns_all if "_ENSEMBLE_TAG_" not in dc]

    # Acquire all data if `data_columns` was not passed
    if not data_columns:
        data_columns = data_columns_no_ensemble_tags

    # Want defaults in front; building a new list also keeps the caller's
    # (or the default) list untouched and accepts any sequence of names
    data_columns = defaults + list(data_columns)

    # Acquire ensemble attributes
    if include_ensemble_attributes:
        data_columns_ens_default_attributes = [f"{self.id}_ENSEMBLE_ATTRIBUTE_id",
                                               f"{self.id}_ENSEMBLE_ATTRIBUTE_name",
                                               f"{self.id}_ENSEMBLE_ATTRIBUTE_creator",
                                               f"{self.id}_ENSEMBLE_ATTRIBUTE_creation_date",
                                               f"{self.id}_ENSEMBLE_ATTRIBUTE_last_modified_date"]

        ens_attrs = self.list_attributes(dictionary=True)
        data_columns_ens_other_attributes = [f"{self.id}_ENSEMBLE_ATTRIBUTE_{attr}" for attr in ens_attrs.keys()
                                             if attr not in defaults]

        data_columns += data_columns_ens_default_attributes + data_columns_ens_other_attributes

    # Remove all other ensemble tags and only keep this ensemble's tags
    if include_ensemble_tags:
        data_columns += [dc for dc in data_columns_all if f"{self.id}_ENSEMBLE_TAG_" in dc and
                         "_ENSEMBLE_TAG_INHERIT_ATTRIBUTES" not in dc]

    # One column per requested name (duplicated names collapse), filled with NA
    attr_dict = {d: [pd.NA] * total_datasets for d in data_columns}

    for i, dataset in enumerate(datasets):
        attr_dict['id'][i] = dataset['id']
        for column, values in dataset['data'].items():
            # attr_dict has exactly the requested columns as keys
            if column in attr_dict:
                attr_dict[column][i] = values.get('value', pd.NA)

    # Ensemble attributes: same value repeated on every row
    if include_ensemble_attributes:
        for key, val in ens_attrs.items():
            attr_dict[f"{self.id}_ENSEMBLE_ATTRIBUTE_{key}"] = [val] * total_datasets

    df = pd.DataFrame(attr_dict)
    return df